Hi! Welcome to RecordTrac's technical documentation. Here you'll find the source code for the Flask application. A good place to start is by browsing views.py, models.py and prr.py. All of the templates referenced can be found here. A full table of contents is below.
db_helpers.py
CSV export""" .. module:: db_helpers :synopsis: Functions that interact with the Postgres database via Flask-SQLAlchemy .. modlueauthor:: Richa Agarwal <richa@codeforamerica.org> """ from public_records_portal import db, app from models import * from datetime import datetime, timedelta from sqlalchemy.exc import IntegrityError, InvalidRequestError from sqlalchemy import func, not_, and_, or_ from sqlalchemy.dialects import postgresql import uuid import json import os import logging ### @export "get_subscriber" def get_subscriber(request_id, user_id): # Returns the subscriber for a given request by user ID if request_id and user_id: return Subscriber.query.filter_by(user_id = user_id).filter_by(request_id = request_id).first() return None ### @export "get_count" def get_count(obj_type): return db.session.query(func.count(eval(obj_type).id)).scalar() ### @export "get_obj" def get_obj(obj_type, obj_id): """ Query the database for an object via its class/type (defined in models.py) and ID and return the object. """ if not obj_id: return None return eval(obj_type).query.get(obj_id) ### @export "get_objs" def get_objs(obj_type): """ Query the database for all objects of a certain class/type (defined in models.py) and return queryset. 
""" # There has to be a better way of doing this if obj_type == "User": return User.query.all() elif obj_type == "Request": return Request.query.all() elif obj_type == "Owner": return Owner.query.all() elif obj_type == "Note": return Note.query.all() elif obj_type == "QA": return QA.query.all() elif obj_type == "Subscriber": return Subscriber.query.all() elif obj_type == "Record": return Record.query.all() return None ### @export "get_avg_response_time" def get_avg_response_time(department): app.logger.info("\n\nCalculating average response time for department: %s" % department) d = Department.query.filter_by(name = department).first() response_time = None num_closed = 0 for request in d.requests: date_created = request.date_received or request.date_created if request.status and 'Closed' in request.status: if response_time: response_time = response_time + (request.status_updated - date_created).total_seconds() else: response_time = (request.status_updated - date_created).total_seconds() num_closed = num_closed + 1 if num_closed > 0: avg = response_time / num_closed return avg return None ### @export "get_request_by_owner" def get_request_by_owner(owner_id): """ Return the request that a particular owner belongs to """ if not owner_id: return None return Owner.query.get(owner_id).request ### @export "get_owners_by_user_id" def get_owners_by_user_id(user_id): """ Return the queryset of owners for a particular user. (A user can be associated with multiple owners).""" if not user_id: return None return Owner.query.filter_by(user_id = user_id) ### @export "get_prr_liaison_by_dept" def get_contact_by_dept(dept): """ Return the contact for a given department. """ q = db.session.query(User).filter(func.lower(User.contact_for).like("%%%s%%" % dept.lower())) if len(q.all()) > 0: return q[0].email app.logger.debug("Department: %s" % dept) return None ### @export "get_backup_by_dept" def get_backup_by_dept(dept): """ Return the contact for a given department. 
""" q = db.session.query(User).filter(func.lower(User.backup_for).like("%%%s%%" % dept.lower())) if len(q.all()) > 0: return q[0].email app.logger.debug("Department: %s" % dept) return None ### @export "put_obj" def put_obj(obj): """ Add and commit the object to the database. Return true if successful. """ if obj: db.session.add(obj) db.session.commit() app.logger.info("\n\nCommitted object to database: %s" % obj) return True return False ### @export "get_attribute" def get_attribute(attribute, obj_id = None, obj_type = None, obj = None): """ Obtain the object by obj_id and obj_type if obj is not provided, and return the specified attribute for that object. """ if obj_id and obj_type: obj = get_obj(obj_type, obj_id) if obj: try: return getattr(obj, attribute) except: return None return None ### @export "update_obj" def update_obj(attribute, val, obj_type = None, obj_id = None, obj = None): """ Obtain the object by obj_id and obj_type if obj is not provided, and update the specified attribute for that object. Return true if successful. """ app.logger.info("\n\nUpdating attribute: %s with value: %s for obj_type: %s, obj_id: %s, obj: %s"%(attribute, val,obj_type, obj_id, obj)) if obj_id and obj_type: obj = get_obj(obj_type, obj_id) if obj: try: setattr(obj, attribute, val) db.session.add(obj) db.session.commit() return True except: return False return False ### @export "create_QA" def create_QA(request_id, question, user_id): """ Create a QA object and return the ID. """ qa = QA(request_id = request_id, question = question, user_id = user_id) db.session.add(qa) db.session.commit() return qa.id ### @export "create_request" def create_request(text, user_id, offline_submission_type = None, date_received = None): """ Create a Request object and return the ID. 
""" req = Request(text = text, creator_id = user_id, offline_submission_type = offline_submission_type, date_received = date_received) db.session.add(req) db.session.commit() req.set_due_date() return req.id ### @export "create_subscriber" def create_subscriber(request_id, user_id): """ Create a Subscriber object and return the ID. """ subscriber = Subscriber.query.filter_by(request_id = request_id, user_id = user_id).first() if not subscriber: subscriber = Subscriber(request_id = request_id, user_id = user_id) db.session.add(subscriber) db.session.commit() return subscriber.id, True return subscriber.id, False ### @export "create_note" def create_note(request_id, text, user_id): """ Create a Note object and return the ID. """ try: note = Note(request_id = request_id, text = text, user_id = user_id) put_obj(note) return note.id except Exception, e: app.logger.info("\n\nThere was an issue with creating a note with text: %s %s" % (text, e)) return None ### @export "create_record" def create_record(request_id, user_id, description, doc_id = None, filename = None, access = None, url = None): try: record = Record(doc_id = doc_id, request_id = request_id, user_id = user_id, description = description, filename = filename, url = url, access = access) put_obj(record) return record.id except Exception, e: app.logger.info("\n\nThere was an issue with creating a record: %s" % e) return None def remove_obj(obj_type, obj_id): obj = get_obj(obj_type, obj_id) db.session.delete(obj) db.session.commit() ### @export "create_answer" def create_answer(qa_id, subscriber_id, answer): qa = get_obj("QA", qa_id) if not qa: app.logger.info("\n\nQA with id: %s does not exist" % (qa_id)) return None qa.subscriber_id = subscriber_id qa.answer = answer db.session.add(qa) db.session.commit() return qa.request_id # Following three functions are for integration with Mozilla Persona ### @export "get_user" def get_user(kwargs): return User.query.filter(User.email == 
kwargs.get('email')).filter(User.is_staff == True).first() ### @export "get_user_by_id" def get_user_by_id(id): return User.query.get(id) ### @export "create_or_return_user" def create_or_return_user(email=None, alias = None, phone = None, department = None, contact_for = None, backup_for = None, not_id = False, is_staff = None): app.logger.info("\n\nCreating or returning user...") if email: user = User.query.filter(User.email == func.lower(email)).first() if department and type(department) != int and not department.isdigit(): d = Department.query.filter_by(name = department).first() if d: department = d.id else: d = Department(name = department) db.session.add(d) db.session.commit() department = d.id if not user: user = create_user(email = email.lower(), alias = alias, phone = phone, department = department, contact_for = contact_for, backup_for = backup_for, is_staff = is_staff) else: if alias or phone or department or contact_for or backup_for: # Update user if fields to update are provided user = update_user(user = user, alias = alias, phone = phone, department = department, contact_for = contact_for, backup_for = backup_for, is_staff = is_staff) if not_id: return user return user.id else: user = create_user(alias = alias, phone = phone, is_staff = is_staff) return user.id ### @export "create_user" def create_user(email=None, alias = None, phone = None, department = None, contact_for = None, backup_for = None, is_staff = None): user = User(email = email, alias = alias, phone = phone, department = department, contact_for = contact_for, backup_for = backup_for, is_staff = is_staff) db.session.add(user) db.session.commit() app.logger.info("\n\nCreated new user, alias: %s id: %s" % (user.alias, user.id)) return user ### @export "update_user" def update_user(user, alias = None, phone = None, department = None, contact_for = None, backup_for = None, is_staff = None): if alias: user.alias = alias if phone: user.phone = phone if department: if type(department) != int 
and not department.isdigit(): d = Department.query.filter_by(name = department).first() if d: user.department = d.id else: user.department = department if contact_for: if user.contact_for and contact_for not in user.contact_for: contact_for = user.contact_for + "," + contact_for user.contact_for = contact_for if backup_for: if user.backup_for and backup_for not in user.backup_for: backup_for = user.backup_for + "," + backup_for user.backup_for = backup_for if is_staff: user.is_staff = is_staff db.session.add(user) db.session.commit() app.logger.info("\n\nUpdated user %s, alias: %s phone: %s department: %s" % (user.id, alias, phone, department)) return user ### @export "create_owner" def create_owner(request_id, reason, email = None, user_id = None): """ Adds a staff member to the request without assigning them as current owner. (i.e. "participant") Useful for multidepartmental requests.""" if not user_id: user_id = create_or_return_user(email = email) participant = Owner(request_id = request_id, user_id = user_id, reason = reason) db.session.add(participant) db.session.commit() app.logger.info("\n\nCreated owner with id: %s" % participant.id) return participant.id ### @export "change_request_status" def change_request_status(request_id, status): req = get_obj("Request", request_id) req.status = status req.status_updated = datetime.now().isoformat() db.session.add(req) app.logger.info("\n\nChanged status for request: %s to %s" % (request_id, status)) db.session.commit() ### @export "find_request" def find_request(text): req = Request.query.filter_by(text = text).first() if req: return req.id return None ### @export "add_staff_participant" def add_staff_participant(request_id, is_point_person = False, email = None, user_id = None, reason = None): """ Creates an owner for the request if it doesn't exist, and returns the owner ID and True if a new one was created. 
Returns the owner ID and False if existing.""" is_new = True if not user_id: user_id = create_or_return_user(email = email) participant = Owner.query.filter_by(request_id = request_id, user_id = user_id, active = True).first() if not participant: if not reason: reason = "Added a response" participant = Owner(request_id = request_id, user_id = user_id, reason = reason, is_point_person = is_point_person) app.logger.info("\n\nStaff participant with owner ID: %s added to request %s. Is point of contact: %s" %(participant.id, request_id, is_point_person)) else: if is_point_person and not participant.is_point_person: participant.is_point_person = True participant.date_updated = datetime.now().isoformat() if reason: # Update the reason participant.reason = reason app.logger.info("\n\nStaff participant with owner ID: %s is now the point of contact for request %s" %(participant.id, request_id)) else: is_new = False app.logger.info("\n\nStaff participant with owner ID: %s already active on request %s" %(participant.id, request_id)) db.session.add(participant) db.session.commit() return participant.id, is_new ### @export "remove_staff_participant" def remove_staff_participant(owner_id, reason = None): participant = Owner.query.get(owner_id) participant.active = False participant.date_updated = datetime.now().isoformat() participant.reason_unassigned = reason db.session.add(participant) db.session.commit() app.logger.info("\n\n Staff participant with owner ID: %s has been removed for following reason %s" %(owner_id, reason)) return owner_id ### @export "update_subscriber" def update_subscriber(request_id, alias, phone): """ Update a subscriber for a given request with the name and phone number provided. 
""" user_id = create_or_return_user(alias = alias, phone = phone) r = Request.query.get(request_id) sub = r.subscribers[0] sub.user_id = user_id db.session.add(sub) db.session.commit() app.logger.info("\n\nUpdated subscriber for request %s with alias: %s and phone: %s" % (request_id, alias, phone))
csv_export.py
Notifications
"""
    public_records_portal.csv_export
    ~~~~~~~~~~~~~~~~

    Implements an export to CSV function (for staff only) for relevant database fields.

