Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
study_subject_forms.py 16.41 KiB
import datetime
import logging
import re

from django import forms
from django.forms import ModelForm

from web.forms.forms import DATETIMEPICKER_DATE_ATTRS, get_worker_from_args, DATEPICKER_DATE_ATTRS
from web.models import StudySubject, Study, StudyColumns, VoucherType, Worker, Visit
from web.models.constants import (
    CUSTOM_FIELD_TYPE_TEXT,
    CUSTOM_FIELD_TYPE_BOOLEAN,
    CUSTOM_FIELD_TYPE_INTEGER,
    CUSTOM_FIELD_TYPE_DOUBLE,
    CUSTOM_FIELD_TYPE_DATE,
    CUSTOM_FIELD_TYPE_SELECT_LIST,
    CUSTOM_FIELD_TYPE_FILE,
)
from web.models.custom_data import CustomStudySubjectField, CustomStudySubjectValue
from web.models.custom_data.custom_study_subject_field import get_study_subject_field_id
from web.models.worker_study_role import WORKER_HEALTH_PARTNER
from web.widgets.secure_file_widget import SecuredFileWidget

logger = logging.getLogger(__name__)


def get_custom_select_choices(possible_values):
    result = []
    index = 1
    result.append(("0", "---"))
    for value in possible_values.split(";"):
        result.append((str(index), value))
        index += 1
    return result


def create_field_for_custom_study_subject_field(
    study_subject_field: CustomStudySubjectField, field_value: CustomStudySubjectValue = None
) -> forms.Field:
    val = study_subject_field.default_value
    if field_value is not None:
        val = field_value.value
    if study_subject_field.type == CUSTOM_FIELD_TYPE_TEXT:
        field = forms.CharField(
            label=study_subject_field.name,
            initial=val,
            required=study_subject_field.required,
            disabled=study_subject_field.readonly,
        )
    elif study_subject_field.type == CUSTOM_FIELD_TYPE_BOOLEAN:
        initial = False
        if val is not None and val.lower() == "true":
            initial = True
        field = forms.BooleanField(
            label=study_subject_field.name,
            initial=initial,
            required=study_subject_field.required,
            disabled=study_subject_field.readonly,
        )
    elif study_subject_field.type == CUSTOM_FIELD_TYPE_INTEGER:
        initial = None
        if val is not None and re.match(r"[-+]?\d+$", val) is not None:
            initial = int(val)
        field = forms.IntegerField(
            label=study_subject_field.name,
            initial=initial,
            required=study_subject_field.required,
            disabled=study_subject_field.readonly,
        )
    elif study_subject_field.type == CUSTOM_FIELD_TYPE_DOUBLE:
        initial = None
        if val is not None and re.match(r"[-+]?\d+?\.\d+?$", val) is not None:
            initial = float(val)
        field = forms.FloatField(
            label=study_subject_field.name,
            initial=initial,
            required=study_subject_field.required,
            disabled=study_subject_field.readonly,
        )
    elif study_subject_field.type == CUSTOM_FIELD_TYPE_DATE:
        initial = None
        if val is not None and val != "":
            initial = datetime.datetime.strptime(val, "%Y-%m-%d")
        field = forms.DateTimeField(
            label=study_subject_field.name,
            initial=initial,
            required=study_subject_field.required,
            disabled=study_subject_field.readonly,
            widget=forms.DateInput(DATEPICKER_DATE_ATTRS, "%Y-%m-%d"),
        )
    elif study_subject_field.type == CUSTOM_FIELD_TYPE_SELECT_LIST:
        initial = "0"
        for v, k in get_custom_select_choices(study_subject_field.possible_values):
            if k == val:
                initial = v
        field = forms.ChoiceField(
            label=study_subject_field.name,
            initial=initial,
            required=study_subject_field.required,
            disabled=study_subject_field.readonly,
            choices=get_custom_select_choices(study_subject_field.possible_values),
        )
    elif study_subject_field.type == CUSTOM_FIELD_TYPE_FILE:
        initial = None
        if val is not None:

            class CustomFileField:
                url = ""

                def __str__(self):
                    return f"{self.url}"

                def __unicode__(self):
                    return f"{self.url}"

            initial = CustomFileField()
            initial.url = val

        field = forms.FileField(
            label=study_subject_field.name,
            required=study_subject_field.required,
            disabled=study_subject_field.readonly,
            initial=initial,
            widget=SecuredFileWidget(),
        )
    else:
        raise NotImplementedError
    return field


