import logging from distutils.util import strtobool from django import forms from django.forms import ModelForm from web.forms.forms import DATETIMEPICKER_DATE_ATTRS, get_worker_from_args from web.models import ConfigurationItem from web.models import StudySubject, Study, StudyColumns, VoucherType, Worker from web.models.constants import SCREENING_NUMBER_PREFIXES_FOR_TYPE, VISIT_SHOW_VISIT_NUMBER_FROM_ZERO from web.models.worker_study_role import WORKER_HEALTH_PARTNER from web.widgets.secure_file_widget import SecuredFileWidget logger = logging.getLogger(__name__) class StudySubjectForm(ModelForm): datetime_contact_reminder = forms.DateTimeField(label="Contact on", widget=forms.DateTimeInput(DATETIMEPICKER_DATE_ATTRS), required=False) referral_letter = forms.FileField(label='Referral letter', widget=SecuredFileWidget(), required=False) voucher_types = forms.ModelMultipleChoiceField(required=False, widget=forms.CheckboxSelectMultiple, queryset=VoucherType.objects.all()) def __init__(self, *args, **kwargs): super(StudySubjectForm, self).__init__(*args, **kwargs) self.fields['health_partner'].queryset = Worker.get_workers_by_worker_type( WORKER_HEALTH_PARTNER) visit_from_zero = ConfigurationItem.objects.get(type=VISIT_SHOW_VISIT_NUMBER_FROM_ZERO).value # True values are y, yes, t, true, on and 1; false values are n, no, f, false, off and 0. if strtobool(visit_from_zero): virus_visit_numbers = range(0, 5) for one_based_idx, virus_visit_number in enumerate(virus_visit_numbers, 1): field = 'virus_test_{}'.format(one_based_idx) self.fields[field].label = 'Visit {} RT-PCR'.format(virus_visit_number) date_field = 'virus_test_{}_updated'.format(one_based_idx) self.fields[date_field].label = 'Visit {} RT-PCR date'.format(virus_visit_number) for visit_number in range(1, 6): disable_virus_test_field(self, visit_number) self.update_virus_inconclusive_data(visit_number) def update_virus_inconclusive_data(self, visit_number): test_result_column_name = 'virus_test_{}'.format(visit_number) test_date_column_name = 'virus_test_{}_updated'.format(visit_number) self.fields[test_result_column_name].widget.choices.append(("Inc", 'Inconclusive')) instance = getattr(self, 'instance', None) if instance and instance.id: test_result = getattr(instance, test_result_column_name) test_date = getattr(instance, test_date_column_name) if test_result is None and test_date is not None: self.initial[test_result_column_name] = "Inc" class StudySubjectAddForm(StudySubjectForm): class Meta: model = StudySubject fields = '__all__' exclude = ['resigned', 'resign_reason', 'endpoint_reached', 'endpoint_reached_reason'] def __init__(self, *args, **kwargs): self.user = get_worker_from_args(kwargs) self.study = get_study_from_args(kwargs) super(StudySubjectAddForm, self).__init__(*args, **kwargs) for visit_number in range(1, 6): disable_virus_test_field(self, visit_number) prepare_study_subject_fields(fields=self.fields, study=self.study) def save(self, commit=True): self.instance.study_id = self.study.id return super(StudySubjectAddForm, self).save(commit) def build_screening_number(self, cleaned_data): screening_number = cleaned_data.get('screening_number', None) if not screening_number: prefix_screening_number = self.get_prefix_screening_number() if prefix_screening_number is not None: screening_number = get_new_screening_number(prefix_screening_number) return screening_number def clean(self): cleaned_data = super(StudySubjectAddForm, self).clean() screening_number = self.build_screening_number(cleaned_data) if screening_number is not None and self.study.columns.screening_number: cleaned_data['screening_number'] = screening_number validate_subject_screening_number(self, cleaned_data) validate_subject_nd_number(self, cleaned_data) validate_subject_mpower_number(self, cleaned_data) return cleaned_data def get_prefix_screening_number(self): default_location = self.cleaned_data.get('default_location', None) screening_number_prefix = None if default_location is not None and default_location.prefix: screening_number_prefix = default_location.prefix else: subject_type = self.cleaned_data.get('type', None) if subject_type is not None: screening_number_prefix = SCREENING_NUMBER_PREFIXES_FOR_TYPE[subject_type] if screening_number_prefix is None: return None prefix_screening_number = screening_number_prefix + "-" return prefix_screening_number def get_new_screening_number(screening_number_prefix): result_number = 0 subjects = StudySubject.objects.filter(screening_number__contains=screening_number_prefix) for subject in subjects: screening_numbers = subject.screening_number.split(";") for screening_number in screening_numbers: screening_number = screening_number.strip() if screening_number.startswith(screening_number_prefix): number = screening_number[len(screening_number_prefix):] try: result_number = max(result_number, int(number)) except ValueError: pass return screening_number_prefix + str(result_number + 1).zfill(3) class StudySubjectDetailForm(StudySubjectForm): class Meta: model = StudySubject fields = '__all__' def __init__(self, *args, **kwargs): super(StudySubjectDetailForm, self).__init__(*args, **kwargs) instance = getattr(self, 'instance', None) self.study = get_study_from_study_subject_instance(instance) prepare_study_subject_fields(fields=self.fields, study=self.study) def get_study_from_study_subject_instance(study_subject): if study_subject and study_subject.study_id: return Study.objects.filter(id=study_subject.study_id)[0] else: return Study(columns=StudyColumns()) class StudySubjectEditForm(StudySubjectForm): def __init__(self, *args, **kwargs): was_resigned = kwargs.pop('was_resigned', False) endpoint_was_reached = kwargs.pop('endpoint_was_reached', False) super(StudySubjectEditForm, self).__init__(*args, **kwargs) instance = getattr(self, 'instance', None) if instance and instance.id: self.fields['screening_number'].widget.attrs['readonly'] = True self.study = get_study_from_study_subject_instance(instance) self.fields['resigned'].disabled = was_resigned self.fields['endpoint_reached'].disabled = endpoint_was_reached prepare_study_subject_fields(fields=self.fields, study=self.study) def clean(self): validate_subject_nd_number(self, self.cleaned_data) validate_subject_mpower_number(self, self.cleaned_data) validate_subject_resign_reason(self, self.cleaned_data) class Meta: model = StudySubject fields = '__all__' def disable_virus_test_field(self, visit_number): test_result_column_name = 'virus_test_{}'.format(visit_number) test_date_column_name = 'virus_test_{}_updated'.format(visit_number) self.fields[test_result_column_name].widget.attrs['readonly'] = True self.fields[test_result_column_name].widget.attrs['disabled'] = True self.fields[test_date_column_name].widget.attrs['readonly'] = True def get_study_from_args(kwargs): study = kwargs.pop('study', None) if study is None: raise TypeError("Study not defined") return study def prepare_field(fields, visible_columns, field_name, required=False): if not getattr(visible_columns, field_name) and field_name in fields: del fields[field_name] elif required: fields[field_name].required = True def prepare_study_subject_fields(fields, study): prepare_field(fields, study.columns, 'default_location', required=True) prepare_field(fields, study.columns, 'type', required=True) prepare_field(fields, study.columns, 'screening_number') prepare_field(fields, study.columns, 'nd_number') prepare_field(fields, study.columns, 'datetime_contact_reminder') prepare_field(fields, study.columns, 'postponed') prepare_field(fields, study.columns, 'brain_donation_agreement') prepare_field(fields, study.columns, 'flying_team') prepare_field(fields, study.columns, 'mpower_id') prepare_field(fields, study.columns, 'comments') prepare_field(fields, study.columns, 'referral') prepare_field(fields, study.columns, 'diagnosis') prepare_field(fields, study.columns, 'year_of_diagnosis') prepare_field(fields, study.columns, 'information_sent') prepare_field(fields, study.columns, 'pd_in_family') prepare_field(fields, study.columns, 'endpoint_reached') prepare_field(fields, study.columns, 'excluded') prepare_field(fields, study.columns, 'resigned') prepare_field(fields, study.columns, 'resign_reason') prepare_field(fields, study.columns, 'referral_letter') prepare_field(fields, study.columns, 'health_partner') prepare_field(fields, study.columns, 'health_partner_feedback_agreement') prepare_field(fields, study.columns, 'screening') prepare_field(fields, study.columns, 'previously_in_study') prepare_field(fields, study.columns, 'voucher_types') prepare_field(fields, study.columns, 'virus_test_1') prepare_field(fields, study.columns, 'virus_test_2') prepare_field(fields, study.columns, 'virus_test_3') prepare_field(fields, study.columns, 'virus_test_4') prepare_field(fields, study.columns, 'virus_test_5') prepare_field(fields, study.columns, 'virus_test_1_updated') prepare_field(fields, study.columns, 'virus_test_2_updated') prepare_field(fields, study.columns, 'virus_test_3_updated') prepare_field(fields, study.columns, 'virus_test_4_updated') prepare_field(fields, study.columns, 'virus_test_5_updated') def validate_subject_screening_number(self, cleaned_data): if self.study.columns.resign_reason: subjects_from_db = StudySubject.objects.filter(screening_number=cleaned_data["screening_number"], study=self.study) if len(subjects_from_db) > 0: self.add_error('screening_number', "Screening number already in use") def validate_subject_nd_number(self, cleaned_data): if self.study.columns.nd_number: nd_number = cleaned_data['nd_number'] if nd_number is None: self.add_error('nd_number', "None ND number. ND number can be blank but not None.") elif nd_number: if not self.study.check_nd_number(nd_number): self.add_error('nd_number', "Invalid ND number") else: subjects_from_db = StudySubject.objects.filter( nd_number=nd_number, study=self.study) if subjects_from_db: if subjects_from_db[0].screening_number != cleaned_data.get('screening_number', ''): self.add_error('nd_number', "ND number already in use") # else: #empty nd_number is valid def validate_subject_resign_reason(self, cleaned_data): if self.study.columns.resigned and self.study.columns.resign_reason: if cleaned_data['resigned'] and cleaned_data['resign_reason'] == '': self.add_error('resign_reason', "Resign reason cannot be empty") def validate_subject_mpower_number(self, cleaned_data): if self.study.columns.mpower_id: if cleaned_data['mpower_id'] != "": subjects_from_db = StudySubject.objects.filter( mpower_id=cleaned_data['mpower_id']) if subjects_from_db: if subjects_from_db[0].screening_number != cleaned_data.get('screening_number', ''): self.add_error('mpower_id', "mPower number already in use")