"""

from public_records_portal import models, db
import csv


def _cell(value):
    """ Render one value as a single cell of a tab-separated row.

    Non-string values go through str() (so None becomes 'None', as before).
    Text that cannot be converted that way -- non-ASCII unicode under
    Python 2 -- is encoded as UTF-8 instead of raising. Tabs and line breaks
    are removed so that a value can never spill into another column or row.
    """
    if not isinstance(value, str):
        try:
            value = str(value)
        except UnicodeEncodeError:
            value = value.encode('utf8')
    return value.replace('\n', '').replace('\r', '').replace('\t', '')


def export():
    """ Yield the export line by line: a header line, then one tab-separated line per request, ordered by request ID. """
    records = models.Request.query.order_by(models.Request.id).all()
    # Columns read directly from the Request model, in output order.
    db_headers = ['id', 'text', 'date_received', 'date_created', 'due_date', 'extended']
    all_headers = ['Request ID', 'Request Text', 'Date Received', 'Date Created', 'Date Due', 'Extended?', 'Requester Name', 'Requester Phone', 'Department Name', 'Point of Contact', 'All staff involved', 'Status']
    yield '\t'.join(all_headers) + '\n'
    for curr in records:
        row = [_cell(getattr(curr, name)) for name in db_headers]
        # The remaining columns are computed by methods on the model.
        row.append(_cell(curr.requester_name()))
        row.append(_cell(curr.requester_phone()))
        row.append(_cell(curr.department_name()))
        row.append(_cell(curr.point_person_name()))
        row.append(_cell(','.join(curr.all_owners())))
        row.append(_cell(curr.solid_status(cron_job = True)))
        yield '\t'.join(row) + '\n'
notifications.py
API and admin setup""" public_records_portal.notifications ~~~~~~~~~~~~~~~~ Implements e-mail notifications for RecordTrac. SendGrid (https://sendgrid.com/) is a dependency, and the following environment variables need to be set in order for this to work: MAIL_USERNAME, MAIL_PASSWORD, and DEFAULT_MAIL_SENDER. """ from datetime import datetime, timedelta from public_records_portal import app import os import json from db_helpers import * import sendgrid from flask import render_template import helpers import logging # Set flags: send_emails = False test = "[TEST] " if app.config['ENVIRONMENT'] == 'PRODUCTION': send_emails = True test = "" elif 'DEV_EMAIL' in app.config: send_emails = True ### @export "generate_prr_emails" def generate_prr_emails(request_id, notification_type, user_id = None): app.logger.info("\n\n Generating e-mails for request with ID: %s, notification type: %s, and user ID: %s" %(request_id, notification_type, user_id)) app_url = app.config['APPLICATION_URL'] # Define the e-mail template: template = "generic_email.html" if notification_type == "Request made": template = "new_request_email.html" # Get information on who to send the e-mail to and with what subject line based on the notification type: email_info = get_email_info(notification_type=notification_type) email_subject = "Public Records Request %s: %s" %(request_id, email_info["Subject"]) recipient_types = email_info["Recipients"] include_unsubscribe_link = True unfollow_link = None for recipient_type in recipient_types: # Skip anyone that has unsubscribed if user_id and (recipient_type == "Requester" or recipient_type == "Subscriber"): subscriber = get_subscriber(request_id = request_id, user_id = user_id) should_notify = get_attribute(attribute = "should_notify", obj = subscriber) if should_notify == False: app.logger.info("\n\nSubscriber %s unsubscribed, no notification sent." 
% subscriber.id) continue # Set up the e-mail page = "%srequest/%s" %(app_url,request_id) # The request URL if "Staff" in recipient_type: page = "%scity/request/%s" %(app_url,request_id) include_unsubscribe_link = False # Gets excluded for city staff else: unfollow_link = "%sunfollow/%s/" %(app_url, request_id) if notification_type == "Request closed": page = "%sfeedback/request/%s" %(app_url,request_id) if recipient_type in ["Staff owner","Requester","Subscriber","Staff participant"]: if user_id: recipient = get_attribute(attribute = "email", obj_id = user_id, obj_type = "User") # if recipient_type != "Subscriber" or get_attribute(attribute="") if recipient: if unfollow_link: unfollow_link = unfollow_link + recipient send_prr_email(page = page, recipients = [recipient], subject = email_subject, template = template, include_unsubscribe_link = include_unsubscribe_link, unfollow_link = unfollow_link) else: app.logger.debug("\n\n No user ID provided") elif recipient_type == "Subscribers": subscribers = get_attribute(attribute = "subscribers", obj_id = request_id, obj_type = "Request") for subscriber in subscribers: if subscriber.should_notify == False: app.logger.info("\n\n Subscriber %s unsubscribed" % subscriber.id) continue recipient = get_attribute(attribute = "email", obj_id = subscriber.user_id, obj_type = "User") if recipient: if unfollow_link: unfollow_link = unfollow_link + recipient send_prr_email(page = page, recipients = [recipient], subject = email_subject, template = template, include_unsubscribe_link = include_unsubscribe_link, unfollow_link = unfollow_link) # Each subscriber needs to get a separate e-mail. 
elif recipient_type == "Staff participants": recipients = [] participants = get_attribute(attribute = "owners", obj_id = request_id, obj_type = "Request") for participant in participants: if participant.active: # Only send an e-mail if they are active in the request recipient = get_attribute(attribute = "email", obj_id = participant.user_id, obj_type = "User") if recipient: recipients.append(recipient) send_prr_email(page = page, recipients = recipients, subject = email_subject, template = template, include_unsubscribe_link = include_unsubscribe_link, cc_everyone = False, unfollow_link = unfollow_link) app.logger.info("\n\nRecipients: %s" % recipients) else: app.logger.info("Not a valid recipient type: %s" % recipient_type) ### @export "send_prr_email" def send_prr_email(page, recipients, subject, template, include_unsubscribe_link = True, cc_everyone = False, password = None, unfollow_link = None): app.logger.info("\n\nAttempting to send an e-mail to %s with subject %s, referencing page %s and template %s" % (recipients, subject, page, template)) if recipients: if send_emails: try: send_email(body = render_template(template, unfollow_link = unfollow_link, page = page, password = password), recipients = recipients, subject = subject, include_unsubscribe_link = include_unsubscribe_link, cc_everyone = cc_everyone) app.logger.info("\n\n E-mail sent successfully!") except Exception, e: app.logger.info("\n\nThere was an error sending the e-mail: %s" % e) else: app.logger.info("\n\n E-mail flag turned off, no e-mails sent.") ### @export "send_email" def send_email(body, recipients, subject, include_unsubscribe_link = True, cc_everyone = False): mail = sendgrid.Sendgrid(app.config['MAIL_USERNAME'], app.config['MAIL_PASSWORD'], secure = True) sender = app.config['DEFAULT_MAIL_SENDER'] plaintext = "" html = body message = sendgrid.Message(sender, subject, plaintext, html) if not include_unsubscribe_link: message.add_filter_setting("subscriptiontrack", "enable", 0) if 
'DEV_EMAIL' in app.config: recipients = [app.config['DEV_EMAIL']] if cc_everyone: # Not being used for now message.add_to(recipients[0]) for recipient in recipients: # if should_notify(recipient): message.add_cc(recipient) else: for recipient in recipients: # if should_notify(recipient): message.add_to(recipient) message.add_bcc(sender) if send_emails: app.logger.info("\n\n Attempting to send e-mail with body: %s, subject: %s, to %s" %(body, subject, recipients)) try: status = mail.web.send(message) if status == False: app.logger.info("\n\nSendgrid did not deliver e-mail.") return status except Exception, e: app.logger.error("\n\nNo e-mail was sent, error: %s" % e) return False app.logger.info("\n\nNo e-mail was sent, probably because you're in a non-production environment.") return False ### @export "due_date" def due_date(date_obj, extended = None, format = True): days_to_fulfill = 10 if extended == True: days_to_fulfill = days_to_fulfill + 14 if not date_obj: return None if type(date_obj) is not datetime: date_obj = datetime.strptime(date_obj, "%Y-%m-%dT%H:%M:%S.%f") due_date = date_obj + timedelta(days = days_to_fulfill) if format: return format_date(due_date) return due_date ### @export "is_overdue" def is_overdue(date_obj, extended = None): current_date = datetime.now() due = due_date(date_obj = date_obj, extended = extended, format = False) if (current_date >= due): return True, due return False, due ### @export "get_email_info" def get_email_info(notification_type): email_json = open(os.path.join(app.root_path, 'static/json/emails.json')) json_data = json.load(email_json) return json_data["Notification types"][notification_type] ### @export "notify_due" def notify_due(): requests = get_objs("Request") email_json = open(os.path.join(app.root_path, 'static/json/emails.json')) json_data = json.load(email_json) for req in requests: status = req.solid_status if status != "closed": # Check if it is due in 2 days if status == "due soon": 
change_request_status(req.id, "Due soon") email_subject = "%sPublic Records Request %s: %s" %(test, req.id, json_data["Notification types"]["Request due"]) elif status == "overdue": change_request_status(req.id, "Overdue") email_subject = "%sPublic Records Request %s: %s" %(test, req.id, json_data["Notification types"]["Request overdue"]["Subject"]) else: continue recipients = get_staff_recipients(req) app_url = app.config['APPLICATION_URL'] page = "%scity/request/%s" %(app_url,req.id) body = "You can view the request and take any necessary action at the following webpage: <a href='%s'>%s</a>.</br></br> This is an automated message. You are receiving it because you are listed as the Public Records Request Liaison, Backup or Supervisor for your department." %(page, page) # Need to figure out a way to pass in generic email template outside application context. For now, hardcoding the body. send_email(body = body, recipients = recipients, subject = email_subject, include_unsubscribe_link = False) ### @export "get_staff_recipients" def get_staff_recipients(request): recipients = [] owner_email = request.point_person().user.email if owner_email: recipients.append(owner_email) # Look up the department for the request, and get the contacts and backup: dept = request.department_name() if dept != "N/A": contact_email = get_contact_by_dept(dept) if contact_email and contact_email not in recipients: recipients.append(contact_email) backup_email = get_backup_by_dept(dept) if backup_email and backup_email not in recipients: recipients.append(backup_email) if recipients: return recipients else: raise ValueError('No staff recipients for request %s' %(request.id)) ### @export "should_notify" def should_notify(user_email): """ Looks up the user in do_not_email.json and returns False if found. 
""" do_not_email = open(os.path.join(app.root_path, 'static/json/do_not_email.json')) json_data = json.load(do_not_email) for department in json_data: emails = json_data[department]['Emails'] for email in emails: if email.lower() == user_email.lower(): return False return True ### @export "format_date" def format_date(obj): """ Take a datetime object and return it in format Jun 12, 2013 """ if not obj: return None return helpers.localize(obj).strftime('%b %d, %Y')
prflask.py
Core public records requests functions""" public_records_portal.prflask ~~~~~~~~~~~~~~~~ Sets up API and admin endpoints for the RecordTrac flask application. """ from public_records_portal import app, models, db, views from views import * # Import all the functions that render templates from flask.ext.restless import APIManager from flask.ext.admin import Admin, expose, BaseView, AdminIndexView from flask.ext.admin.contrib.sqlamodel import ModelView # Create API manager = APIManager(app, flask_sqlalchemy_db=db) # The endpoints created are /api/object, e.g. publicrecordsareawesome.com/api/request/ manager.create_api(models.Request, methods=['GET'], results_per_page = 10, allow_functions = True, include_columns=['date_created', 'date_received', 'department', 'id', 'notes', 'offline_submission_type', 'owners', 'qas', 'records', 'status', 'status_updated', 'text']) # manager.create_api(models.Owner, methods=['GET'], results_per_page = 10, allow_functions = True) manager.create_api(models.Note, methods=['GET'], results_per_page = 10, allow_functions = True) manager.create_api(models.Record, methods=['GET'], results_per_page = 10, allow_functions = True) manager.create_api(models.QA, methods=['GET'], results_per_page =10, allow_functions = True) # manager.create_api(models.Subscriber, methods=['GET'], results_per_page = 10, allow_functions = True) manager.create_api(models.Visualization, methods=['GET'], results_per_page = 10, allow_functions = True) class HomeView(AdminIndexView): @expose('/') def home(self): return self.render('admin.html') def is_accessible(self): if current_user.is_authenticated(): if 'LIST_OF_ADMINS' in app.config: admins = app.config['LIST_OF_ADMINS'].split(",") if current_user.email.lower() in admins: return True return False # Create Admin admin = Admin(app, name='RecordTrac Admin', url='/admin', index_view = HomeView(name='Home')) class AdminView(ModelView): def is_accessible(self): if current_user.is_authenticated(): if 'LIST_OF_ADMINS' in 
app.config: admins = app.config['LIST_OF_ADMINS'].split(",") if current_user.email.lower() in admins: return True return False class RequestView(AdminView): can_create = False can_edit = True column_list = ('id', 'text', 'date_created', 'status') # The fields the admin can view column_searchable_list = ('status', 'text') # The fields the admin can search a request by form_excluded_columns = ('date_created', 'extended', 'status', 'status_updated', 'current_owner') # The fields the admin cannot edit. class RecordView(AdminView): can_create = False column_searchable_list = ('description', 'filename', 'url', 'download_url', 'access') column_list = ('request_id', 'description', 'filename', 'url', 'download_url', 'access') can_edit = False class QAView(AdminView): can_create = False can_edit = True column_list = ('request_id', 'question', 'answer', 'date_created') form_excluded_columns = ('date_created') class NoteView(AdminView): can_create = False can_edit = True column_list = ('request_id', 'text', 'date_created') form_excluded_columns = ('date_created') admin.add_view(RequestView(models.Request, db.session)) admin.add_view(RecordView(models.Record, db.session)) admin.add_view(NoteView(models.Note, db.session)) admin.add_view(QAView(models.QA, db.session))
prr.py
The 'Request' view""" public_records_portal.prr ~~~~~~~~~~~~~~~~ Implements functions specific to managing or creating a public records request. """ from public_records_portal import app, db_helpers import os, time, json from flask import Flask, request from flask.ext.login import current_user from datetime import datetime, timedelta from db_helpers import find_request, create_request, get_obj, add_staff_participant, remove_staff_participant, update_obj, get_attribute, change_request_status, create_or_return_user, create_subscriber, create_record, create_note, create_QA, create_answer from models import * from ResponsePresenter import ResponsePresenter from RequestPresenter import RequestPresenter from notifications import generate_prr_emails import scribd_helpers from spam import is_spam import logging import csv import urllib ### @export "add_resource" def add_resource(resource, request_body, current_user_id = None): fields = request_body if "extension" in resource: return request_extension(int(fields['request_id']), fields.getlist('extend_reason'), current_user_id) if "note" in resource: return add_note(request_id = int(fields['request_id']), text = fields['note_text'], user_id = current_user_id, passed_spam_filter = True) # Bypass spam filter because they are logged in. elif "record" in resource: if fields['record_description'] == "": return "When uploading a record, please fill out the 'summary' field." 
if 'record_access' in fields and fields['record_access'] != "": return add_offline_record(int(fields['request_id']), fields['record_description'], fields['record_access'], current_user_id) elif 'link_url' in fields and fields['link_url'] != "": return add_link(request_id = int(fields['request_id']), url = fields['link_url'], description = fields['record_description'], user_id = current_user_id) else: document = None try: document = request.files['record'] except: app.logger.info("\n\nNo file passed in") return upload_record(request_id = int(fields['request_id']), document = document, description = fields['record_description'], user_id = current_user_id) elif "qa" in resource: return ask_a_question(request_id = int(fields['request_id']), user_id = current_user_id, question = fields['question_text']) elif "owner" in resource: participant_id, new = add_staff_participant(request_id = fields['request_id'], email = fields['owner_email'], reason = fields['owner_reason']) if new: generate_prr_emails(request_id = fields['request_id'], notification_type = "Staff participant added", user_id = get_attribute("user_id", obj_id = participant_id, obj_type = "Owner")) return participant_id elif "subscriber" in resource: return add_subscriber(request_id=fields['request_id'], email = fields['follow_email']) else: return False ### @export "update_resource" def update_resource(resource, request_body): fields = request_body if "owner" in resource: if "reason_unassigned" in fields: return remove_staff_participant(owner_id = fields['owner_id'], reason = fields['reason_unassigned']) else: change_request_status(int(fields['request_id']), "Rerouted") return assign_owner(int(fields['request_id']), fields['owner_reason'], fields['owner_email']) elif "reopen" in resource: change_request_status(int(fields['request_id']), "Reopened") return fields['request_id'] elif "request_text" in resource: update_obj(attribute = "text", val = fields['request_text'], obj_type = "Request", obj_id = 
fields['request_id']) elif "note_text" in resource: update_obj(attribute = "text", val = fields['note_text'], obj_type = "Note", obj_id = fields['response_id']) # Need to store note text somewhere else (or just do delete here as well) elif "note_delete" in resource: # Need to store note somewhere else remove_obj("Note", int(fields['response_id'])) elif "record_delete" in resource: remove_obj("Record", int(fields['record_id'])) # Need to store record somewhere else and prompt them to delete from Scribd as well, if they'd like else: return False ### @export "request_extension" def request_extension(request_id, extension_reasons, user_id): req = Request.query.get(request_id) req.extension() text = "Request extended:" for reason in extension_reasons: text = text + reason + "</br>" add_staff_participant(request_id = request_id, user_id = user_id) return add_note(request_id = request_id, text = text, user_id = user_id, passed_spam_filter = True) # Bypass spam filter because they are logged in. ### @export "add_note" def add_note(request_id, text, user_id = None, passed_spam_filter = False): if not text or text == "" or (not passed_spam_filter): return False note_id = create_note(request_id = request_id, text = text, user_id = user_id) if note_id: change_request_status(request_id, "A response has been added.") if user_id: add_staff_participant(request_id = request_id, user_id = user_id) generate_prr_emails(request_id = request_id, notification_type = "City response added") else: generate_prr_emails(request_id = request_id, notification_type = "Public note added") return note_id return False ### @export "upload_record" def upload_record(request_id, description, user_id, document = None): """ Creates a record with upload/download attributes """ try: doc_id, filename = scribd_helpers.upload_file(document = document, request_id = request_id) except: return "The upload timed out, please try again." if doc_id == False: return "Extension type '%s' is not allowed." 
% filename else: if str(doc_id).isdigit(): record_id = create_record(doc_id = doc_id, request_id = request_id, user_id = user_id, description = description, filename = filename, url = app.config['HOST_URL'] + doc_id) change_request_status(request_id, "A response has been added.") generate_prr_emails(request_id = request_id, notification_type = "City response added") add_staff_participant(request_id = request_id, user_id = user_id) return record_id return "There was an issue with your upload." ### @export "add_offline_record" def add_offline_record(request_id, description, access, user_id): """ Creates a record with offline attributes """ record_id = create_record(request_id = request_id, user_id = user_id, access = access, description = description) # To create an offline record, we need to know the request ID to which it will be added, the user ID for the person adding the record, how it can be accessed, and a description/title of the record. if record_id: change_request_status(request_id, "A response has been added.") generate_prr_emails(request_id = request_id, notification_type = "City response added") add_staff_participant(request_id = request_id, user_id = user_id) return record_id return False ### @export "add_link" def add_link(request_id, url, description, user_id): """ Creates a record with link attributes """ record_id = create_record(url = url, request_id = request_id, user_id = user_id, description = description) if record_id: change_request_status(request_id, "A response has been added.") generate_prr_emails(request_id = request_id, notification_type = "City response added") add_staff_participant(request_id = request_id, user_id = user_id) return record_id return False ### @export "make_request" def make_request(text, email = None, user_id = None, phone = None, alias = None, department = None, passed_spam_filter = False, offline_submission_type = None, date_received = None): """ Make the request. 
At minimum you need to communicate which record(s) you want, probably with some text.""" if not passed_spam_filter: return None, False request_id = find_request(text) if request_id: # Same request already exists return request_id, False assigned_to_email = app.config['DEFAULT_OWNER_EMAIL'] assigned_to_reason = app.config['DEFAULT_OWNER_REASON'] if department: app.logger.info("\n\nDepartment chosen: %s" %department) prr_email = db_helpers.get_contact_by_dept(department) if prr_email: assigned_to_email = prr_email assigned_to_reason = "PRR Liaison for %s" %(department) else: app.logger.info("%s is not a valid department" %(department)) department = None request_id = create_request(text = text, user_id = user_id, offline_submission_type = offline_submission_type, date_received = date_received) # Actually create the Request object new_owner_id = assign_owner(request_id = request_id, reason = assigned_to_reason, email = assigned_to_email) # Assign someone to the request open_request(request_id) # Set the status of the incoming request to "Open" if email or alias or phone: subscriber_user_id = create_or_return_user(email = email, alias = alias, phone = phone) subscriber_id, is_new_subscriber = create_subscriber(request_id = request_id, user_id = subscriber_user_id) if subscriber_id: generate_prr_emails(request_id, notification_type = "Request made", user_id = subscriber_user_id) # Send them an e-mail notification return request_id, True ### @export "add_subscriber" def add_subscriber(request_id, email): user_id = create_or_return_user(email = email) subscriber_id, is_new_subscriber = create_subscriber(request_id = request_id, user_id = user_id) if subscriber_id: generate_prr_emails(request_id, notification_type = "Request followed", user_id = user_id) return subscriber_id return False ### @export "ask_a_question" def ask_a_question(request_id, user_id, question): """ City staff can ask a question about a request they are confused about.""" req = get_obj("Request", 
request_id) qa_id = create_QA(request_id = request_id, question = question, user_id = user_id) if qa_id: change_request_status(request_id, "Pending") requester = req.requester() if requester: generate_prr_emails(request_id, notification_type = "Question asked", user_id = requester.user_id) add_staff_participant(request_id = request_id, user_id = user_id) return qa_id return False ### @export "answer_a_question" def answer_a_question(qa_id, answer, subscriber_id = None, passed_spam_filter = False): """ A requester can answer a question city staff asked them about their request.""" if (not answer) or (answer == "") or (not passed_spam_filter): return False else: request_id = create_answer(qa_id, subscriber_id, answer) # We aren't changing the request status if someone's answered a question anymore, but we could change_request_status(request_id, "Pending") generate_prr_emails(request_id = request_id, notification_type = "Question answered") return True ### @export "open_request" def open_request(request_id): change_request_status(request_id, "Open") ### @export "assign_owner" def assign_owner(request_id, reason, email = None): """ Called any time a new owner is assigned. 
This will overwrite the current owner.""" req = get_obj("Request", request_id) past_owner_id = None # If there is already an owner, unassign them: if req.point_person(): past_owner_id = req.point_person().id past_owner = get_obj("Owner", req.point_person().id) update_obj(attribute = "is_point_person", val = False, obj = past_owner) owner_id, is_new_owner = add_staff_participant(request_id = request_id, reason = reason, email = email, is_point_person = True) if (past_owner_id == owner_id): # Already the current owner, so don't send any e-mails return owner_id app.logger.info("\n\nA new owner has been assigned: Owner: %s" % owner_id) new_owner = get_obj("Owner", owner_id) # Update the associated department on request update_obj(attribute = "department_id", val = new_owner.user.department, obj = req) user_id = get_attribute(attribute = "user_id", obj_id = owner_id, obj_type = "Owner") # Send notifications if is_new_owner: generate_prr_emails(request_id = request_id, notification_type = "Request assigned", user_id = user_id) return owner_id ### @export "get_request_data_chronologically" def get_request_data_chronologically(req): public = False if current_user.is_anonymous(): public = True responses = [] if not req: return responses for i, note in enumerate(req.notes): if not note.user_id: responses.append(RequestPresenter(note = note, index = i, public = public, request = req)) for i, qa in enumerate(req.qas): responses.append(RequestPresenter(qa = qa, index = i, public = public, request = req)) if not responses: return responses responses.sort(key = lambda x:x.date(), reverse = True) return responses ### @export "get_responses_chronologically" def get_responses_chronologically(req): responses = [] if not req: return responses for note in req.notes: if note.user_id: responses.append(ResponsePresenter(note = note)) for record in req.records: responses.append(ResponsePresenter(record = record)) if not responses: return responses responses.sort(key = lambda x:x.date(), 
reverse = True) if "Closed" in req.status: responses[0].set_icon("icon-archive") # Set most recent note (closed note)'s icon return responses ### @export "set_directory_fields" def set_directory_fields(): # Set basic user data if 'STAFF_URL' in app.config: # This gets run at regular internals via db_users.py in order to keep the staff user list up to date. Before users are added/updated, ALL users get reset to 'inactive', and then only the ones in the current CSV are set to active. for user in User.query.filter(User.is_staff == True).all(): update_user(user = user, is_staff = False) csvfile = urllib.urlopen(app.config['STAFF_URL']) dictreader = csv.DictReader(csvfile, delimiter=',') for row in dictreader: create_or_return_user(email = row['email'].lower(), alias = row['name'], phone = row['phone number'], department = row['department name'], is_staff = True) # Set liaisons data (who is a PRR liaison for what department) if 'LIAISONS_URL' in app.config: csvfile = urllib.urlopen(app.config['LIAISONS_URL']) dictreader = csv.DictReader(csvfile, delimiter=',') for row in dictreader: user = create_or_return_user(email = row['PRR liaison'], contact_for = row['department name']) if row['PRR backup'] != "": user = create_or_return_user(email = row['PRR backup'], backup_for = row['department name']) else: app.logger.info("\n\n Please update the config variable LIAISONS_URL for where to find department liaison data for your agency.") else: app.logger.info("\n\n Please update the config variable STAFF_URL for where to find csv data on the users in your agency.") if 'DEFAULT_OWNER_EMAIL' in app.config and 'DEFAULT_OWNER_REASON' in app.config: create_or_return_user(email = app.config['DEFAULT_OWNER_EMAIL'].lower(), alias = app.config['DEFAULT_OWNER_EMAIL'], department = app.config['DEFAULT_OWNER_REASON'], is_staff = True) app.logger.info("\n\n Creating a single user from DEFAULT_OWNER_EMAIL and DEFAULT_OWNER_REASON for now. 
You may log in with %s" %(app.config['DEFAULT_OWNER_EMAIL'])) else: app.logger.info("\n\n Unable to create any users. No one will be able to log in.") ### @export "close_request" def close_request(request_id, reason = "", user_id = None): req = get_obj("Request", request_id) change_request_status(request_id, "Closed") # Create a note to capture closed information: create_note(request_id, reason, user_id) generate_prr_emails(request_id = request_id, notification_type = "Request closed") add_staff_participant(request_id = request_id, user_id = user_id)
RequestPresenter.py
"""
    public_records_portal.RequestPresenter
    ~~~~~~~~~~~~~~~~

    Returns the html needed for the 'Request' portion of the case page.