class StudySubjectForm(ModelForm):
    datetime_contact_reminder = forms.DateTimeField(
        label="Contact on", widget=forms.DateTimeInput(DATETIMEPICKER_DATE_ATTRS), required=False
    )
    referral_letter = forms.FileField(label="Referral letter", widget=SecuredFileWidget(), required=False)
    voucher_types = forms.ModelMultipleChoiceField(
        required=False, widget=forms.CheckboxSelectMultiple, queryset=VoucherType.objects.all()
    )

    study: Study

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        instance = kwargs.get("instance")
        if instance:
            for value in instance.custom_data_values:
                field_id = get_study_subject_field_id(value.study_subject_field)
                self.fields[field_id] = create_field_for_custom_study_subject_field(value.study_subject_field, value)
        else:
            for field_type in CustomStudySubjectField.objects.filter(study=self.study):
                field_id = get_study_subject_field_id(field_type)
                self.fields[field_id] = create_field_for_custom_study_subject_field(field_type)

        self.fields["health_partner"].queryset = Worker.get_workers_by_worker_type(WORKER_HEALTH_PARTNER)

    def clean(self):
        cleaned_data = super().clean()
        subject_id = -1
        if getattr(self, "instance", None) is not None:
            subject_id = getattr(self, "instance", None).id
        for field_type in CustomStudySubjectField.objects.filter(study=self.study):
            if field_type.unique:
                field_id = get_study_subject_field_id(field_type)
                value = cleaned_data[field_id]
                if value is not None and value != "":
                    count = (
                        StudySubject.objects.filter(
                            customstudysubjectvalue__study_subject_field=field_type,
                            customstudysubjectvalue__value=value,
                            study=self.study,
                        )
                        .exclude(id=subject_id)
                        .count()
                    )
                    if count > 0:
                        self.add_error(field_id, "Value must be unique within the study")
        return cleaned_data


class StudySubjectAddForm(StudySubjectForm):
    class Meta:
        model = StudySubject
        fields = "__all__"
        exclude = ["resigned", "resign_reason", "endpoint_reached", "endpoint_reached_reason"]

    def __init__(self, *args, **kwargs):
        self.user = get_worker_from_args(kwargs)
        self.study = get_study_from_args(kwargs)

        super().__init__(*args, **kwargs)

        prepare_study_subject_fields(fields=self.fields, study=self.study)

    def save(self, commit=True) -> StudySubject:
        self.instance.study_id = self.study.id
        instance = super().save(commit)
        # we can add custom values only after object exists in the database
        for field_type in CustomStudySubjectField.objects.filter(study=self.study):
            if not field_type.readonly:
                self.instance.set_custom_data_value(
                    field_type, get_study_subject_field_value(field_type, self[get_study_subject_field_id(field_type)])
                )
        return instance

    def build_screening_number(self, cleaned_data):
        screening_number = cleaned_data.get("screening_number", None)
        if not screening_number:
            prefix_screening_number = self.get_prefix_screening_number()
            if prefix_screening_number is not None:
                screening_number = get_new_screening_number(prefix_screening_number)
        return screening_number

    def clean(self):
        cleaned_data = super().clean()
        screening_number = self.build_screening_number(cleaned_data)
        if screening_number is not None and self.study.columns.screening_number:
            cleaned_data["screening_number"] = screening_number
        validate_subject_screening_number(self, cleaned_data)
        validate_subject_nd_number(self, cleaned_data)
        return cleaned_data

    def get_prefix_screening_number(self):
        default_location = self.cleaned_data.get("default_location", None)
        screening_number_prefix = None
        if default_location is not None and default_location.prefix:
            screening_number_prefix = default_location.prefix
        else:
            subject_type = self.cleaned_data.get("type", None)
            if subject_type is not None:
                screening_number_prefix = subject_type.screening_number_prefix
        if screening_number_prefix is None:
            return None
        prefix_screening_number = screening_number_prefix + "-"
        return prefix_screening_number


def get_new_screening_number(screening_number_prefix):
    result_number = 0
    subjects = StudySubject.objects.filter(screening_number__contains=screening_number_prefix)
    for subject in subjects:
        screening_numbers = subject.screening_number.split(";")
        for screening_number in screening_numbers:
            screening_number = screening_number.strip()
            if screening_number.startswith(screening_number_prefix):
                number = screening_number[len(screening_number_prefix):]
                try:
                    result_number = max(result_number, int(number))
                except ValueError:
                    pass

    return screening_number_prefix + str(result_number + 1).zfill(3)


class StudySubjectDetailForm(StudySubjectForm):
    class Meta:
        model = StudySubject
        fields = "__all__"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        instance = getattr(self, "instance", None)

        self.study = get_study_from_study_subject_instance(instance)

        prepare_study_subject_fields(fields=self.fields, study=self.study)


def get_study_from_study_subject_instance(study_subject):
    if study_subject and study_subject.study_id:
        return Study.objects.filter(id=study_subject.study_id)[0]
    else:
        return Study(columns=StudyColumns())


