Newer
Older
# coding=utf-8
import copy
import datetime
from collections import defaultdict
from operator import attrgetter
from django.db import connection
from django.db.models import Q, Count
from .models import AppointmentType, Appointment, Visit
__author__ = 'Valentin Grouès'
QUERY_VISITS_RANKS = """
SELECT DISTINCT(rank() OVER (PARTITION BY subject_id ORDER BY datetime_begin)) AS rank FROM web_visit
"""
QUERY_APPOINTMENTS_COUNT = """
SELECT count(*) FROM web_appointment LEFT JOIN (SELECT id, subject_id as web_visit_subject_id, rank()
OVER (PARTITION BY subject_id ORDER BY datetime_begin) AS rnk FROM web_visit)
a ON a.id = web_appointment.visit_id
LEFT JOIN web_studysubject ON web_studysubject.id = web_visit_subject_id
WHERE a.rnk = %s
AND EXTRACT(MONTH FROM web_appointment.datetime_when) = %s
AND EXTRACT(YEAR FROM web_appointment.datetime_when) = %s
"""
QUERY_VISITS_ENDED_COUNT = """
SELECT count(*) FROM (SELECT id, subject_id as web_visit_subject_id, datetime_begin, datetime_end, rank()
OVER (PARTITION BY subject_id ORDER BY datetime_begin) AS rnk FROM web_visit) a
LEFT JOIN web_studysubject ON web_studysubject.id = web_visit_subject_id
WHERE a.rnk = %s AND
EXTRACT(MONTH FROM a.datetime_end) = %s AND
EXTRACT(YEAR FROM a.datetime_end) = %s
"""
QUERY_VISITS_STARTED_COUNT = """
SELECT count(*) FROM (SELECT id, subject_id as web_visit_subject_id, datetime_begin, datetime_end, rank()
OVER (PARTITION BY subject_id ORDER BY datetime_begin) AS rnk FROM web_visit) a
LEFT JOIN web_studysubject ON web_studysubject.id = web_visit_subject_id
WHERE a.rnk = %s AND
EXTRACT(MONTH FROM a.datetime_begin) = %s AND
EXTRACT(YEAR FROM a.datetime_begin) = %s
"""
QUERY_APPOINTMENTS = """
SELECT types.appointment_type_id, web_appointment.status, count(*) FROM web_appointment
LEFT JOIN (SELECT id, subject_id as web_visit_subject_id, rank() OVER (PARTITION BY subject_id ORDER BY datetime_begin) AS rnk
FROM web_visit) a ON a.id = web_appointment.visit_id LEFT JOIN web_appointmenttypelink types
ON types.appointment_id = web_appointment.id
LEFT JOIN web_studysubject ON web_studysubject.id = web_visit_subject_id
WHERE a.rnk = %s
AND EXTRACT(MONTH FROM web_appointment.datetime_when) = %s
AND EXTRACT(YEAR FROM web_appointment.datetime_when) = %s
GROUP BY TYPES.appointment_type_id, web_appointment.status
"""
class StatisticsManager(object):
def __init__(self):
self.appointment_types = {appointment_type.id: appointment_type for appointment_type in
AppointmentType.objects.all()}
self.statuses_list = Appointment.objects.filter().values_list('status', flat=True).distinct().order_by(
'status').all()
self.statuses_labels = [Appointment.APPOINTMENT_STATUS_CHOICES.get(status, status.title()) for status in
self.statuses_list]
self.visits_ranks = self._get_visits_ranks()
def get_statistics_for_month(self, month, year, subject_type=None, visit=None):
"""
Build dict with statistics for a given month of a given year.
Statistics include:
- number of appointments,
- number of visits ended,
- number of visits started
:param month: the month number [1;12]
:type month: int
:param year: the year (4 digits)
:type year: int
:param subject_type: the type of subject (patient or control or None for all)
:type subject_type: basestring
:param visit: the visit number or None for all
:type visit: basestring
:return: a dictionary containing the statistics
:rtype: dict
"""
results = {}
general_results = {}
filters_month_year_appointments, filters_month_year_visits_ended, filters_month_year_visits_started = self._build_filters(
month, subject_type, year)
number_of_appointments = self._get_number_of_appointments(filters_month_year_appointments, visit, month, year,
subject_type)
number_of_visits_started = self._get_number_visits_started(filters_month_year_visits_started, visit, month,
year, subject_type)
number_of_visits_ended = self._get_number_visits_ended(filters_month_year_visits_ended, visit, month, year,
subject_type)
general_results["appointments"] = number_of_appointments
general_results["visits_started"] = number_of_visits_started
general_results["visits_ended"] = number_of_visits_ended
results["general"] = general_results
results_appointments = self.get_appointments_per_type_and_status(filters_month_year_appointments, month,
self.statuses_list, visit, year, subject_type)
results["appointments"] = results_appointments
results["statuses_list"] = self.statuses_labels
appointment_types_values = map(attrgetter('code'), self.appointment_types.values())
sorted_appointment_types_values = sorted(appointment_types_values)
results["appointments_types_list"] = sorted_appointment_types_values
return results
def get_appointments_per_type_and_status(self, filters_month_year_appointments, month, statuses_list, visit, year,
subject_type=None):
if not visit:
results_appointments = {}
for appointment_type in self.appointment_types.values():
appointment_type_filters = copy.deepcopy(filters_month_year_appointments)
appointment_type_filters.add(Q(appointment_types=appointment_type), Q.AND)
results_appointment_set = Appointment.objects.filter(appointment_type_filters).values(
'status').order_by(
'status').annotate(
Count('status'))
results_appointment = [Appointment.objects.filter(appointment_type_filters).count()]
results_appointment_per_status = {result['status']: result['status__count'] for result in
results_appointment_set}
results_appointment.extend([results_appointment_per_status.get(status, 0) for status in statuses_list])
results_appointments[appointment_type.code] = results_appointment
else:
results_appointment_set = defaultdict(dict)
query = QUERY_APPOINTMENTS
subject_type_clause = ""
if subject_type is not None:
subject_type_clause = " AND web_studysubject.type = '{}'".format(subject_type)
query = query.format(subject_type_clause)
cursor.execute(query, [visit, month, year])
rows = cursor.fetchall()
for row in rows:
appointment_type_id, status, count = row
results_appointment_set[appointment_type_id][status] = int(count)
results_appointments = {}
for appointment_type in self.appointment_types.values():
if appointment_type.id not in results_appointment_set:
results_appointments[appointment_type.code] = [0 * i for i in range(0, len(statuses_list) + 1)]
continue
results_appointment_set_for_type = results_appointment_set[appointment_type.id]
total = [sum(results_appointment_set_for_type.values())]
total.extend([results_appointment_set_for_type.get(status, 0) for status in statuses_list])
results_appointments[appointment_type.code] = total
return results_appointments
@staticmethod
def _get_count_from_filters_or_sql(model, filters, query, visit, month, year, subject_type):
if subject_type is not None:
query += " AND web_studysubject.type = '{}'".format(subject_type)
with connection.cursor() as cursor:
cursor.execute(
query,
[visit, month, year])
row = cursor.fetchone()
count = int(row[0])
else:
count = model.objects.filter(filters).count()
return count
def _get_number_visits_started(self, filters_month_year_visits_started, visit, month, year, subject_type=None):
return self._get_count_from_filters_or_sql(Visit, filters_month_year_visits_started, QUERY_VISITS_STARTED_COUNT,
visit, month, year, subject_type)
def _get_number_visits_ended(self, filters_month_year_visits_ended, visit, month, year, subject_type=None):
return self._get_count_from_filters_or_sql(Visit, filters_month_year_visits_ended, QUERY_VISITS_ENDED_COUNT,
visit, month, year, subject_type)
def _get_number_of_appointments(self, filters, visit, month, year, subject_type=None):
return self._get_count_from_filters_or_sql(Appointment, filters, QUERY_APPOINTMENTS_COUNT, visit, month, year,
subject_type)
@staticmethod
def _build_filters(month, subject_type, year):
filters_month_year_appointments = Q()
filters_month_year_appointments.add(Q(datetime_when__month=month), Q.AND)
filters_month_year_appointments.add(Q(datetime_when__year=year), Q.AND)
filters_month_year_visits_started = Q()
filters_month_year_visits_started.add(Q(datetime_begin__month=month), Q.AND)
filters_month_year_visits_started.add(Q(datetime_begin__year=year), Q.AND)
filters_month_year_visits_ended = Q()
filters_month_year_visits_ended.add(Q(datetime_end__month=month), Q.AND)
filters_month_year_visits_ended.add(Q(datetime_end__year=year), Q.AND)
if subject_type is not None:
subject_type_filter = Q(subject__type=subject_type)
filters_month_year_visits_started.add(subject_type_filter, Q.AND)
filters_month_year_visits_ended.add(subject_type_filter, Q.AND)
subject_type_filter_appointments = Q(visit__subject__type=subject_type)
filters_month_year_appointments.add(subject_type_filter_appointments, Q.AND)
return filters_month_year_appointments, filters_month_year_visits_ended, filters_month_year_visits_started
@staticmethod
def _get_visits_ranks(subject_type=None):
query = QUERY_VISITS_RANKS
if subject_type is not None:
query += " LEFT JOIN web_studysubject ON web_studysubject.id = web_visit.subject_id WHERE web_studysubject.type = '{}'".format(
subject_type)
with connection.cursor() as cursor:
cursor.execute(
[])
rows = cursor.fetchall()
return [r[0] for r in rows]
def get_previous_year_and_month():
now = datetime.datetime.now()
return get_previous_year_and_month_for_date(now)
def get_previous_year_and_month_for_date(now):
previous_month = now.month - 1 or 12
year_now = now.year
if previous_month == 12:
year_previous_month = year_now - 1
else:
year_previous_month = year_now
return year_previous_month, previous_month