"""

from public_records_portal import models
from models import Note, QA
from db_helpers import get_obj


class RequestPresenter:
    """ Presents one question/answer pair (QA) or one note of a request as an HTML snippet. """

    # Answer form shown for an unanswered question on an open request when
    # `public` is set. The two %s slots are the QA id and the request id.
    _ANSWER_FORM = (
        " <form name='respond_question' class='form-inline' id='answer' method='post' action='/update_a_qa' autocomplete='on'>"
        " <label class='control-label'>Answer</label>"
        "<input type='hidden' name='qa_id' value='%s'/>"
        "<input type='hidden' name='request_id' value='%s'/>"
        " <textarea id='answerTextarea' name='answer_text' class='input-xlarge' rows=\"2\" type='text' rows='1'"
        " placeholder='Can you respond to the above question?' required/></textarea>"
        " <button id='askQuestion' class='btn btn-primary' type='submit'>Respond</button>"
        " </form> "
    )

    def __init__(self, request, qa = None, note = None, index = None, public = False):
        """ Wrap a QA or a note that belongs to `request`.

        `index` is embedded in the popover element IDs built for a QA, and
        `public` decides whether an unanswered question renders the answer
        form. If both `qa` and `note` are passed, the note is applied last and
        therefore determines `response`, `type` and `icon`.
        """
        self.index = index
        self.public = public
        self.request = request
        if qa:
            self._init_from_qa(qa)
        if note:
            self.response = note
            self.type = "note"
            self.icon = "icon-edit icon-large"

    def _init_from_qa(self, qa):
        """ Populate the presenter from a QA, including the contact details of the staff member who asked. """
        self.response = qa
        self.type = "qa"
        self.uid = self.response.owner_id
        staff = get_obj("User", self.uid)
        self.staff = staff
        # Every contact field falls back to "N/A" when there is no staff user
        # or the field is empty.
        for field in ("email", "department", "phone", "alias"):
            value = getattr(staff, field) if staff else None
            setattr(self, "staff_" + field, value or "N/A")
        directory_popover = "directoryPopover('%s', '%s', '%s', '#contactinfoPopoverQA%s')" % (
            self.staff_email, self.staff_department, self.staff_phone, self.index)
        # NOTE(review): staff_alias is never empty (it defaults to "N/A"), so the
        # `or self.staff_email` fallback below cannot trigger -- confirm intent.
        self.owner_link = (
            '<a href="/staff_card/%s" data-placement="top" data-toggle="popover" '
            'href="#" id="contactinfoPopoverQA%s" class="hidden-phone hidden-tablet">'
            '<span class="contactinfoPopover" onmouseover="%s">%s</span></a>'
        ) % (self.response.owner_id, self.index, directory_popover, self.staff_alias or self.staff_email)
        self.icon = "icon-question icon-large"

    def get_id(self):
        """ Return the ID of the wrapped QA or note. """
        return self.response.id

    def display_text(self):
        """ Return the HTML for the wrapped item (None if the type is neither "qa" nor "note"). """
        if self.type == "note":
            return "%s - Requester" % (self.response.text)
        if self.type != "qa":
            return None

        text = "%s - %s" % (self.response.question, self.owner_link)
        if self.response.answer:
            return text + "<p>%s - <span class='requester'>Requester</span></p>" % (self.response.answer)
        if self.request.is_closed():
            return text + "<i><p>No response from requester</p></i>"
        if not self.public:
            return text + "<p>Requester hasn't answered yet.</p>"
        # Unanswered question on an open request, public view: offer the answer form.
        return text + self._ANSWER_FORM % (self.response.id, self.request.id)

    def get_icon(self):
        """ Return the icon CSS classes for this item. """
        return self.icon

    def set_icon(self, icon):
        """ Override the icon CSS classes for this item. """
        self.icon = icon

    def date(self):
        """ Return the creation date of the wrapped QA or note. """
        return self.response.date_created
ResponsePresenter.py
"""
    public_records_portal.ResponsePresenter
    ~~~~~~~~~~~~~~~~

    Returns the html needed for the 'Response' portion of the case page.