def get_study_subject_field_value(field_type: CustomStudySubjectField, field: forms.BoundField):
    if field_type.type == CUSTOM_FIELD_TYPE_TEXT:
        return field.value()
    elif field_type.type == CUSTOM_FIELD_TYPE_BOOLEAN:
        return str(field.value())
    elif field_type.type == CUSTOM_FIELD_TYPE_INTEGER:
        if field.value() is None:
            return None
        return str(field.value())
    elif field_type.type == CUSTOM_FIELD_TYPE_DOUBLE:
        if field.value() is None:
            return None
        return str(field.value())
    elif field_type.type == CUSTOM_FIELD_TYPE_DATE:
        if field.value() is None:
            return None
        return field.value()
    elif field_type.type == CUSTOM_FIELD_TYPE_SELECT_LIST:
        if field.value() == "0":
            return ""
        if field.value() is None:
            return None
        for v, k in get_custom_select_choices(field_type.possible_values):
            if v == field.value():
                return k
        return None
    elif field_type.type == CUSTOM_FIELD_TYPE_FILE:
        # this must be handled in view
        return None
    else:
        raise NotImplementedError


class StudySubjectEditForm(StudySubjectForm):
    def __init__(self, *args, **kwargs):
        was_resigned = kwargs.pop("was_resigned", False)
        endpoint_was_reached = kwargs.pop("endpoint_was_reached", False)
        super().__init__(*args, **kwargs)
        instance: StudySubject = getattr(self, "instance", None)
        if instance and instance.id:
            self.fields["screening_number"].widget.attrs["readonly"] = True
        self.study = get_study_from_study_subject_instance(instance)
        self.original_type = instance.type

        self.fields["resigned"].disabled = was_resigned
        self.fields["endpoint_reached"].disabled = endpoint_was_reached

        prepare_study_subject_fields(fields=self.fields, study=self.study)

    def clean(self):
        cleaned_data = super().clean()
        validate_subject_nd_number(self, cleaned_data)
        validate_subject_resign_reason(self, cleaned_data)
        return cleaned_data

    def save(self, commit=True) -> StudySubject:
        for field_type in CustomStudySubjectField.objects.filter(study=self.study):
            if not field_type.readonly:
                self.instance.set_custom_data_value(
                    field_type, get_study_subject_field_value(field_type, self[get_study_subject_field_id(field_type)])
                )

        if self.original_type != self.instance.type:
            self.instance.visit_used_to_compute_followup_date = (
                Visit.objects.filter(subject=self.instance).order_by("-visit_number").first()
            )

        return super().save(commit)

    class Meta:
        model = StudySubject
        fields = "__all__"


def get_study_from_args(kwargs):
    study = kwargs.pop("study", None)
    if study is None:
        raise TypeError("Study not defined")
    return study


def prepare_field(fields, visible_columns: StudyColumns, field_name: str, required: bool = False):
    if not getattr(visible_columns, field_name) and field_name in fields:
        del fields[field_name]
    elif required:
        fields[field_name].required = True


def prepare_study_subject_fields(fields, study: Study):
    prepare_field(fields, study.columns, "default_location", required=True)
    prepare_field(fields, study.columns, "type", required=True)
    prepare_field(fields, study.columns, "screening_number")
    prepare_field(fields, study.columns, "nd_number")
    prepare_field(fields, study.columns, "datetime_contact_reminder")
    prepare_field(fields, study.columns, "postponed")
    prepare_field(fields, study.columns, "flying_team")
    prepare_field(fields, study.columns, "comments")
    prepare_field(fields, study.columns, "referral")
    prepare_field(fields, study.columns, "information_sent")
    prepare_field(fields, study.columns, "endpoint_reached")
    prepare_field(fields, study.columns, "endpoint_reached_reason")
    prepare_field(fields, study.columns, "excluded")
    prepare_field(fields, study.columns, "exclude_reason")
    prepare_field(fields, study.columns, "resigned")
    prepare_field(fields, study.columns, "resign_reason")
    prepare_field(fields, study.columns, "referral_letter")
    prepare_field(fields, study.columns, "health_partner")
    prepare_field(fields, study.columns, "health_partner_feedback_agreement")
    if not study.columns.vouchers:
        del fields["voucher_types"]


def validate_subject_screening_number(self, cleaned_data):
    if self.study.columns.resign_reason and self.study.columns.screening_number:
        subjects_from_db = StudySubject.objects.filter(
            screening_number=cleaned_data["screening_number"], study=self.study
        )
        if len(subjects_from_db) > 0:
            self.add_error("screening_number", "Screening number already in use")


def validate_subject_nd_number(self, cleaned_data):
    if self.study.columns.nd_number:
        nd_number = cleaned_data["nd_number"]
        if nd_number is None:
            self.add_error("nd_number", "None ND number. ND number can be blank but not None.")
        elif nd_number:
            if not self.study.check_nd_number(nd_number):
                self.add_error("nd_number", "Invalid ND number")
            else:
                subjects_from_db = StudySubject.objects.filter(nd_number=nd_number, study=self.study)
                if subjects_from_db:
                    if subjects_from_db[0].screening_number != cleaned_data.get("screening_number", ""):
                        self.add_error("nd_number", "Subject number already in use")
        # else: #empty nd_number is valid


def validate_subject_resign_reason(self, cleaned_data):
    if self.study.columns.resigned and self.study.columns.resign_reason:
        if cleaned_data["resigned"] and cleaned_data["resign_reason"] == "":
            self.add_error("resign_reason", "Resign reason cannot be empty")