-
Carlos Vega authoredCarlos Vega authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
subject.py 19.59 KiB
import logging
from django.db.models import Count, Case, When, Min, Max
from django.db.models import Q
from django.http import JsonResponse
from web.api_views.serialization_utils import bool_to_yes_no, flying_team_to_str, location_to_str, add_column, \
serialize_date, serialize_datetime, get_filters_for_data_table_request
from web.models import StudySubject, Visit, Appointment, Subject, SubjectColumns, StudyColumns, Study, ContactAttempt
from web.models.constants import SUBJECT_TYPE_CHOICES, GLOBAL_STUDY_ID
from web.models.study_subject_list import SUBJECT_LIST_GENERIC, SUBJECT_LIST_NO_VISIT, SUBJECT_LIST_REQUIRE_CONTACT, \
StudySubjectList, SUBJECT_LIST_VOUCHER_EXPIRY
from web.views import e500_error
from web.views.notifications import get_subjects_with_no_visit, get_subjects_with_reminder, get_today_midnight_date, \
get_subjects_with_almost_expired_vouchers
logger = logging.getLogger(__name__)
# noinspection PyUnusedLocal
def cities(request):
result_subjects = Subject.objects.filter(city__isnull=False).values_list('city').distinct()
return JsonResponse({
"cities": [x[0] for x in result_subjects]
})
# noinspection PyUnusedLocal
def referrals(request):
result_subjects = StudySubject.objects.filter(referral__isnull=False).values_list('referral').distinct()
return JsonResponse({
"referrals": [x[0] for x in result_subjects]
})
def get_subject_columns(request, subject_list_type):
study = Study.objects.filter(id=GLOBAL_STUDY_ID)[0]
study_subject_lists = StudySubjectList.objects.filter(study=study, type=subject_list_type)
if len(study_subject_lists) > 0:
study_subject_list = study_subject_lists[0]
subject_columns = study_subject_list.visible_subject_columns
study_subject_columns = study_subject_list.visible_subject_study_columns
else:
study_subject_list = StudySubjectList()
subject_columns = SubjectColumns()
study_subject_columns = StudyColumns()
result = []
add_column(result, "ND", "nd_number", study_subject_columns, "string_filter", study.columns)
add_column(result, "Screening", "screening_number", study_subject_columns, "string_filter", study.columns)
add_column(result, "First name", "first_name", subject_columns, "string_filter")
add_column(result, "Last name", "last_name", subject_columns, "string_filter")
add_column(result, "Social Security Number", "social_security_number", subject_columns, "string_filter")
add_column(result, "Date of birth", "date_born", subject_columns, None)
add_column(result, "Contact on", "datetime_contact_reminder", study_subject_columns, None, study.columns)
add_column(result, "Last contact attempt", "last_contact_attempt", study_subject_list, None)
add_column(result, "Referred by", "referral", study_subject_columns, "string_filter", study.columns)
add_column(result, "Health partner name", "health_partner_first_name", None, "string_filter",
add_param=study.columns.health_partner,
visible_param=study_subject_columns.health_partner)
add_column(result, "Health partner last name", "health_partner_last_name", None, "string_filter",
add_param=study.columns.health_partner,
visible_param=study_subject_columns.health_partner)
add_column(result, "Location", "default_location", study_subject_columns, "location_filter", study.columns)
add_column(result, "Flying team location", "flying_team", study_subject_columns, "flying_team_filter",
study.columns)
add_column(result, "Deceased", "dead", subject_columns, "yes_no_filter")
add_column(result, "Resigned", "resigned", study_subject_columns, "yes_no_filter", study.columns)
add_column(result, "Postponed", "postponed", study_subject_columns, "yes_no_filter", study.columns)
add_column(result, "Excluded", "excluded", study_subject_columns, "yes_no_filter", study.columns)
add_column(result, "Info sent", "information_sent", study_subject_columns, "yes_no_filter", study.columns)
add_column(result, "Type", "type", study_subject_columns, "type_filter", study.columns)
add_column(result, "Edit", "edit", None, None, sortable=False)
for visit_number in range(1, 9):
visit_key = "visit_" + str(visit_number)
add_column(result, "Visit " + str(visit_number), visit_key, None, "visit_filter",
visible_param=study_subject_list.visits)
return JsonResponse({"columns": result})
def get_subjects(request, type):
if type == SUBJECT_LIST_GENERIC:
return StudySubject.objects.all()
elif type == SUBJECT_LIST_NO_VISIT:
return get_subjects_with_no_visit(request.user)
elif type == SUBJECT_LIST_REQUIRE_CONTACT:
return get_subjects_with_reminder(request.user)
elif type == SUBJECT_LIST_VOUCHER_EXPIRY:
return get_subjects_with_almost_expired_vouchers(request.user)
else:
raise TypeError("Unknown query type: " + type)
def order_by_visit(subjects_to_be_ordered, order_direction, visit_number):
return subjects_to_be_ordered.annotate(
sort_visit_date=Min(Case(When(visit__visit_number=visit_number, then='visit__datetime_begin')))).order_by(
order_direction + 'sort_visit_date')
def get_subjects_order(subjects_to_be_ordered, order_column, order_direction, column_filters={}):
result = subjects_to_be_ordered
if order_direction == "asc":
order_direction = ""
else:
order_direction = "-"
if order_column == "first_name":
result = subjects_to_be_ordered.order_by(order_direction + 'subject__first_name')
elif order_column == "last_name":
result = subjects_to_be_ordered.order_by(order_direction + 'subject__last_name')
elif order_column == "nd_number":
result = subjects_to_be_ordered.order_by(order_direction + 'nd_number')
elif order_column == "referral":
result = subjects_to_be_ordered.order_by(order_direction + 'referral')
elif order_column == "screening_number":
if u'screening_number' not in column_filters:
pattern = None
else:
pattern = column_filters[u'screening_number']
result = subjects_to_be_ordered.all()
result = sorted(result, key=lambda t: t.sort_matched_screening_first(pattern, reverse=order_direction == '-'), reverse = order_direction == '-' )
elif order_column == "default_location":
result = subjects_to_be_ordered.order_by(order_direction + 'default_location')
elif order_column == "flying_team":
result = subjects_to_be_ordered.order_by(order_direction + 'flying_team')
elif order_column == "dead":
result = subjects_to_be_ordered.order_by(order_direction + 'subject__dead')
elif order_column == "resigned":
result = subjects_to_be_ordered.order_by(order_direction + 'resigned')
elif order_column == "information_sent":
result = subjects_to_be_ordered.order_by(order_direction + 'information_sent')
elif order_column == "health_partner_first_name":
result = subjects_to_be_ordered.order_by(order_direction + 'health_partner__first_name')
elif order_column == "health_partner_last_name":
result = subjects_to_be_ordered.order_by(order_direction + 'health_partner__last_name')
elif order_column == "social_security_number":
result = subjects_to_be_ordered.order_by(order_direction + 'subject__social_security_number')
elif order_column == "postponed":
result = subjects_to_be_ordered.order_by(order_direction + 'postponed')
elif order_column == "excluded":
result = subjects_to_be_ordered.order_by(order_direction + 'excluded')
elif order_column == "type":
result = subjects_to_be_ordered.order_by(order_direction + 'type')
elif order_column == "id":
result = subjects_to_be_ordered.order_by(order_direction + 'id')
elif order_column == "date_born":
result = subjects_to_be_ordered.order_by(order_direction + 'subject__date_born')
elif order_column == "datetime_contact_reminder":
result = subjects_to_be_ordered.order_by(order_direction + 'datetime_contact_reminder')
elif order_column == "last_contact_attempt":
# noinspection SpellCheckingInspection
result = subjects_to_be_ordered.annotate(sort_contact_attempt=Max("contactattempt__datetime_when")).order_by(
order_direction + 'sort_contact_attempt')
elif str(order_column).startswith("visit_"):
visit_number = get_visit_number_from_visit_x_string(order_column)
result = order_by_visit(subjects_to_be_ordered, order_direction, visit_number)
else:
logger.warn("Unknown sort column: " + str(order_column))
return result
def get_visit_number_from_visit_x_string(order_column):
return int(str(order_column).split("_")[1])
def filter_by_visit(result, visit_number, visit_type):
# we need to give custom names for filtering params that contain visit_number in it
# because we might want to filter by few visits and they shouldn't collide
datetime_begin_filter = 'visit_' + str(visit_number) + '_datetime_begin'
datetime_end_filter = 'visit_' + str(visit_number) + '_datetime_end'
is_finished_filter = 'visit_' + str(visit_number) + '_is_finished'
finished_appointments_filter = 'visit_' + str(visit_number) + '_finished_appointments'
scheduled_appointments_filter = 'visit_' + str(visit_number) + '_scheduled_appointments'
# this is hack... instead of providing True/False value this field contain 1/0 value, the problem is that we need
# to provide aggregate function for the interacting parameter
# If we try to assign it with pure Case(When...) (without Count/Min/Max/Avg) we obtain duplicates
# of the results which are later on messing up with the subject list
result = result.annotate(**{
is_finished_filter: Count(Case(When(Q(visit__is_finished=True) & Q(visit__visit_number=visit_number), then=1)))
})
# number of finished appointments
result = result.annotate(**{finished_appointments_filter: Count(Case(When(
Q(visit__appointment__status=Appointment.APPOINTMENT_STATUS_FINISHED) & Q(visit__visit_number=visit_number),
then=1)))})
# number of scheduled appointments
result = result.annotate(**{scheduled_appointments_filter: Count(Case(When(
Q(visit__appointment__status=Appointment.APPOINTMENT_STATUS_SCHEDULED) & Q(visit__visit_number=visit_number),
then=1)))})
# when visit starts
result = result.annotate(
**{datetime_begin_filter: Min(Case(When(visit__visit_number=visit_number, then='visit__datetime_begin')))})
# when visit finish
result = result.annotate(
**{datetime_end_filter: Min(Case(When(visit__visit_number=visit_number, then='visit__datetime_end')))})
if visit_type == "DONE":
result = result.filter(**{datetime_begin_filter + "__lt": get_today_midnight_date()}). \
filter(**{is_finished_filter + "__gt": 0}). \
filter(**{finished_appointments_filter + "__gt": 0})
elif visit_type == "MISSED":
result = result.filter(**{datetime_begin_filter + "__lt": get_today_midnight_date()}). \
filter(**{is_finished_filter + "__gt": 0}). \
filter(**{finished_appointments_filter: 0})
elif visit_type == "EXCEED":
result = result.filter(**{datetime_begin_filter + "__lt": get_today_midnight_date()}). \
filter(**{is_finished_filter: 0}). \
filter(**{scheduled_appointments_filter: 0}). \
filter(**{datetime_end_filter + "__lt": get_today_midnight_date()})
elif visit_type == "IN_PROGRESS":
result = result.filter(**{datetime_begin_filter + "__lt": get_today_midnight_date()}). \
filter(**{is_finished_filter: 0}). \
filter(**{scheduled_appointments_filter + "__gt": 0})
elif visit_type == "SHOULD_BE_IN_PROGRESS":
result = result.filter(**{datetime_begin_filter + "__lt": get_today_midnight_date()}). \
filter(**{is_finished_filter: 0}). \
filter(**{datetime_end_filter + "__gt": get_today_midnight_date()}). \
filter(**{scheduled_appointments_filter: 0})
elif visit_type == "UPCOMING":
result = result.filter(**{datetime_begin_filter + "__gt": get_today_midnight_date()})
return result
def get_subjects_filtered(subjects_to_be_filtered, filters):
result = subjects_to_be_filtered
for row in filters:
column = row[0]
value = row[1]
if column == "first_name":
result = result.filter(subject__first_name__icontains=value)
elif column == "last_name":
result = result.filter(subject__last_name__icontains=value)
elif column == "nd_number":
result = result.filter(nd_number__icontains=value)
elif column == "referral":
result = result.filter(referral__icontains=value)
elif column == "screening_number":
result = result.filter(screening_number__icontains=value)
elif column == "dead":
result = result.filter(subject__dead=(value == "true"))
elif column == "resigned":
result = result.filter(resigned=(value == "true"))
elif column == "postponed":
result = result.filter(postponed=(value == "true"))
elif column == "information_sent":
result = result.filter(information_sent=(value == "true"))
elif column == "health_partner_first_name":
result = result.filter(health_partner__first_name__icontains=value)
elif column == "health_partner_last_name":
result = result.filter(health_partner__last_name__icontains=value)
elif column == "social_security_number":
result = result.filter(subject__social_security_number__icontains=value)
elif column == "default_location":
result = result.filter(default_location=value)
elif column == "flying_team":
result = result.filter(flying_team=value)
elif column == "type":
result = result.filter(type=value)
elif str(column).startswith("visit_"):
visit_number = get_visit_number_from_visit_x_string(column)
result = filter_by_visit(result, visit_number, value)
elif column == "":
pass
else:
message = "UNKNOWN filter: "
if column is None:
message += "[None]"
else:
message += str(column)
logger.warn(message)
return result
def subjects(request, type):
try:
# id of the query from dataTable: https://datatables.net/manual/server-side
draw = int(request.GET.get("draw", "-1"))
start = int(request.GET.get("start", "0"))
length = int(request.GET.get("length", "10"))
order = int(request.GET.get("order[0][column]", "0"))
order_dir = request.GET.get("order[0][dir]", "asc")
order_column = request.GET.get("columns[" + str(order) + "][data]", "last_name")
filters = get_filters_for_data_table_request(request)
all_subjects = get_subjects(request, type)
count = all_subjects.count()
filtered_subjects = get_subjects_filtered(all_subjects, filters)
ordered_subjects = get_subjects_order(filtered_subjects, order_column, order_dir, column_filters=dict(filters))
sliced_subjects = ordered_subjects[start:(start + length)]
result_subjects = sliced_subjects
count_filtered = filtered_subjects.count()
data = []
for subject in result_subjects:
data.append(serialize_subject(subject))
return JsonResponse({
"draw": draw,
"recordsTotal": count,
"recordsFiltered": count_filtered,
"data": data,
})
except Exception as e:
logger.error(e, exc_info=True)
return e500_error(request)
# noinspection PyUnusedLocal
def types(request):
data = [{"id": subject_type_id, "name": subject_type_name} for subject_type_id, subject_type_name in
SUBJECT_TYPE_CHOICES.items()]
return JsonResponse({
"types": data
})
def serialize_subject(study_subject):
location = location_to_str(study_subject.default_location)
flying_team = flying_team_to_str(study_subject.flying_team)
visits = Visit.objects.filter(subject=study_subject).order_by('visit_number')
serialized_visits = []
for visit in visits:
if visit.datetime_begin < get_today_midnight_date():
if visit.is_finished:
finished_appointments_count = visit.appointment_set.filter(
status=Appointment.APPOINTMENT_STATUS_FINISHED).count()
if finished_appointments_count > 0:
status = "DONE"
else:
status = "MISSED"
elif visit.datetime_end < get_today_midnight_date():
scheduled_appointments_count = visit.appointment_set.filter(
status=Appointment.APPOINTMENT_STATUS_SCHEDULED).count()
if scheduled_appointments_count > 0:
status = "IN_PROGRESS"
else:
status = "EXCEEDED"
else:
scheduled_appointments_count = visit.appointment_set.filter(
status=Appointment.APPOINTMENT_STATUS_SCHEDULED).count()
if scheduled_appointments_count > 0:
status = "IN_PROGRESS"
else:
status = "SHOULD_BE_IN_PROGRESS"
else:
status = "UPCOMING"
serialized_visits.append({
"status": status,
"datetime_start": serialize_date(visit.datetime_begin),
"datetime_end": serialize_date(visit.datetime_end),
})
contact_reminder = serialize_datetime(study_subject.datetime_contact_reminder)
contact_attempts = ContactAttempt.objects.filter(subject=study_subject).order_by("-datetime_when")
if len(contact_attempts) > 0:
last_contact_attempt = contact_attempts[0]
last_contact_attempt_string = serialize_datetime(last_contact_attempt.datetime_when) + "<br/>" + unicode(
last_contact_attempt.worker) + "<br/> Success: " + bool_to_yes_no(
last_contact_attempt.success) + "<br/>" + last_contact_attempt.comment
else:
last_contact_attempt_string = ""
health_partner_first_name = ""
if study_subject.health_partner:
health_partner_first_name = study_subject.health_partner.first_name
health_partner_last_name = ""
if study_subject.health_partner:
health_partner_last_name = study_subject.health_partner.last_name
result = {
"first_name": study_subject.subject.first_name,
"last_name": study_subject.subject.last_name,
"date_born": study_subject.subject.date_born,
"datetime_contact_reminder": contact_reminder,
"last_contact_attempt": last_contact_attempt_string,
"nd_number": study_subject.nd_number,
"screening_number": study_subject.screening_number,
"referral": study_subject.referral,
"default_location": location,
"flying_team": flying_team,
"dead": bool_to_yes_no(study_subject.subject.dead),
"resigned": bool_to_yes_no(study_subject.resigned),
"postponed": bool_to_yes_no(study_subject.postponed),
"excluded": bool_to_yes_no(study_subject.excluded),
"information_sent": bool_to_yes_no(study_subject.information_sent),
"health_partner_first_name": health_partner_first_name,
"health_partner_last_name": health_partner_last_name,
"social_security_number": study_subject.subject.social_security_number,
"type": study_subject.get_type_display(),
"id": study_subject.id,
"visits": serialized_visits,
}
return result