"""

from models import Record, Note
import scribd_helpers


class ResponsePresenter:
    """ Presents one record or one staff note of a request as an HTML snippet. """

    # Notes containing this marker are presented as extensions rather than plain notes.
    _EXTENSION_PREFIX = "Request extended:"

    # Icon CSS classes for each response type.
    _ICONS = {
        "offline": "icon-file-alt icon-large",
        "note": "icon-edit icon-large",
        "link": "icon-link icon-large",
        "document": "icon-file-alt icon-large",
        "extension": "icon-calendar icon-large",
    }

    def __init__(self, record = None, note = None):
        """ Wrap a record or a note and work out its type, delete URL and icon.

        A record is "offline" when it has `access` set, "document" when it has
        a `doc_id`, and "link" otherwise. A note is an "extension" when its
        text contains "Request extended:", and a plain "note" otherwise. If
        both arguments are passed, the note is applied last.
        """
        if record:
            self.response = record
            self.update_url = "update_a_record_delete"
            if self.response.access:
                self.type = "offline"
            elif self.response.doc_id:
                self.type = "document"
            else:
                self.type = "link"
        if note:
            self.response = note
            self.update_url = "update_a_note_delete"
            self.type = "note"
            if self._EXTENSION_PREFIX in self.response.text:
                self.type = "extension"
        self.icon = self._ICONS[self.type]

    def get_update_url(self):
        """ Return the name of the endpoint used to delete this response. """
        return self.update_url

    def get_id(self):
        """ Return the ID of the wrapped record or note. """
        return self.response.id

    def uid(self):
        """ Return the ID of the user who added this response. """
        return self.response.user_id

    def _staff_attribute(self, attribute):
        """ Look up one attribute of the user who added this response. """
        # get_attribute is not imported at module level, so the staff_* methods
        # used to fail with a NameError. It is imported here, at call time, so
        # that importing this module behaves exactly as before.
        # NOTE(review): assumes get_attribute lives in db_helpers -- confirm.
        from db_helpers import get_attribute
        # The user is referenced through `user_id` (see uid()); the previous
        # code read a `uid` attribute off the record/note.
        return get_attribute(attribute = attribute, obj_id = self.response.user_id, obj_type = "User")

    def staff_name(self):
        """ Return the alias of the user who added this response. """
        return self._staff_attribute("alias")

    def staff_dept(self):
        """ Return the department of the user who added this response. """
        return self._staff_attribute("department")

    def staff_phone(self):
        """ Return the phone number of the user who added this response. """
        return self._staff_attribute("phone")

    def staff_email(self):
        """ Return the e-mail address of the user who added this response. """
        return self._staff_attribute("email")

    def display_text(self):
        """ Return the HTML for the wrapped response (None for an unknown type). """
        if self.type == "offline":
            return "Name of Record: %s<br> How to Access Record: %s" % (self.response.description, self.response.access)
        elif self.type == "document":
            download_url = self.response.download_url
            if not download_url:
                # No stored URL yet: ask Scribd for one, passing the record ID along.
                download_url = scribd_helpers.get_scribd_download_url(doc_id = self.response.doc_id, record_id = self.response.id)
            if not download_url:
                download_url = "This document is still being uploaded, but it will be available shortly."
            return (
                " <a href='%(download_url)s' rel='tooltip' data-toggle='tooltip' data-placement='top'"
                " data-original-title='%(download_url)s' target='_blank'><b>%(description)s </b></a>"
                " <a href = '%(scribd_url)s' rel='tooltip' data-toggle='tooltip' data-placement='top'"
                " data-original-title='View document on Scribd hosting service' target='_blank'>"
                "<small><i class='icon-external-link'> </i></small></a> "
            ) % {"download_url": download_url, "description": self.response.description, "scribd_url": self.response.url}
        elif self.type == "note":
            return self.response.text
        elif self.type == "link":
            return "<a href='%s' rel='tooltip' data-toggle='tooltip' data-placement='top' data-original-title='%s'>%s </a>" % (self.response.url, self.response.url, self.response.description)
        elif self.type == "extension":
            text = self.response.text
            # Remove the marker only as a leading prefix. str.strip(chars)
            # treats its argument as a set of characters and would also eat
            # the beginning of the reason text (e.g. "extra" -> "ra").
            if text.startswith(self._EXTENSION_PREFIX):
                text = text[len(self._EXTENSION_PREFIX):]
            return text

    def get_icon(self):
        """ Return the icon CSS classes for this response. """
        return self.icon

    def set_icon(self, icon):
        """ Override the icon CSS classes for this response. """
        self.icon = icon

    def date(self):
        """ Return the creation date of the wrapped record or note. """
        return self.response.date_created
scribd_helpers.py
"""
    public_records_portal.scribd_helpers
    ~~~~~~~~~~~~~~~~

    Implements functions to interact with Scribd API for RecordTrac

"""

import scribd
from public_records_portal import app, models
from timeout import timeout
from werkzeug import secure_filename
import tempfile


def should_upload():
    """ Return True when documents should really be sent to Scribd.

    That is the case whenever ENVIRONMENT is not 'LOCAL', or when the
    UPLOAD_DOCS config variable is present.
    """
    return app.config['ENVIRONMENT'] != 'LOCAL' or 'UPLOAD_DOCS' in app.config


# These are the extensions that can be uploaded to Scribd.com:
ALLOWED_EXTENSIONS = ['txt', 'pdf', 'doc', 'ps', 'rtf', 'epub', 'key', 'odt', 'odp', 'ods', 'odg', 'odf', 'sxw', 'sxc', 'sxi', 'sxd', 'ppt', 'pps', 'xls', 'zip', 'docx', 'pptx', 'ppsx', 'xlsx', 'tif', 'tiff']


def progress(bytes_sent, bytes_total):
    """ Upload progress callback: log how much of the document has been sent. """
    # Guard against a zero total so logging can never abort the upload.
    percent = bytes_sent*100/bytes_total if bytes_total else 0
    app.logger.info("Scribd upload in progress: %s of %s (%s%%)" % (bytes_sent, bytes_total, percent))


def _scribd_description(request_id):
    """ Build the description attached to Scribd documents, linking back to the request page. """
    link_back = app.config['APPLICATION_URL'] + 'request/' + str(request_id)
    return "This document was uploaded via RecordTrac in response to a public records request for the %s. You can view the original request here: %s" % (app.config['AGENCY_NAME'], link_back)


def upload(document, filename, API_KEY, API_SECRET, description):
    """ Upload `document` to Scribd under `filename` and return the new document's ID.

    If Scribd answers with an error, it is logged and the error message
    (`err.strerror`) is returned in place of an ID.
    """
    # Configure the Scribd API.
    scribd.config(API_KEY, API_SECRET)
    try:
        # Upload the document from a file.
        doc = scribd.api_user.upload(
            targetfile = document,
            name = filename,
            progress_callback = progress,
            req_buffer = tempfile.TemporaryFile()
        )
        doc.description = description
        doc.save()
        return doc.id
    except scribd.ResponseError as err:
        app.logger.info('Scribd failed: code=%d, error=%s' % (err.errno, err.strerror))
        return err.strerror


def get_scribd_download_url(doc_id, record_id = None):
    """ Return the Scribd download URL for `doc_id`, or None if it cannot be fetched.

    When `record_id` is given, the URL is also stored on that record. A
    failure to store it is logged but does not discard the URL.
    """
    if not should_upload():
        return None
    API_KEY = app.config['SCRIBD_API_KEY']
    API_SECRET = app.config['SCRIBD_API_SECRET']
    try:
        scribd.config(API_KEY, API_SECRET)
        doc_url = scribd.api_user.get(doc_id).get_download_url()
    except Exception:
        # Best effort: any failure here simply means "no URL available".
        app.logger.info("\n\nUnable to get a Scribd download URL for document %s" % doc_id)
        return None
    if record_id:
        # Kept outside the lookup's try block: previously an error while
        # storing the URL was swallowed and the fetched URL was lost.
        try:
            set_scribd_download_url(download_url = doc_url, record_id = record_id)
        except Exception:
            app.logger.info("\n\nUnable to store the Scribd download URL for record %s" % record_id)
    return doc_url


def set_scribd_download_url(download_url, record_id):
    """ Store `download_url` on the record with ID `record_id`. """
    # update_obj is not imported at module level, so this used to raise a
    # NameError. Imported at call time to leave module import unchanged.
    # NOTE(review): assumes update_obj lives in db_helpers -- confirm.
    from db_helpers import update_obj
    update_obj('download_url', download_url, obj_type = 'Record', obj_id = record_id)


def scribd_batch_download():
    """ Download every record that has a download URL into the saved_records/ directory. """
    # urllib is not imported at module level.
    import urllib
    # Request.query.all() returns a list, so walk each request's records
    # (the previous code read `.records` off the list itself).
    for req in models.Request.query.all():
        for record in req.records:
            if record.download_url:
                urllib.urlretrieve(record.download_url, "saved_records/%s" % (record.filename))


def make_public(doc_id, API_KEY, API_SECRET):
    """ Set the Scribd document's access to 'public'. """
    scribd.config(API_KEY, API_SECRET)
    doc = scribd.api_user.get(doc_id)
    doc.access = 'public'
    doc.save()


