import logging

from django.contrib.auth.decorators import login_required
from django.db.models import Count, Case, When, Min
from django.db.models import Q
from django.http import JsonResponse

from web.models import StudySubject, Visit, Appointment
from web.models.constants import SUBJECT_TYPE_CHOICES
from web.views import e500_error
from web.views.notifications import get_subjects_with_no_visit, get_subjects_with_reminder, get_today_midnight_date
from web.views.subject import SUBJECT_LIST_GENERIC, SUBJECT_LIST_NO_VISIT, SUBJECT_LIST_REQUIRE_CONTACT

logger = logging.getLogger(__name__)

# Maps a dataTable column name to the model field used for ordering.
_SUBJECT_ORDER_FIELDS = {
    "first_name": "first_name",
    "last_name": "last_name",
    "nd_number": "nd_number",
    "screening_number": "screening_number",
    "default_location": "default_location",
    "dead": "subject__dead",
    "resigned": "resigned",
    "information_sent": "information_sent",
    "postponed": "postponed",
    "type": "type",
    "id": "id",
    "date_born": "date_born",
}

# Maps the "visit_N" column names (N = 1..8) to their visit number.
_VISIT_COLUMN_NUMBERS = {"visit_" + str(number): number for number in range(1, 9)}

# Columns filtered with a case-insensitive substring match on the field of the same name.
_ICONTAINS_FILTER_COLUMNS = frozenset(["first_name", "last_name", "nd_number", "screening_number"])

# Columns filtered by a boolean field; the request carries the string "true" for True.
_BOOLEAN_FILTER_FIELDS = {
    "dead": "subject__dead",
    "resigned": "resigned",
    "postponed": "postponed",
    "information_sent": "information_sent",
}

# Columns filtered by exact match on the field of the same name.
_EXACT_FILTER_COLUMNS = frozenset(["default_location", "type"])


@login_required
def cities(request):
    """Return a JSON response listing the distinct non-null ``city`` values of study subjects."""
    result_subjects = StudySubject.objects.filter(city__isnull=False).values_list('city', flat=True).distinct()
    return JsonResponse({
        "cities": list(result_subjects)
    })


@login_required
def referrals(request):
    """Return a JSON response listing the distinct non-null ``referral`` values of study subjects."""
    result_subjects = StudySubject.objects.filter(referral__isnull=False).values_list('referral', flat=True).distinct()
    return JsonResponse({
        "referrals": list(result_subjects)
    })


# NOTE(review): this is a helper rather than a view (it returns subjects, not a response), yet it is
# wrapped in login_required. The decorator is kept to preserve existing behaviour - confirm it is intended.
@login_required
def get_subjects(request, type):
    """Return the subjects that belong to the requested list type.

    :param request: current request; ``request.user`` is passed to the notification helpers
    :param type: one of ``SUBJECT_LIST_GENERIC``, ``SUBJECT_LIST_NO_VISIT``, ``SUBJECT_LIST_REQUIRE_CONTACT``
    :raises TypeError: when ``type`` is none of the known list types
    """
    if type == SUBJECT_LIST_GENERIC:
        return StudySubject.objects.all()
    if type == SUBJECT_LIST_NO_VISIT:
        return get_subjects_with_no_visit(request.user)
    if type == SUBJECT_LIST_REQUIRE_CONTACT:
        return get_subjects_with_reminder(request.user)
    # str() keeps the intended message even when a non-string type is passed
    raise TypeError("Unknown query type: " + str(type))


def order_by_visit(subjects_to_be_ordered, order_direction, visit_number):
    """Order subjects by the begin date of the visit with the given number.

    :param subjects_to_be_ordered: subjects to order
    :param order_direction: ``order_by`` prefix: ``""`` for ascending, ``"-"`` for descending
    :param visit_number: number of the visit whose ``datetime_begin`` is the sort key
    """
    return subjects_to_be_ordered.annotate(
        sort_visit_date=Min(Case(When(visit__visit_number=visit_number, then='visit__datetime_begin')))).order_by(
        order_direction + 'sort_visit_date')


def get_subjects_order(subjects_to_be_ordered, order_column, order_direction):
    """Order subjects by the given dataTable column.

    :param subjects_to_be_ordered: subjects to order
    :param order_column: column name; plain subject columns and ``visit_1`` .. ``visit_8`` are supported
    :param order_direction: ``"asc"`` for ascending; any other value means descending
    :return: the ordered subjects, or the input unchanged (with a logged warning) for an unknown column
    """
    direction_prefix = "" if order_direction == "asc" else "-"

    if order_column in _SUBJECT_ORDER_FIELDS:
        return subjects_to_be_ordered.order_by(direction_prefix + _SUBJECT_ORDER_FIELDS[order_column])
    if order_column in _VISIT_COLUMN_NUMBERS:
        return order_by_visit(subjects_to_be_ordered, direction_prefix, _VISIT_COLUMN_NUMBERS[order_column])

    # lazy %-formatting also copes with order_column being None
    logger.warning("Unknown sort column: %s", order_column)
    return subjects_to_be_ordered