def make_private(doc_id, API_KEY, API_SECRET):
    """ Set the Scribd document's access to 'private'. """
    scribd.config(API_KEY, API_SECRET)
    doc = scribd.api_user.get(doc_id)
    doc.access = 'private'
    doc.save()


def update_descriptions(API_KEY, API_SECRET):
    """ Rewrite the description of every Scribd document that matches a record so it links back to its request. """
    scribd.config(API_KEY, API_SECRET)
    for doc in scribd.api_user.all():
        record = models.Record.query.filter_by(doc_id = doc.id).first()
        if record:
            description = _scribd_description(record.request_id)
            doc.description = description
            doc.save()
            app.logger.info("\n\nUpdated Scribd document %s's description to %s" % (doc.id, description))


@timeout(seconds=20)
def upload_file(document, request_id):
    """ Upload `document` to Scribd and return a `(doc_id, filename)` pair.

    Returns `('1', None)` when real uploads are disabled, `(False, extension)`
    when the file type is not allowed, and `(None, None)` when no document
    was passed in.
    """
    # Uploads file to scribd.com and returns doc ID. File can be accessed at scribd.com/doc/id
    if not should_upload():
        return '1', None # Don't need to do real uploads locally
    if document:
        allowed = allowed_file(document.filename)
        if allowed[0]:
            filename = secure_filename(document.filename)
            doc_id = upload(document = document, filename = filename, API_KEY = app.config['SCRIBD_API_KEY'], API_SECRET = app.config['SCRIBD_API_SECRET'], description = _scribd_description(request_id))
            return doc_id, filename
        else:
            return allowed # Returns false and extension
    return None, None


### @export "allowed_file"
def allowed_file(filename):
    """ Return `(is_allowed, extension)` for `filename`.

    The extension is compared case-insensitively against ALLOWED_EXTENSIONS.
    A missing filename or one without a dot yields `(False, '')` instead of
    raising an IndexError.
    """
    if not filename or '.' not in filename:
        return False, ''
    ext = filename.rsplit('.', 1)[1]
    return ext.lower() in ALLOWED_EXTENSIONS, ext
spam.py
"""
    public_records_portal.spam
    ~~~~~~~~~~~~~~~~

    Implements spam filters used on RecordTrac's forms that don't require login. Akismet is a dependency (https://akismet.com) and the following environment variables need to be set in order for this to work: AKISMET_KEY, APPLICATION_URL

"""

from public_records_portal import app
import akismet
import logging
from flask.ext.login import current_user


def check_for_spam():
    """ Decide whether the spam filter should run for the current visitor. """
    # Spam filter is currently implemented to prevent bot spamming, so if
    # someone is logged in they have already verified they are human
    if current_user.is_authenticated():
        return False
    # This only needs to work in production, unless a local config variable
    # is set to indicate otherwise
    return app.config['ENVIRONMENT'] == 'PRODUCTION' or 'CHECK_FOR_SPAM' in app.config


def is_spam(comment, user_ip, user_agent):
    """ Return True when `comment` is flagged by Akismet or contains 'http'.

    Returns False without checking when the filter is switched off (see
    check_for_spam) or when the Akismet key cannot be verified.
    """
    if not check_for_spam():
        return False

    app.logger.info("\n\nAttempting to check for spam...")
    key = app.config['AKISMET_KEY']
    blog = app.config['APPLICATION_URL']
    if not is_working_akismet_key(key = key, blog = blog):
        app.logger.info("\n\nThere was a problem verifying the supplied AKISMET_KEY. Unable to check for spam.")
        return False

    # Unicode comments are encoded to UTF-8 bytes before being checked.
    if isinstance(comment, unicode):
        comment = comment.encode('utf8', 'ignore')

    flagged = akismet.comment_check(key = key, blog = blog, user_ip = user_ip, user_agent = user_agent, comment_content = comment) or 'http' in comment
    if flagged:
        app.logger.info("Spam detected: %s" % comment )
        return True
    return False


def is_working_akismet_key(key, blog):
    """ Return the result of verifying the configured Akismet key against the application URL. """
    # NOTE(review): both arguments are overwritten with the configured values,
    # so whatever the caller passes in is ignored -- confirm this is intended.
    key = app.config['AKISMET_KEY']
    blog = app.config['APPLICATION_URL']
    return akismet.verify_key(key=key, blog=blog)
views.py
"""
    public_records_portal.views
    ~~~~~~~~~~~~~~~~

    Implements functions that render the Jinja (http://jinja.pocoo.org/) templates/html for RecordTrac.

"""

from flask import render_template, request, redirect, url_for, jsonify
from flask.ext.login import LoginManager, login_user, logout_user, current_user, login_required
from flaskext.browserid import BrowserID
from public_records_portal import app, db, models
from prr import add_resource, update_resource, make_request, close_request
from db_helpers import get_user_by_id # finds a user by their id
from db_helpers import get_user # finds a user based on BrowserID response
import os, json
from urlparse import urlparse, urljoin
from notifications import send_prr_email, format_date
from spam import is_spam, is_working_akismet_key
from requests import get
from time import time
from flask.ext.cache import Cache
from recaptcha.client import captcha
from timeout import timeout
from flask import jsonify, request, Response
import anyjson
import helpers
import csv_export
from datetime import datetime, timedelta
from filters import *
import re
from db_helpers import get_count, get_obj
from sqlalchemy import func, not_, and_, or_
import pytz

# Initialize login
login_manager = LoginManager()
login_manager.user_loader(get_user_by_id)
login_manager.init_app(app)

browser_id = BrowserID()
browser_id.user_loader(get_user)
browser_id.init_app(app)


# Submitting a new request
@app.route("/new", methods=["GET", "POST"])
def new_request(passed_recaptcha = False, data = None):
    """ Show the new-request form, or process a submitted one.

    A submission is processed when `data` is passed in or the HTTP method is
    POST; the form values are read from `data`, falling back to a copy of
    `request.form`. An empty request text, a missing e-mail address (unless
    'ignore_email' was submitted), text flagged as spam, or an unparsable
    date each render their own page instead of creating the request.
    On success the visitor is redirected to the newly created request.
    """
    if data or request.method == 'POST':
        if not data and not passed_recaptcha:
            data = request.form.copy()
        email = data['request_email']
        request_text = data['request_text']
        if request_text == "":
            return render_template('error.html', message = "You cannot submit an empty request.")
        if email == "" and 'ignore_email' not in data and not passed_recaptcha:
            return render_template('missing_email.html', form = data)
        if not passed_recaptcha and (is_spam(comment = request_text, user_ip = request.remote_addr, user_agent = request.headers.get('User-Agent'))):
            return render_template('recaptcha_request.html', form = data, message = "Hmm, your request looks like spam. To submit your request, type the numbers or letters you see in the field below.")

        # Optional fields are None when they were not submitted.
        department = data.get('request_department')
        alias = data.get('request_alias')
        phone = data.get('request_phone')
        offline_submission_type = data.get('format_received')
        date_received = None
        if 'date_received' in data: # From the jQuery datepicker
            date_received = data['date_received']
            if date_received != "":
                try:
                    date_received = datetime.strptime(date_received, '%m/%d/%Y')
                    tz = pytz.timezone(app.config['TIMEZONE'])
                    offset = tz.utcoffset(datetime.now())
                    offset = (offset.days * 86400 + offset.seconds) / 3600
                    date_received = date_received - timedelta(hours = offset) # This is somewhat of a hack, but we need to get this back in UTC time but still treat it as a 'naive' datetime object
                except ValueError:
                    return render_template('error.html', message = "Please use the datepicker to select a date.")

        # The spam check above has already run, so the request is marked as having passed it.
        request_id, is_new = make_request(text = request_text, email = email, alias = alias, phone = phone, passed_spam_filter = True, department = department, offline_submission_type = offline_submission_type, date_received = date_received)
        if is_new:
            return redirect(url_for('show_request_for_x', request_id = request_id, audience = 'new'))
        if not request_id:
            return render_template('error.html', message = "Your request looks a lot like spam.")
        app.logger.info("\n\nDuplicate request entered: %s" % request_text)
        return render_template('error.html', message = "Your request is the same as /request/%s" % request_id)
    else:
        departments = None
        routing_available = False
        if 'LIAISONS_URL' in app.config:
            routing_available = True
            departments = db.session.query(models.Department).all()
        # Logged-in users get the offline-submission form, everyone else the public one.
        if current_user.is_authenticated():
            return render_template('offline_request.html', routing_available = routing_available, departments = departments)
        else:
            return render_template('new_request.html', routing_available = routing_available, departments = departments)


@app.route("/export")
@login_required
def to_csv():
    """ Return the CSV export as a text/csv response (login required). """
    return Response(csv_export.export(), mimetype='text/csv')


@app.route("/", methods = ["GET", "POST"])
def index():
    """ Send logged-in users to the request list and show everyone else the landing page. """
    if not current_user.is_anonymous():
        return redirect(url_for('display_all_requests'))
    else:
        return landing()


@app.route("/landing")
def landing():
    """ Render the landing page. """
    return render_template('landing.html')


@login_manager.unauthorized_handler
def unauthorized():
    """ Page rendered when a login-protected view is requested without being logged in. """
    return render_template("alpha.html")


@app.errorhandler(404)
def page_not_found(e):
    """ Render the 404 page. """
    return render_template('404.html'), 404


def explain_all_actions():
    """ Render a "<name>: <What>" line for every entry in static/json/actions.json. """
    actions_path = os.path.join(app.root_path, 'static/json/actions.json')
    # Use a context manager so the file is closed again; it used to be left open.
    with open(actions_path) as action_json:
        json_data = json.load(action_json)
    actions = ["%s: %s" % (data, json_data[data]["What"]) for data in json_data]
    return render_template('actions.html', actions = actions)


# Returns a view of the case based on the audience. Currently views exist for city staff or general public.
@app.route("/<string:audience>/request/<int:request_id>")
def show_request_for_x(audience, request_id):
    """ Render a request for the given audience.

    Any audience containing "city" is delegated to the staff view; every other
    audience is rendered with the template "manage_request_<audience>.html".
    """
    if "city" in audience:
        return show_request_for_city(request_id = request_id)
    return show_request(request_id = request_id, template = "manage_request_%s.html" %(audience))
# NOTE(review): this attribute is assigned after @app.route has already run, so it
# presumably does not change the methods accepted by the route above -- confirm.
show_request_for_x.methods = ['GET', 'POST']


@app.route("/city/request/<int:request_id>")
@login_required
def show_request_for_city(request_id):
    """ Render the staff view of a request, picking the template by browser support. """
    if is_supported_browser():
        return show_request(request_id = request_id, template = "manage_request_city.html")
    return show_request(request_id = request_id, template = "manage_request_city_less_js.html")


@app.route("/response/<int:request_id>")
def show_response(request_id):
    """ Render the response page for a request, or an error page if it does not exist. """
    req = get_obj("Request", request_id)
    if not req:
        return render_template('error.html', message = "A request with ID %s does not exist." % request_id)
    return render_template("response.html", req = req)


@app.route("/track", methods = ["POST"])
def track(request_id = None):
    """ Redirect to the request named in the posted form ('request_id').

    Logged-in users are sent to the 'city' audience, everyone else to 'public'.
    """
    if request.method == 'POST':
        if not request_id:
            request_id = request.form['request_id']
        if not current_user.is_anonymous():
            audience = 'city'
        else:
            audience = 'public'
        return redirect(url_for('show_request_for_x', audience= audience, request_id = request_id))
    else:
        # The route above only registers POST, so this branch is only reachable
        # when the function is called directly.
        return render_template("track.html")


@app.route("/unfollow/<int:request_id>/<string:email>")
def unfollow(request_id, email):
    """ Turn off notifications for the subscriber identified by e-mail on a request. """
    success = False
    user_id = create_or_return_user(email.lower())
    subscriber = get_subscriber(request_id = request_id, user_id = user_id)
    if subscriber:
        success = update_obj(attribute = "should_notify", val = False, obj = subscriber)
    if success:
        return show_request(request_id = request_id, template = "manage_request_unfollow.html")
    return render_template('error.html', message = "Unfollowing this request was unsuccessful. You probably weren't following it to begin with.")


@app.route("/request/<int:request_id>")
def show_request(request_id, template = "manage_request_public.html"):
    """ Render a request with the given template.

    A request whose status contains "Closed" is always rendered with
    "closed.html", unless the feedback template was asked for.
    """
    req = get_obj("Request", request_id)
    if not req:
        return render_template('error.html', message = "A request with ID %s does not exist." % request_id)
    if req.status and "Closed" in req.status and template != "manage_request_feedback.html":
        template = "closed.html"
    return render_template(template, req = req)


@app.route("/api/staff")
def staff_to_json():
    """ Return the alias and e-mail of every staff user as JSON under 'objects'. """
    users = models.User.query.filter(models.User.is_staff == True).all()
    staff_data = [{'alias': u.alias, 'email': u.email} for u in users]
    return jsonify(**{'objects': staff_data})


@app.route("/api/departments")
def departments_to_json():
    """ Return the name of every department as JSON under 'objects'. """
    departments = models.Department.query.all()
    department_data = [{'department': d.name} for d in departments]
    return jsonify(**{'objects': department_data})


def docs():
    """ Redirect to the hosted documentation. """
    return redirect('http://codeforamerica.github.io/public-records/docs/1.0.0')


@app.route("/edit/request/<int:request_id>")
@login_required
def edit_case(request_id):
    """ Render the edit page for a request. """
    req = get_obj("Request", request_id)
    return render_template("edit_case.html", req = req)


@app.route("/add_a_<string:resource>", methods = ["GET", "POST"])
@login_required
def add_a_resource(resource):
    """ Add a resource to a request on behalf of the logged-in user.

    add_resource() is expected to return an integer-like ID on success, False on
    failure, or any other value, which is shown to the user as an upload message.
    """
    if request.method == 'POST':
        resource_id = add_resource(resource = resource, request_body = request.form, current_user_id = get_user_id())
        # type() rather than isinstance() on purpose: False must not count as an int here.
        if type(resource_id) == int or str(resource_id).isdigit():
            app.logger.info("\n\nSuccessfully added resource: %s with id: %s" % (resource, resource_id))
            return redirect(url_for('show_request_for_city', request_id = request.form['request_id']))
        elif resource_id == False:
            app.logger.info("\n\nThere was an issue with adding resource: %s" % resource)
            return render_template('error.html')
        else:
            app.logger.info("\n\nThere was an issue with the upload: %s" % resource_id)
            return render_template('help_with_uploads.html', message = resource_id)
    return render_template('error.html', message = "You can only update requests from a request page!")


@app.route("/public_add_a_<string:resource>", methods = ["GET", "POST"])
def public_add_a_resource(resource, passed_recaptcha = False, data = None):
    """ Add a note or subscriber to a request on behalf of a member of the public.

    Notes go through the spam filter unless passed_recaptcha is set; a note
    flagged as spam gets the recaptcha page instead. `data` lets the recaptcha
    view replay the original form.
    """
    if (data or request.method == 'POST') and ('note' in resource or 'subscriber' in resource):
        if not data:
            data = request.form.copy()
        if 'note' in resource:
            if not passed_recaptcha and is_spam(comment = data['note_text'], user_ip = request.remote_addr, user_agent = request.headers.get('User-Agent')):
                return render_template('recaptcha_note.html', form = data, message = "Hmm, your note looks like spam. To submit your note, type the numbers or letters you see in the field below.")
            resource_id = prr.add_note(request_id = data['request_id'], text = data['note_text'], passed_spam_filter = True)
        else:
            resource_id = prr.add_resource(resource = resource, request_body = data, current_user_id = None)
        if type(resource_id) == int:
            request_id = data['request_id']
            audience = 'public'
            if 'subscriber' in resource:
                audience = 'follower'
            return redirect(url_for('show_request_for_x', audience=audience, request_id = request_id))
    return render_template('error.html')


@app.route("/update_a_<string:resource>", methods = ["GET", "POST"])
def update_a_resource(resource, passed_recaptcha = False, data = None):
    """ Update a resource of a request.

    For 'qa' resources the answer goes through the spam filter unless
    passed_recaptcha is set; all other resources are handed to update_resource().
    """
    if (data or request.method == 'POST'):
        if not data:
            data = request.form.copy()
        if 'qa' in resource:
            if not passed_recaptcha and is_spam(comment = data['answer_text'], user_ip = request.remote_addr, user_agent = request.headers.get('User-Agent')):
                return render_template('recaptcha_answer.html', form = data, message = "Hmm, your answer looks like spam. To submit your answer, type the numbers or letters you see in the field below.")
            prr.answer_a_question(qa_id = int(data['qa_id']), answer = data['answer_text'], passed_spam_filter = True)
        else:
            update_resource(resource, data)
        if current_user.is_anonymous() == False:
            return redirect(url_for('show_request_for_city', request_id = request.form['request_id']))
        else:
            return redirect(url_for('show_request', request_id = request.form['request_id']))
    return render_template('error.html', message = "You can only update requests from a request page!")


# Closing is specific to a case, so this only gets called from a case (that only city staff have a view of)
@app.route("/close", methods = ["GET", "POST"])
@login_required
def close(request_id = None):
    """ Close the request named in the posted form, with the reason(s) given. """
    if request.method == 'POST':
        template = 'closed.html'
        request_id = request.form['request_id']
        reason = ""
        if 'close_reason' in request.form:
            reason = request.form['close_reason']
        elif 'close_reasons' in request.form:
            # Each reason is followed by a space, including the last one.
            reason = "".join(close_reason + " " for close_reason in request.form.getlist('close_reasons'))
        close_request(request_id = request_id, reason = reason, user_id = get_user_id())
        return show_request(request_id, template= template)
    return render_template('error.html', message = "You can only close from a requests page!")


def filter_department(departments_selected, results):
    """ Restrict a Request query to the named departments.

    Nothing is filtered when no departments are given or 'All departments' is
    among them. If none of the names match a department, the query is made empty.
    """
    if departments_selected and 'All departments' not in departments_selected:
        app.logger.info("\n\nDepartment filters:%s." % departments_selected)
        department_ids = []
        for department_name in departments_selected:
            if department_name:
                department = models.Department.query.filter_by(name = department_name).first()
                if department:
                    department_ids.append(department.id)
        if department_ids:
            results = results.filter(models.Request.department_id.in_(department_ids))
        else:
            # Just return an empty query set
            results = results.filter(models.Request.department_id < 0)
    return results


def filter_search_term(search_input, results):
    """ Restrict a Request query to requests whose text matches every search term.

    Terms are AND-ed together and the last one is a prefix match (":*"). The
    tsquery string is sent as a bound parameter, never interpolated into SQL,
    because it comes straight from the user.
    """
    if search_input:
        app.logger.info("Searching for '%s'." % search_input)
        # split() without an argument drops leading, trailing and repeated
        # whitespace, so no empty term can end up in the tsquery.
        search_terms = search_input.split()
        if search_terms:
            search_query = ' & '.join(search_terms) + ":*" # Catch substrings
            results = results.filter(func.to_tsvector(models.Request.text).op('@@')(func.to_tsquery(search_query)))
    return results


def get_filter_value(filters_map, filter_name, is_list = False, is_boolean = False):
    """ Read one filter from a request multidict.

    Returns None when the filter is absent. A non-empty 'department' value is
    wrapped in a list; is_list returns every value for the key; is_boolean
    returns the value lower-cased.
    """
    if filter_name in filters_map:
        val = filters_map[filter_name]
        if filter_name == 'department' and val:
            return [val]
        elif is_list:
            return filters_map.getlist(filter_name)
        elif is_boolean:
            return str(val.lower())
        else:
            return val
    return None


def is_supported_browser():
    """ Report whether the requesting browser should get the JavaScript-heavy pages.

    NOTE(review): both exits return False, so every browser is currently treated
    as unsupported and the browser checks below have no effect. This looks like a
    deliberate switch-off of the Backbone pages; if it is not, the final return
    should be True -- confirm before changing.
    """
    browser = request.user_agent.browser
    version = request.user_agent.version and int(request.user_agent.version.split('.')[0])
    platform = request.user_agent.platform
    uas = request.user_agent.string
    if browser and version:
        if (browser == 'msie' and version < 9) \
        or (browser == 'firefox' and version < 4) \
        or (platform == 'android' and browser == 'safari' and version < 534) \
        or (platform == 'iphone' and browser == 'safari' and version < 7000) \
        or ((platform == 'macos' or platform == 'windows') and browser == 'safari' and not re.search('Mobile', uas) and version < 534) \
        or (re.search('iPad', uas) and browser == 'safari' and version < 7000) \
        or (platform == 'windows' and re.search('Windows Phone OS', uas)) \
        or (browser == 'opera') \
        or (re.search('BlackBerry', uas)):
            return False
    return False


@app.route("/view_requests")
def display_all_requests(methods = ["GET"]):
    """ Dynamically load requests page depending on browser. """
    if is_supported_browser():
        return backbone_requests()
    else:
        return no_backbone_requests()


@app.route("/view_requests_backbone")
def backbone_requests():
    """ Render the JavaScript-driven list of all requests. """
    return render_template("all_requests.html", departments = db.session.query(models.Department).all(), total_requests_count = get_count("Request"))


@app.route("/view_requests_no_backbone")
def no_backbone_requests():
    """ Render the server-side list of all requests. """
    return fetch_requests()