def filter_by_visit(result, visit_number, visit_type):
    """Restrict subjects to those whose visit ``visit_number`` is in the state ``visit_type``.

    :param result: subjects to filter
    :param visit_number: number of the visit to inspect
    :param visit_type: one of ``"DONE"``, ``"MISSED"``, ``"EXCEED"``, ``"IN_PROGRESS"``,
        ``"SHOULD_BE_IN_PROGRESS"``, ``"UPCOMING"``; any other value only adds the annotations
    :return: the annotated (and, for a known ``visit_type``, filtered) subjects
    """
    # we need to give custom names for filtering params that contain visit_number in it
    # because we might want to filter by few visits and they shouldn't collide
    name_prefix = 'visit_' + str(visit_number)
    datetime_begin_filter = name_prefix + '_datetime_begin'
    datetime_end_filter = name_prefix + '_datetime_end'
    is_finished_filter = name_prefix + '_is_finished'
    finished_appointments_filter = name_prefix + '_finished_appointments'
    scheduled_appointments_filter = name_prefix + '_scheduled_appointments'

    # this is hack... instead of providing True/False value this field contain 1/0 value, the problem is that we need
    # to provide aggregate function for the interacting parameter
    # If we try to assign it with pure Case(When...) (without Count/Min/Max/Avg) we obtain duplicates
    # of the results which are later on messing up with the subject list
    result = result.annotate(**{
        is_finished_filter: Count(Case(When(Q(visit__is_finished=True) & Q(visit__visit_number=visit_number), then=1)))
    })

    # number of finished appointments
    result = result.annotate(**{finished_appointments_filter: Count(Case(When(
        Q(visit__appointment__status=Appointment.APPOINTMENT_STATUS_FINISHED) & Q(visit__visit_number=visit_number),
        then=1)))})

    # number of scheduled appointments
    result = result.annotate(**{scheduled_appointments_filter: Count(Case(When(
        Q(visit__appointment__status=Appointment.APPOINTMENT_STATUS_SCHEDULED) & Q(visit__visit_number=visit_number),
        then=1)))})

    # when visit starts
    result = result.annotate(
        **{datetime_begin_filter: Min(Case(When(visit__visit_number=visit_number, then='visit__datetime_begin')))})

    # when visit finish
    result = result.annotate(
        **{datetime_end_filter: Min(Case(When(visit__visit_number=visit_number, then='visit__datetime_end')))})

    today = get_today_midnight_date()
    already_started = {datetime_begin_filter + "__lt": today}

    if visit_type == "DONE":
        result = (result.filter(**already_started)
                  .filter(**{is_finished_filter + "__gt": 0})
                  .filter(**{finished_appointments_filter + "__gt": 0}))
    elif visit_type == "MISSED":
        result = (result.filter(**already_started)
                  .filter(**{is_finished_filter + "__gt": 0})
                  .filter(**{finished_appointments_filter: 0}))
    # NOTE(review): serialize_subject_visit reports this state as "EXCEEDED" while the filter value
    # here is "EXCEED" - confirm which value the client really sends.
    elif visit_type == "EXCEED":
        result = (result.filter(**already_started)
                  .filter(**{is_finished_filter: 0})
                  .filter(**{datetime_end_filter + "__lt": today}))
    elif visit_type == "IN_PROGRESS":
        result = (result.filter(**already_started)
                  .filter(**{is_finished_filter: 0})
                  .filter(**{datetime_end_filter + "__gt": today})
                  .filter(**{scheduled_appointments_filter + "__gt": 0}))
    elif visit_type == "SHOULD_BE_IN_PROGRESS":
        result = (result.filter(**already_started)
                  .filter(**{is_finished_filter: 0})
                  .filter(**{datetime_end_filter + "__gt": today})
                  .filter(**{scheduled_appointments_filter: 0}))
    elif visit_type == "UPCOMING":
        result = result.filter(**{datetime_begin_filter + "__gt": today})

    return result


def get_subjects_filtered(subjects_to_be_filtered, filters):
    """Apply the dataTable column filters to the subjects.

    :param subjects_to_be_filtered: subjects to filter
    :param filters: iterable of ``[column, value]`` pairs
    :return: the filtered subjects; an empty column name is ignored and an unknown one is
        logged as a warning and skipped
    """
    result = subjects_to_be_filtered
    for row in filters:
        column = row[0]
        value = row[1]
        if column in _ICONTAINS_FILTER_COLUMNS:
            result = result.filter(**{column + "__icontains": value})
        elif column in _BOOLEAN_FILTER_FIELDS:
            result = result.filter(**{_BOOLEAN_FILTER_FIELDS[column]: (value == "true")})
        elif column in _EXACT_FILTER_COLUMNS:
            result = result.filter(**{column: value})
        elif column in _VISIT_COLUMN_NUMBERS:
            result = filter_by_visit(result, _VISIT_COLUMN_NUMBERS[column], value)
        elif column == "":
            pass
        else:
            logger.warning("UNKNOWN filter: %s", "[None]" if column is None else column)

    return result