@app.route("/requests", methods = ["GET"])
def fetch_requests(output_results_only = False, filters_map = None, date_format = '%Y-%m-%d', checkbox_value = 'on'):
    """ Filter, sort and paginate requests.

    Filters come from `filters_map`, or from the query string / form when it is
    empty. Pages hold 15 requests. With output_results_only the function returns
    (requests, num_results, more_results, start_index, end_index), where the two
    indices are the 1-based positions of the first and last request on the page;
    otherwise it renders "all_requests_less_js.html".
    """
    user_id = get_user_id()

    if not filters_map:
        if request.args:
            if is_supported_browser():
                return backbone_requests()
            else: # Clear URL
                filters_map = request.args
        else:
            filters_map = request.form

    # Set defaults
    is_open = checkbox_value
    is_closed = None
    due_soon = checkbox_value
    overdue = checkbox_value
    mine_as_poc = checkbox_value
    mine_as_helper = checkbox_value
    departments_selected = []
    sort_column = "id"
    sort_direction = "asc"
    min_due_date = None
    max_due_date = None
    min_date_received = None
    max_date_received = None
    requester_name = None
    page_number = 1
    search_term = None

    if filters_map:
        departments_selected = get_filter_value(filters_map = filters_map, filter_name = 'departments_selected', is_list = True) or get_filter_value(filters_map, 'department')
        is_open = get_filter_value(filters_map = filters_map, filter_name = 'is_open', is_boolean = True)
        is_closed = get_filter_value(filters_map = filters_map, filter_name = 'is_closed', is_boolean = True)
        due_soon = get_filter_value(filters_map = filters_map, filter_name = 'due_soon', is_boolean = True)
        overdue = get_filter_value(filters_map = filters_map, filter_name = 'overdue', is_boolean = True)
        mine_as_poc = get_filter_value(filters_map = filters_map, filter_name = 'mine_as_poc', is_boolean = True)
        mine_as_helper = get_filter_value(filters_map = filters_map, filter_name = 'mine_as_helper', is_boolean = True)
        sort_column = get_filter_value(filters_map, 'sort_column') or 'id'
        sort_direction = get_filter_value(filters_map, 'sort_direction') or 'asc'
        search_term = get_filter_value(filters_map, 'search_term')
        min_due_date = get_filter_value(filters_map, 'min_due_date')
        max_due_date = get_filter_value(filters_map, 'max_due_date')
        min_date_received = get_filter_value(filters_map, 'min_date_received')
        max_date_received = get_filter_value(filters_map, 'max_date_received')
        requester_name = get_filter_value(filters_map, 'requester_name')
        # page_number is user input: fall back to the first page rather than
        # crash on junk, and never allow a page below 1 (negative OFFSET).
        try:
            page_number = max(1, int(get_filter_value(filters_map, 'page_number') or '1'))
        except (TypeError, ValueError):
            page_number = 1

    results = get_results_by_filters(departments_selected = departments_selected, is_open = is_open, is_closed = is_closed, due_soon = due_soon, overdue = overdue, mine_as_poc = mine_as_poc, mine_as_helper = mine_as_helper, sort_column = sort_column, sort_direction = sort_direction, search_term = search_term, min_due_date = min_due_date, max_due_date = max_due_date, min_date_received = min_date_received, max_date_received = max_date_received, requester_name = requester_name, page_number = page_number, user_id = user_id, date_format = date_format, checkbox_value = checkbox_value)

    # Execute query
    limit = 15
    offset = limit * (page_number - 1)
    app.logger.info("Page Number: {0}, Limit: {1}, Offset: {2}".format(page_number, limit, offset))
    more_results = False
    num_results = results.count()
    start_index = 0
    end_index = 0

    if num_results != 0:
        # 1-based position of the first row on this page (1, 16, 31, ...).
        start_index = offset + 1
        if num_results > (limit * page_number):
            more_results = True
            end_index = offset + limit
        else:
            end_index = num_results

    results = results.limit(limit).offset(offset).all()
    requests = prepare_request_fields(results = results)
    if output_results_only == True:
        return requests, num_results, more_results, start_index, end_index

    return render_template("all_requests_less_js.html", total_requests_count = get_count("Request"), requests = requests, departments = db.session.query(models.Department).all(), departments_selected = departments_selected, is_open = is_open, is_closed = is_closed, due_soon = due_soon, overdue = overdue, mine_as_poc = mine_as_poc, mine_as_helper = mine_as_helper, sort_column = sort_column, sort_direction = sort_direction, search_term = search_term, min_due_date = min_due_date, max_due_date = max_due_date, min_date_received = min_date_received, max_date_received = max_date_received, requester_name = requester_name, page_number = page_number, more_results = more_results, num_results = num_results, start_index = start_index, end_index = end_index)


@app.route("/custom/request", methods = ["GET", "POST"])
def json_requests():
    """
    Ultra-custom API endpoint for serving up requests.
    Supports limit, search, and page parameters and returns json with an object that
    has a list of results in the 'objects' field.
    """
    objects, num_results, more_results, start_index, end_index = fetch_requests(output_results_only = True, filters_map = request.args, date_format = '%m/%d/%Y', checkbox_value = 'true')
    matches = {
        "objects": objects,
        "num_results": num_results,
        "more_results": more_results,
        "start_index": start_index,
        "end_index": end_index
    }
    response = anyjson.serialize(matches)
    return Response(response, mimetype = "application/json")


def prepare_request_fields(results):
    """ Turn Request rows into a list of plain dicts for templates and JSON.

    The requester name and due date are only included for logged-in users.
    """
    include_staff_fields = not current_user.is_anonymous()
    prepared = []
    for r in results:
        fields = {
            "id": r.id,
            "text": helpers.clean_text(r.text),
            "date_received": helpers.date(r.date_received or r.date_created),
            "department": r.department_name(),
            "status": r.status,
            # The following two attributes are defined as model methods,
            # and not regular SQLAlchemy attributes.
            "contact_name": r.point_person_name(),
            "solid_status": r.solid_status()
        }
        if include_staff_fields:
            fields["requester"] = r.requester_name()
            fields["due_date"] = format_date(r.due_date)
        prepared.append(fields)
    return prepared


def get_results_by_filters(departments_selected, is_open, is_closed, due_soon, overdue, mine_as_poc, mine_as_helper, sort_column, sort_direction, search_term, min_due_date, max_due_date, min_date_received, max_date_received, requester_name, page_number, user_id, date_format, checkbox_value):
    """ Build the Request query for the given filters.

    A checkbox filter is active when its value equals `checkbox_value`. The
    due-date, ownership and requester-name filters apply only when `user_id` is
    set. Date bounds are parsed with `date_format`, and a bound pair that fails
    to parse is logged and ignored. Returns the un-executed query.
    """
    # Initialize query
    results = db.session.query(models.Request)

    # Set filters on the query
    results = filter_department(departments_selected = departments_selected, results = results)
    results = filter_search_term(search_input = search_term, results = results)

    # Accumulate status filters
    status_filters = []

    if is_open == checkbox_value:
        status_filters.append(models.Request.open)
        if not user_id:
            # Without a user ID, "open" also covers requests that are due soon or overdue.
            status_filters.append(models.Request.due_soon)
            status_filters.append(models.Request.overdue)

    if is_closed == checkbox_value:
        status_filters.append(models.Request.closed)

    if min_date_received and max_date_received:
        try:
            min_date_received = datetime.strptime(min_date_received, date_format)
            # Extend the upper bound to the end of that day.
            max_date_received = datetime.strptime(max_date_received, date_format) + timedelta(hours = 23, minutes = 59)
            results = results.filter(and_(models.Request.date_received >= min_date_received, models.Request.date_received <= max_date_received))
            app.logger.info('Request Date Bounding. Min: {0}, Max: {1}'.format(min_date_received, max_date_received))
        except (TypeError, ValueError):
            app.logger.info('There was an error parsing the request date filters. Received Min: {0}, Max {1}'.format(min_date_received, max_date_received))

    # Filters for agency staff only:
    if user_id:

        if due_soon == checkbox_value:
            status_filters.append(models.Request.due_soon)

        if overdue == checkbox_value:
            status_filters.append(models.Request.overdue)

        if min_due_date and max_due_date:
            try:
                min_due_date = datetime.strptime(min_due_date, date_format)
                max_due_date = datetime.strptime(max_due_date, date_format) + timedelta(hours = 23, minutes = 59)
                results = results.filter(and_(models.Request.due_date >= min_due_date, models.Request.due_date <= max_due_date))
                app.logger.info('Due Date Bounding. Min: {0}, Max: {1}'.format(min_due_date, max_due_date))
            except (TypeError, ValueError):
                app.logger.info('There was an error parsing the due date filters. Due Date Min: {0}, Max {1}'.format(min_due_date, max_due_date))

        # PoC and Helper filters
        if mine_as_poc == checkbox_value:
            if mine_as_helper == checkbox_value:
                # Where am I the Point of Contact *or* the Helper?
                results = results.filter(models.Request.id == models.Owner.request_id) \
                                 .filter(models.Owner.user_id == user_id) \
                                 .filter(models.Owner.active == True)
            else:
                # Where am I the Point of Contact only?
                results = results.filter(models.Request.id == models.Owner.request_id) \
                                 .filter(models.Owner.user_id == user_id) \
                                 .filter(models.Owner.is_point_person == True)
        elif mine_as_helper == checkbox_value:
            # Where am I a Helper only?
            results = results.filter(models.Request.id == models.Owner.request_id) \
                             .filter(models.Owner.user_id == user_id) \
                             .filter(models.Owner.active == True) \
                             .filter(models.Owner.is_point_person == False)

        # Filter based on requester name
        if requester_name:
            results = results.join(models.Subscriber, models.Request.subscribers).join(models.User).filter(func.lower(models.User.alias).like("%%%s%%" % requester_name.lower()))

    # Apply the set of status filters to the query.
    # Using 'or', they're non-exclusive!
    results = results.filter(or_(*status_filters))

    if sort_column:
        app.logger.info("Sort Direction: %s" % sort_direction)
        app.logger.info("Sort Column: %s" % sort_column)
        # sort_column is user input: skip it unless it names something sortable on Request.
        sort_attribute = getattr(models.Request, sort_column, None)
        if sort_attribute is not None and hasattr(sort_attribute, 'desc'):
            if sort_direction == "desc":
                results = results.order_by(sort_attribute.desc())
            else:
                results = results.order_by(sort_attribute.asc())
        else:
            app.logger.info("Ignoring unknown sort column: %s" % sort_column)

    return results.order_by(models.Request.id.desc())


@app.route("/<page>")
def any_page(page):
    """ Render the template named after the URL segment, or an error page if rendering fails. """
    try:
        return render_template('%s.html' %(page))
    except Exception:
        return render_template('error.html', message = "%s totally doesn't exist." %(page))


def tutorial():
    """ Log the access and render the tutorial page. """
    user_id = get_user_id()
    app.logger.info("\n\nTutorial accessed by user: %s." % user_id)
    return render_template('tutorial.html')


@app.route("/staff_card/<int:user_id>")
def staff_card(user_id):
    """ Render the staff card for a user ID. """
    return render_template('staff_card.html', uid = user_id)


@app.route("/logout")
@login_required
def logout():
    """ Log the current user out and show the index page. """
    logout_user()
    return index()


def get_user_id():
    """ Return the logged-in user's ID, or None for an anonymous visitor. """
    if current_user.is_authenticated():
        return current_user.id
    return None


# Used as AJAX POST endpoint to check if new request text contains certain keyword
# See new_requests.(html/js)
@app.route("/is_public_record", methods = ["POST"])
def is_public_record():
    """ Return the canned message for request text that mentions certificates or divorce.

    Returns an empty string when none of the keywords is present.
    """
    request_text = request.form['request_text']
    not_records_filepath = os.path.join(app.root_path, 'static/json/notcityrecords.json')
    # Close the file as soon as it is parsed instead of leaking the handle.
    with open(not_records_filepath) as not_records_json:
        json_data = json.load(not_records_json)
    request_text = request_text.lower()
    app.logger.info("Someone input %s" %(request_text))
    if "birth" in request_text or "death" in request_text or "marriage" in request_text:
        return json_data["Certificate"]
    if "divorce" in request_text:
        return json_data["Divorce"]
    return ''


def get_redirect_target():
    """ Taken from http://flask.pocoo.org/snippets/62/

    Returns the first of the 'next' parameter and the referrer that is a safe
    URL, or None when neither qualifies.
    """
    for target in request.values.get('next'), request.referrer:
        if not target:
            continue
        if is_safe_url(target):
            return target


def is_safe_url(target):
    """ Taken from http://flask.pocoo.org/snippets/62/

    True when the target, resolved against this host, is http(s) and stays on this host.
    """
    ref_url = urlparse(request.host_url)
    test_url = urlparse(urljoin(request.host_url, target))
    return test_url.scheme in ('http', 'https') and \
        ref_url.netloc == test_url.netloc


@app.route("/recaptcha_<string:templatetype>", methods = ["GET", "POST"])
def recaptcha_templatetype(templatetype):
    """ Check a submitted recaptcha and, when valid, replay the original submission.

    `templatetype` is 'note', 'answer' or 'request'. Any other value, or a
    non-POST access, gets the error page.
    """
    if request.method != 'POST':
        app.logger.info("\n\nAttempted access to recaptcha not via POST")
        return render_template('error.html', message = "You don't need to be here.")

    template = "recaptcha_" + templatetype + ".html"
    response = captcha.submit(
        request.form['recaptcha_challenge_field'],
        request.form['recaptcha_response_field'],
        app.config['RECAPTCHA_PRIVATE_KEY'],
        request.remote_addr
        )
    if not response.is_valid:
        message = "Invalid. Please try again."
        return render_template(template, message = message, form = request.form)

    if templatetype == "note":
        return public_add_a_resource(passed_recaptcha = True, data = request.form, resource = "note")
    elif templatetype == "answer":
        app.logger.info("Template type is answer!")
        return update_a_resource(passed_recaptcha = True, data = request.form, resource = "qa")
    elif templatetype == "request":
        return new_request(passed_recaptcha = True, data = request.form)

    # A view must not return None: an unknown template type gets the error page.
    app.logger.info("\n\nUnknown recaptcha template type: %s" % templatetype)
    return render_template('error.html', message = "You don't need to be here.")


@app.route("/.well-known/status", methods = ["GET"])
def well_known_status():
    """ Report service health as JSON.

    Checks the database, Akismet and Sendgrid in that order. The first failure
    is written to 'status' and returned immediately. On success
    'resources'['Sendgrid'] holds the percentage of the monthly e-mail limit
    used over the past 30 days.
    """
    response = {
        'status': 'ok',
        'updated': int(time()),
        'dependencies': ['Akismet', 'Scribd', 'Sendgrid', 'Postgres'],
        'resources': {}
        }

    #
    # Try to connect to the database and get the first user.
    #
    try:
        if not get_obj('User', 1):
            raise Exception('Failed to get the first user')
    except Exception as e:
        response['status'] = 'Database fail: %s' % e
        return jsonify(response)

    #
    # Try to connect to Akismet and see if the key is valid.
    #
    try:
        if not is_working_akismet_key():
            raise Exception('Akismet reported a non-working key')
    except Exception as e:
        response['status'] = 'Akismet fail: %s' % e
        return jsonify(response)

    #
    # Try to ask Sendgrid how many emails we have sent in the past month.
    #
    try:
        url = 'https://sendgrid.com/api/stats.get.json?api_user=%(MAIL_USERNAME)s&api_key=%(MAIL_PASSWORD)s&days=30' % app.config
        got = get(url)

        if got.status_code != 200:
            raise Exception('HTTP status %s from Sendgrid /api/stats.get' % got.status_code)

        mails = sum([m['delivered'] + m['repeat_bounces'] for m in got.json()])
        response['resources']['Sendgrid'] = 100 * float(mails) / int(app.config.get('SENDGRID_MONTHLY_LIMIT') or 40000)
    except Exception as e:
        response['status'] = 'Sendgrid fail: %s' % e
        return jsonify(response)

    return jsonify(response)
models.py
""" public_records_portal.models ~~~~~~~~~~~~~~~~ Defines RecordTrac's database schema, and implements helper functions. """ from flask.ext.sqlalchemy import SQLAlchemy, sqlalchemy from flask.ext.login import current_user from sqlalchemy import Table, Column, Integer, String, ForeignKey from sqlalchemy.orm import relationship, backref from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method from sqlalchemy import and_, or_ from datetime import datetime, timedelta from public_records_portal import db, app from werkzeug.security import generate_password_hash, check_password_hash import json import re from validate_email import validate_email ### @export "User" class User(db.Model): __tablename__ = 'user' id = db.Column(db.Integer, primary_key = True) alias = db.Column(db.String(100)) email = db.Column(db.String(100), unique=True) phone = db.Column(db.String()) date_created = db.Column(db.DateTime) password = db.Column(db.String(255)) department = db.Column(Integer, ForeignKey("department.id")) current_department = relationship("Department", foreign_keys = [department], uselist = False) contact_for = db.Column(db.String()) # comma separated list backup_for = db.Column(db.String()) # comma separated list owners = relationship("Owner") subscribers = relationship("Subscriber") is_staff = db.Column(db.Boolean, default = False) # Is this user an active agency member? 
def is_authenticated(self): return True def is_active(self): return True def is_anonymous(self): return False def get_id(self): return unicode(self.id) def get_alias(self): if self.alias and self.alias != "": return self.alias return "N/A" def get_phone(self): if self.phone and self.phone != "": return self.phone return "N/A" def __init__(self, email=None, alias = None, phone=None, department = None, contact_for=None, backup_for=None, is_staff = False): if email and validate_email(email): self.email = email self.alias = alias if phone and phone != "": self.phone = phone self.date_created = datetime.now().isoformat() if department and department != "": self.department = department if contact_for and contact_for != "": self.contact_for = contact_for if backup_for and backup_for != "": self.backup_for = backup_for if is_staff: self.is_staff = is_staff def __repr__(self): return '<User %r>' % self.email def __str__(self): return self.email def department_name(self): if self.current_department and self.current_department.name: return self.current_department.name else: app.logger.error("\n\nUser %s is not associated with a department." 
% self.email) return "N/A" ### @export "Department" class Department(db.Model): __tablename__ = 'department' id = db.Column(db.Integer, primary_key =True) date_created = db.Column(db.DateTime) date_updated = db.Column(db.DateTime) name = db.Column(db.String(), unique=True) users = relationship("User") # The list of users in this department requests = relationship("Request", order_by = "Request.date_created.asc()") # The list of requests currently associated with this department def __init__(self, name): self.name = name self.date_created = datetime.now().isoformat() def __repr__(self): return '<Department %r>' % self.name def __str__(self): return self.name def get_name(self): return self.name or "N/A" ### @export "Request" class Request(db.Model): # The public records request __tablename__ = 'request' id = db.Column(db.Integer, primary_key =True) date_created = db.Column(db.DateTime) due_date = db.Column(db.DateTime) extended = db.Column(db.Boolean, default = False) # Has the due date been extended? qas = relationship("QA", cascade="all,delete", order_by = "QA.date_created.desc()") # The list of QA units for this request status_updated = db.Column(db.DateTime) text = db.Column(db.String(), unique=True) # The actual request text. owners = relationship("Owner", cascade = "all, delete", order_by="Owner.date_created.asc()") subscribers = relationship("Subscriber", cascade ="all, delete") # The list of subscribers following this request. records = relationship("Record", cascade="all,delete", order_by = "Record.date_created.desc()") # The list of records that have been uploaded for this request. notes = relationship("Note", cascade="all,delete", order_by = "Note.date_created.desc()") # The list of notes appended to this request. status = db.Column(db.String(400)) # The status of the request (open, closed, etc.) 
creator_id = db.Column(db.Integer, db.ForeignKey('user.id')) # If city staff created it on behalf of the public, otherwise the creator is the subscriber with creator = true department_id = db.Column(db.Integer, db.ForeignKey("department.id")) department = relationship("Department", uselist = False) date_received = db.Column(db.DateTime) offline_submission_type = db.Column(db.String()) def __init__(self, text, creator_id = None, offline_submission_type = None, date_received = None): self.text = text self.date_created = datetime.now().isoformat() self.creator_id = creator_id self.offline_submission_type = offline_submission_type if date_received and type(date_received) is datetime: self.date_received = date_received def __repr__(self): return '<Request %r>' % self.text def set_due_date(self): if not self.date_received: self.date_received = self.date_created if self.extended == True: self.due_date = self.date_received + timedelta(days = int(app.config['DAYS_AFTER_EXTENSION'])) else: self.due_date = self.date_received + timedelta(days = int(app.config['DAYS_TO_FULFILL'])) def extension(self): self.extended = True self.due_date = self.due_date + timedelta(days = int(app.config['DAYS_AFTER_EXTENSION'])) def point_person(self): for o in self.owners: if o.is_point_person: return o return None def all_owners(self): all_owners = [] for o in self.owners: all_owners.append(o.user.get_alias()) return all_owners def requester(self): if self.subscribers: return self.subscribers[0] or None # The first subscriber is always the requester return None def requester_name(self): requester = self.requester() if requester and requester.user: return requester.user.get_alias() return "N/A" def requester_phone(self): requester = self.requester() if requester and requester.user: return requester.user.get_phone() return "N/A" def point_person_name(self): point_person = self.point_person() if point_person and point_person.user: return point_person.user.get_alias() return "N/A" def 
department_name(self): if self.department: return self.department.get_name() return "N/A" def is_closed(self): if self.status: return re.match('.*(closed).*', self.status, re.IGNORECASE) is not None else: app.logger.info("\n\n Request with this ID has no status: %s" % self.id) return False def solid_status(self, cron_job = False): if self.is_closed(): return "closed" else: if cron_job or (not current_user.is_anonymous()): if self.due_date: if datetime.now() >= self.due_date: return "overdue" elif (datetime.now() + timedelta(days = int(app.config['DAYS_UNTIL_OVERDUE']))) >= self.due_date: return "due soon" return "open" @hybrid_property def open(self): two_days = datetime.now() + timedelta(days = 2) return and_(~self.closed, self.due_date > two_days) @hybrid_property def due_soon(self): two_days = datetime.now() + timedelta(days = 2) return and_(self.due_date < two_days, self.due_date > datetime.now(), ~self.closed) @hybrid_property def overdue(self): return and_(self.due_date < datetime.now(), ~self.closed) @hybrid_property def closed(self): return Request.status.ilike("%closed%") ### @export "QA" class QA(db.Model): # A Q & A block for a request __tablename__ = 'qa' id = db.Column(db.Integer, primary_key = True) question = db.Column(db.String()) answer = db.Column(db.String()) request_id = db.Column(db.Integer, db.ForeignKey('request.id')) owner_id = db.Column(db.Integer, db.ForeignKey('user.id')) # Actually just a user ID subscriber_id = db.Column(db.Integer, db.ForeignKey('user.id')) # Actually just a user ID date_created = db.Column(db.DateTime) def __init__(self, request_id, question, user_id = None): self.question = question self.request_id = request_id self.date_created = datetime.now().isoformat() self.owner_id = user_id def __repr__(self): return "<QA Q: %r A: %r>" %(self.question, self.answer) ### @export "Owner" class Owner(db.Model): # A member of city staff assigned to a particular request, that may or may not upload records towards that request. 
__tablename__ = 'owner' id = db.Column(db.Integer, primary_key =True) user_id = Column(Integer, ForeignKey('user.id')) user = relationship("User", uselist = False) request_id = db.Column(db.Integer, db.ForeignKey('request.id')) request = relationship("Request", foreign_keys = [request_id]) active = db.Column(db.Boolean, default = True) # Indicate whether they're still involved in the request or not. reason = db.Column(db.String()) # Reason they were assigned reason_unassigned = db.Column(db.String()) # Reason they were unassigned date_created = db.Column(db.DateTime) date_updated = db.Column(db.DateTime) is_point_person = db.Column(db.Boolean) def __init__(self, request_id, user_id, reason= None, is_point_person = False): self.reason = reason self.user_id = user_id self.request_id = request_id self.date_created = datetime.now().isoformat() self.date_updated = self.date_created self.is_point_person = is_point_person def __repr__(self): return '<Owner %r>' %self.id ### @export "Subscriber" class Subscriber(db.Model): # A person subscribed to a request, who may or may not have created the request, and may or may not own a part of the request. __tablename__ = 'subscriber' id = db.Column(db.Integer, primary_key = True) should_notify = db.Column(db.Boolean, default = True) # Allows a subscriber to unsubscribe user_id = db.Column(db.Integer, db.ForeignKey('user.id')) user = relationship("User", uselist = False) request_id = db.Column(db.Integer, db.ForeignKey('request.id')) date_created = db.Column(db.DateTime) owner_id = db.Column(db.Integer, db.ForeignKey('owner.id')) # Not null if responsible for fulfilling a part of the request. UPDATE 6-11-2014: This isn't used. we should get rid of it. 
def __init__(self, request_id, user_id, creator = False): self.user_id = user_id self.request_id = request_id self.date_created = datetime.now().isoformat() def __repr__(self): return '<Subscriber %r>' %self.user_id ### @export "Record" class Record(db.Model): # A record that is attached to a particular request. A record can be online (uploaded document, link) or offline. __tablename__ = 'record' id = db.Column(db.Integer, primary_key = True) date_created = db.Column(db.DateTime) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) # The user who uploaded the record, right now only city staff can doc_id = db.Column(db.Integer) # The document ID. Currently using Scribd API to upload documents. request_id = db.Column(db.Integer, db.ForeignKey('request.id')) # The request this record was uploaded for description = db.Column(db.String(400)) # A short description of what the record is. filename = db.Column(db.String(400)) # The original name of the file being uploaded. url = db.Column(db.String()) # Where it exists on the internet. download_url = db.Column(db.String()) # Where it can be downloaded on the internet. access = db.Column(db.String()) # How to access it. Probably only defined on offline docs for now. def __init__(self, request_id, user_id, url = None, filename = None, doc_id = None, description = None, access = None): self.doc_id = doc_id self.request_id = request_id self.user_id = user_id self.date_created = datetime.now().isoformat() self.description = description self.url = url self.filename = filename self.access = access def __repr__(self): return '<Record %r>' % self.description ### @export "Note" class Note(db.Model): # A note on a request. __tablename__ = 'note' id = db.Column(db.Integer, primary_key = True) date_created = db.Column(db.DateTime) text = db.Column(db.String()) request_id = db.Column(db.Integer, db.ForeignKey('request.id')) # The request it belongs to. 
user_id = db.Column(db.Integer, db.ForeignKey('user.id')) # The user who wrote the note. Right now only stored for city staff - otherwise it's an anonymous/ 'requester' note. def __init__(self, request_id, text, user_id): self.text = text self.request_id = request_id self.user_id = user_id self.date_created = datetime.now().isoformat() def __repr__(self): return '<Note %r>' % self.text ### @export "Visualization" class Visualization(db.Model): __tablename__ = 'visualization' id = db.Column(db.Integer, primary_key = True) content = db.Column(db.String()) date_created = db.Column(db.DateTime) date_updated = db.Column(db.DateTime) type_viz = db.Column(db.String()) def __init__(self, type_viz, content): self.type_viz = type_viz self.content = content self.date_created = datetime.now().isoformat() def __repr__(self): return '<Visualization %r>' % self.type_viz