@login_required
def subjects(request, type):
    """Serve one page of subjects for a server-side dataTable.

    Reads the ``draw``, ``start``, ``length``, ``order[0][...]`` and ``columns[i][...]`` GET
    parameters, then orders, filters and slices the subjects of the given list ``type``.

    :return: JSON response with ``draw``, ``recordsTotal``, ``recordsFiltered`` and ``data``;
        any exception is logged and turned into the ``e500_error`` response
    """
    try:
        # id of the query from dataTable: https://datatables.net/manual/server-side
        draw = int(request.GET.get("draw", "-1"))

        start = int(request.GET.get("start", "0"))
        length = int(request.GET.get("length", "10"))
        order = int(request.GET.get("order[0][column]", "0"))
        order_dir = request.GET.get("order[0][dir]", "asc")
        order_column = request.GET.get("columns[" + str(order) + "][data]", "last_name")

        filters = []
        column_id = 0
        while True:
            column_prefix = "columns[" + str(column_id) + "]"
            # None (parameter absent) marks the end of the column list; a string sentinel would
            # wrongly stop the scan when the user searches for that very text
            val = request.GET.get(column_prefix + "[search][value]")
            if val is None:
                break
            if val != "":
                filters.append([request.GET.get(column_prefix + "[data]"), val])
            column_id += 1

        all_subjects = get_subjects(request, type)

        count = all_subjects.count()

        ordered_subjects = get_subjects_order(all_subjects, order_column, order_dir)
        filtered_subjects = get_subjects_filtered(ordered_subjects, filters)

        # negative offsets cannot be used for slicing here, so treat them as the first record
        start = max(start, 0)
        if length < 0:
            # dataTables sends length=-1 to request all records
            sliced_subjects = filtered_subjects[start:]
        else:
            sliced_subjects = filtered_subjects[start:(start + length)]

        count_filtered = filtered_subjects.count()

        data = [serialize_subject(subject) for subject in sliced_subjects]

        return JsonResponse({
            "draw": draw,
            "recordsTotal": count,
            "recordsFiltered": count_filtered,
            "data": data,
        })
    except Exception as e:
        logger.error(e, exc_info=True)
        return e500_error(request)


@login_required
def types(request):
    """Return a JSON response listing the entries of ``SUBJECT_TYPE_CHOICES`` as id/name pairs."""
    data = []
    for subject_type_id, subject_type_name in SUBJECT_TYPE_CHOICES.items():
        data.append({"id": subject_type_id, "name": subject_type_name})
    return JsonResponse({
        "types": data
    })


def get_yes_no(val):
    """Return ``"YES"`` for a truthy value and ``"NO"`` otherwise."""
    return "YES" if val else "NO"


def serialize_subject_visit(visit, today=None):
    """Serialize a visit into its status and its begin/end dates.

    :param visit: visit to serialize
    :param today: reference date separating past from future; defaults to
        ``get_today_midnight_date()``
    :return: dict with ``status``, ``datetime_start`` and ``datetime_end`` (dates as ``YYYY-MM-DD``)
    """
    if today is None:
        today = get_today_midnight_date()

    if visit.datetime_begin >= today:
        status = "UPCOMING"
    elif visit.is_finished:
        # a finished visit counts as done only if at least one of its appointments was finished
        has_finished_appointment = visit.appointment_set.filter(
            status=Appointment.APPOINTMENT_STATUS_FINISHED).exists()
        status = "DONE" if has_finished_appointment else "MISSED"
    elif visit.datetime_end < today:
        status = "EXCEEDED"
    else:
        has_scheduled_appointment = visit.appointment_set.filter(
            status=Appointment.APPOINTMENT_STATUS_SCHEDULED).exists()
        status = "IN_PROGRESS" if has_scheduled_appointment else "SHOULD_BE_IN_PROGRESS"

    return {
        "status": status,
        "datetime_start": visit.datetime_begin.strftime('%Y-%m-%d'),
        "datetime_end": visit.datetime_end.strftime('%Y-%m-%d'),
    }


def serialize_subject(subject):
    """Serialize a study subject, including its visits ordered by visit number, into a dict."""
    location = ""
    if subject.default_location is not None:
        location = subject.default_location.name

    # computed once so every visit of the subject is judged against the same reference date
    today = get_today_midnight_date()
    visits = Visit.objects.filter(subject=subject).order_by('visit_number')
    serialized_visits = [serialize_subject_visit(visit, today) for visit in visits]

    result = {
        "first_name": subject.first_name,
        "last_name": subject.last_name,
        "date_born": subject.date_born,
        "nd_number": subject.nd_number,
        "screening_number": subject.screening_number,
        "default_location": location,
        "dead": get_yes_no(subject.subject.dead),
        "resigned": get_yes_no(subject.resigned),
        "postponed": get_yes_no(subject.postponed),
        "information_sent": get_yes_no(subject.information_sent),
        "type": subject.get_type_display(),
        "id": subject.id,
        "visits": serialized_visits,
    }
    return result