# coding=utf-8 import os import getpass import django from django.conf import settings os.environ.setdefault("DJANGO_SETTINGS_MODULE", "smash.settings") django.setup() from smash.local_settings import MEDIA_ROOT from shutil import copyfile import pandas as pd import numpy as np import logging import datetime from dateutil.relativedelta import relativedelta import re from operator import itemgetter from collections import OrderedDict, defaultdict import sys import string from django.contrib.auth.models import User from web.models.constants import VOUCHER_STATUS_IN_USE, SUBJECT_TYPE_CHOICES_PATIENT, GLOBAL_STUDY_ID, SEX_CHOICES, SEX_CHOICES_MALE, SEX_CHOICES_FEMALE from web.algorithm import VerhoeffAlgorithm, LuhnAlgorithm from web.utils import is_valid_social_security_number from web.models import VoucherType, Voucher, Country, AppointmentTypeLink, AppointmentType, Study, Worker, Language, Subject, WorkerStudyRole, StudySubject, Location, FlyingTeam, Visit, Appointment, AppointmentType from web.models.worker_study_role import ROLE_CHOICES_TECHNICIAN, WORKER_STAFF, ROLE_CHOICES_SECRETARY, ROLE_CHOICES_HEALTH_PARTNER, \ WORKER_HEALTH_PARTNER, ROLE_CHOICES_VOUCHER_PARTNER, WORKER_VOUCHER_PARTNER DEFAULT_LOCATION = 'CHL' DEFAULT_LOCATION_PREFIX = 'P' date_regex = re.compile(r'\d{1,2}\.\d{1,2}\.\d{4}') language_base_dir = 'web/static/flags/' language_flags = { 'French': 'FR.png', 'English': 'GB.png', 'Finnish': 'FI.png', 'Italian': 'IT.png', 'Spanish': 'ES.png', 'Portuguese': 'PT.png', 'Luxembourgish': 'LU.png', 'German': 'DE.png', 'Dutch': 'NL.png' } def get_new_screening_number(screening_number_prefix): result_number = 0 subjects = StudySubject.objects.filter(screening_number__contains=screening_number_prefix) for subject in subjects: screening_numbers = subject.screening_number.split(";") for screening_number in screening_numbers: screening_number = screening_number.strip() if screening_number.startswith(screening_number_prefix): number = screening_number[len(screening_number_prefix)+1:] try: result_number = max(result_number, int(number)) except ValueError: pass return screening_number_prefix + '-' + str(result_number + 1).zfill(3) def itembetter(items, lst): if len(items) == 1: return [itemgetter(*items)(lst)] else: return list(itemgetter(*items)(lst)) def indexof(element, l): return [i for i, x in enumerate(l) if x == element] ''' # Things that *could* already be on the database: - Language - Country - Location - Flying Team - Referals (Health Partner) - A subject with the same SS number, first name and last name - A studySubject with the same ND number, subject # Columns to be transformed to a standard format - Gender - Language - Prefered writen language - Country - SS number - Date of birth - Date added (V1) # visits might have comments like (Tel) - ... (V2) - ... (V3) - ... (V4) - Voucher activity (remove cells that include None in any form and split by breakline) - Voucher reference (split) Boolean - Deceased - Postponed - Resigned - Excluded - PDP 1.0 ''' ''' Column Converter Functions ''' # converters # Boolean: # Deceased # Postponed # Resigned # Excluded # PDP 1.0 # Flying Team (FT) def parse_voucher_reference(vr): vr = vr.strip() #strip spaces return vr.split('\n') if vr != u'' else [] #if empty string then return empty list, otherwise split by break line def parse_voucher_type(vt): vt = '' if 'NONE' in vt.upper() else vt #if vt includes none in any form, then return empty vt = vt.strip() #strip spaces return vt.split('\n') if vt != u'' else [] #if empty string then return empty list, otherwise split by break line def parse_boolean(boolean_Y_N): ''' Return True if 'y' or 'Y' is found. Otherwise return False even if it fails ''' try: if isinstance(boolean_Y_N, float) and np.isnan(boolean_Y_N): return False elif boolean_Y_N.upper() == 'Y': return True else: return False except Exception as e: logging.warn('parse_boolean failed for {}.'.format(boolean_Y_N)) logging.warn('{} {}'.format(e.message, e.args)) return False # birth date def parse_column_date_of_birth(date): return datetime.datetime.strptime(date, '%d.%m.%Y').strftime('%Y-%m-%d') # gender gender_table = {'m': SEX_CHOICES_MALE, 'f': SEX_CHOICES_FEMALE} def parse_column_gender(gender): try: return gender_table[gender.lower()] except: return None # SS number def parse_column_ss_number(ss): ss = ss.replace(' ', '') if not ss.isdigit(): return None if len(ss) == 11: logging.debug('Old SS number: |{}|'.format(ss)) eleventh_digit = VerhoeffAlgorithm.calculate_verhoeff_check_sum(ss[ :11]) twelfth_digit = LuhnAlgorithm.calculate_luhn_checksum(ss[:11]) ss = ss + eleventh_digit + twelfth_digit # we revalidate the old ss too after we append the digits if len(ss) == 13: if not is_valid_social_security_number(ss): logging.debug('Invalid SS number: |{}|'.format(ss)) else: logging.debug('Invalid SS number: (Length not valid) |{}|'.format(ss)) return ss # Language language_table = { 'L': 'Luxembourgish', 'LT': 'Lithuanian', 'IT': 'Italian', 'F': 'French', 'D': 'German', 'E': 'English', 'P': 'Portuguese', 'A': 'Arabic', 'SP': 'Spanish', 'FIN': 'Finnish' } locale_table = { 'Luxembourgish': ('lb_LU', 'LU'), 'Lithuanian': ('lt_LT', 'LT'), 'Italian': ('it_IT', 'IT'), 'French': ('fr_FR', 'FR'), 'German': ('de_DE', 'DE'), 'English': ('en_GB', 'GB'), 'Portuguese': ('pt_PT', 'PT'), 'Arabic': ('ar_sa', None), 'Spanish': ('es_ES', 'ES') , 'Finnish': ('fi_FI', 'FI') } language_translation_table = { # deletions ord(u')'): None, ord(u'('): None, ord(u' '): None, # replacements ord(u','): u';' } def apply_column_prefered_language(languages): return apply_column_languages(languages)[:1] def apply_column_languages(languages): languages = languages.strip() if type(languages) != float and len(languages) > 0: # replacements and transformations languages = unicode(languages).upper().translate( language_translation_table) new_list = [] for language in languages.split(';'): if language in language_table: language = language_table[language] new_list.append(language) return np.array(new_list) else: logging.debug( 'Parse Languages: Empty, NaN, or invalid Languages: |{}|'.format(languages)) return np.array([]) # Country country_table = { 'LUX': 'Luxembourg' } def apply_column_country(country): try: return country_table[country] except: logging.warn('Invalid Country: {}'.format(country)) return country ''' Instead of using the converters parameter from read_excel method, we opt for make the transformations later since the read_excel method does not allow converters that return a list. ''' converters = { 'DATE OF BIRTH': parse_column_date_of_birth, 'GENDER': parse_column_gender, 'SS NUMBER': parse_column_ss_number, 'COUNTRY': apply_column_country, 'LANGUAGES': apply_column_languages, 'PREFERED WRITEN LANGUAGE': apply_column_prefered_language, 'DECEASED': parse_boolean, 'POSTPONED': parse_boolean, 'RESIGNED': parse_boolean, 'EXCLUDED': parse_boolean, 'PDP 1.0': parse_boolean, 'FLYING TEAM (FT)': parse_boolean, 'VOUCHER ACTIVITY': parse_voucher_type, 'VOUCHER REFERENCE': parse_voucher_reference } # add voucher for subject voucher_partners = {} voucher_partners['ZIT'] = 'Zitha' def add_subject_vouchers(voucher_reference, referral, voucher_types): nd_number, date, voucher_partner, voucher_type, num = voucher_reference.split('-') nd_number = nd_number.upper().replace('ND', 'PDP') issue_date = datetime.datetime.strptime(date, '%Y%m%d') expiry_date = issue_date + relativedelta(months=+6) usage_partner, created = Worker.objects.update_or_create( name=voucher_partners.get(voucher_partner, voucher_partner), last_name=voucher_partners.get(voucher_partner, voucher_partner), voucher_partner_code=voucher_partner[0].upper()) usage_partner.roles.update(role=ROLE_CHOICES_VOUCHER_PARTNER) # create workerStudyRole workerStudyRole, _ = WorkerStudyRole.objects.update_or_create(worker=usage_partner, study_id=GLOBAL_STUDY_ID, role=ROLE_CHOICES_VOUCHER_PARTNER) usage_partner.voucher_types.set(voucher_types.values()) usage_partner.save() if created: logging.warn('New Voucher Partner created: {}'.format(voucher_partner)) vt = VoucherType.objects.get(code=voucher_type) study_subject = StudySubject.objects.get(nd_number=nd_number) voucher, created = Voucher.objects.update_or_create(number=voucher_reference, issue_date=issue_date, expiry_date=expiry_date, voucher_type=vt, study_subject=study_subject, status=VOUCHER_STATUS_IN_USE, usage_partner=usage_partner, issue_worker=referral) logging.warn('New Voucher added: {}'.format(voucher_reference)) return voucher # create voucher types def create_voucher_types(voucher_types_dict, study): voucher_types = {} for name, code in voucher_types_dict.items(): voucher_type, _ = VoucherType.objects.update_or_create(code=code, description=name, study=study) voucher_types[name] = voucher_type return voucher_types # create appointment types def create_appointment_types(assessments): appointmentTypes = [] for name, duration in assessments.items(): code = filter(str.isupper, name) appointmentType, _ = AppointmentType.objects.update_or_create( code=code, default_duration=duration, description=name) appointmentType.save() appointmentTypes.append(appointmentType) return appointmentTypes def parse_row(index, row, visit_columns, appointmentTypes, voucher_types, lcsb_worker): # Languages if len(row['LANGUAGES']) == 0 and len(row['PREFERED WRITEN LANGUAGE']) == 0: logging.warn('No Languages available') elif len(row['LANGUAGES']) == 0 and len(row['PREFERED WRITEN LANGUAGE']) > 0: row['LANGUAGES'] = row['PREFERED WRITEN LANGUAGE'] elif len(row['LANGUAGES']) > 0 and len(row['PREFERED WRITEN LANGUAGE']) == 0: row['PREFERED WRITEN LANGUAGE'] = row['LANGUAGES'] languages = [] for language in row['LANGUAGES']: lang, created = Language.objects.get_or_create( name=language, locale=locale_table.get(language,(None, None))[0]) languages.append(lang) if created: logging.warn('New Language added: {}'.format(language)) lang.save() if language in language_flags: src = os.path.join(language_base_dir, language_flags[language]) basename = os.path.basename(src) dst = os.path.join(MEDIA_ROOT, basename) copyfile(src, dst) # .save(basename, File(open(dst, 'rb'))) #SimpleUploadedFile(name=path, content=open(path, 'rb').read(), content_type='image/png') lang.image = basename lang.save() for language in row['PREFERED WRITEN LANGUAGE'][:1]: pref_lang, created = Language.objects.get_or_create(name=language ,locale=locale_table.get(language,(None, None))[0]) if created: logging.warn( 'New Language (from Prefered) added: {}'.format(language)) pref_lang.save() # Country country = row['COUNTRY'] country, created = Country.objects.get_or_create(name=country) if created: logging.warn('New Country added: {}'.format(row['COUNTRY'])) country.save() # Location and Flying Team # If no FT, then default location is CHL ft = None location = None prefix = None if not row['FLYING TEAM (FT)']: prefix = DEFAULT_LOCATION_PREFIX location, created = Location.objects.get_or_create( name=DEFAULT_LOCATION, prefix=prefix) if created: logging.warn('New location added: {}'.format(DEFAULT_LOCATION)) location.save() else: prefix = 'F' location, created = Location.objects.get_or_create( name='Flying Team', prefix=prefix) if created: logging.warn('New location added: Flying Team') location.save() # Create Flying Team ft, created = FlyingTeam.objects.get_or_create( place=row['LOCATION OF FT']) if created: logging.warn('New Flying Team added: {}'.format( row['LOCATION OF FT'])) ft.save() # Health Partner # create health partner (Referral) health_partner, created = Worker.objects.get_or_create(name=row['REFERRAL']) health_partner.roles.update(role=ROLE_CHOICES_HEALTH_PARTNER) # create workerStudyRole workerStudyRole, _ = WorkerStudyRole.objects.update_or_create( worker=health_partner, study_id=GLOBAL_STUDY_ID, role=ROLE_CHOICES_HEALTH_PARTNER) health_partner.save() if created: logging.warn('New Health Partner added: {}'.format(row['REFERRAL'])) subject, created = Subject.objects.get_or_create(social_security_number=row['SS NUMBER'], first_name=row['FIRST NAME'], last_name=row['LAST NAME'], defaults={ 'social_security_number': row['SS NUMBER'], 'first_name': row['FIRST NAME'], 'last_name': row['LAST NAME'], 'sex': row['GENDER'], 'phone_number': row['PHONE NUMBER 1'], 'phone_number_2': row['PHONE NUMBER 2'], 'email': row['E-MAIL'], 'date_born': row['DATE OF BIRTH'], 'address': row['ADDRESS'], 'postal_code': row['POSTAL CODE'], 'city': row['CITY'], 'country': country, 'dead': row['DECEASED'], 'default_written_communication_language': pref_lang }) subject.languages.set(languages) subject.save() if created: logging.warn('New Subject added with SS number: {}'.format(row['SS NUMBER'])) # StudySubject study = Study.objects.filter(id=GLOBAL_STUDY_ID)[0] nd_number = row['ND NUMBER'].upper().replace('ND', 'PDP') studySubject, created = StudySubject.objects.get_or_create(subject=subject, nd_number=nd_number, defaults={ 'subject': subject, 'study': study, 'postponed': row['POSTPONED'], 'nd_number': nd_number, 'resigned': row['RESIGNED'], 'resign_reason': row['REASON'], 'type': SUBJECT_TYPE_CHOICES_PATIENT, 'excluded': row['EXCLUDED'], 'exclude_reason': row['REASON.1'], 'previously_in_study': row['PDP 1.0'], 'comments': row['COMMENT'], 'default_location': location, 'flying_team': ft, 'screening_number': get_new_screening_number(prefix), 'date_added': parse_column_date_of_birth(row['DATE ADDED (V1)']) }) #all study subjects can have all voucher types studySubject.voucher_types.set(voucher_types.values()) studySubject.save() if created: logging.warn('New StudySubject added with ND number: {}'.format(nd_number)) #VOUCHERS voucher_references = row['VOUCHER REFERENCE'] for voucher_reference in voucher_references: voucher = add_subject_vouchers(voucher_reference, health_partner, voucher_types) # Visits # Consider all visits as part of the same visit with multiple appointments appointments = [] appointment = None ''' map(date_regex.findall gets all the dates in the strings ignoring comments such as Tel sum(Ans, []) flattens the resulting list from the map since each findall returns a list map to convert string to datetime ''' visit_dates = map(lambda x: datetime.datetime.strptime( x, '%d.%m.%Y'), sum(map(date_regex.findall, row[visit_columns].values), [])) # get first and last elements of the sorted element datetime_begin, datetime_end = itemgetter(*[0, -1])(sorted(visit_dates)) datetime_begin = datetime_begin.strftime('%Y-%m-%d') datetime_end = datetime_end.strftime('%Y-%m-%d') visit, created = Visit.objects.get_or_create( subject=studySubject, datetime_begin=datetime_begin, datetime_end=datetime_end, defaults={ 'is_finished': True}) if created: logging.warn('New Visit added for ND number {} starting on {}'.format( nd_number, datetime_begin)) appointment_types = appointmentTypes[:len(set(visit_dates))] #in this case appointment types are incremental visit.appointment_types.set(appointment_types) visit.save() ''' If there are two Vx with the same date we put together the appointment types in the same appointment ''' starting_hour = 9 for visit_date in set(visit_dates): datetime_when = visit_date.replace(hour=starting_hour, minute=0, second=0, microsecond=0) starting_hour+=1 # get the indices of each occurrence of the date and use them to get # the appointment types appointment_types = itembetter( indexof(visit_date, visit_dates), appointmentTypes) # creatre appointment appointment, _ = Appointment.objects.update_or_create( visit=visit, length=sum([a.default_duration for a in appointment_types]), flying_team=ft, location=location, status=Appointment.APPOINTMENT_STATUS_FINISHED, datetime_when=datetime_when, worker_assigned=lcsb_worker) date_when = visit_date.replace( hour=9, minute=0, second=0, microsecond=0) for appointment_type in appointment_types: app_type_link = AppointmentTypeLink( appointment=appointment, date_when=date_when, appointment_type=appointment_type, worker=lcsb_worker) date_when += datetime.timedelta( minutes=appointment_type.default_duration) app_type_link.save() def createWorker(password, email='', username='admin', first_name='LCSB', last_name='Admin', specialization='Technician', unit='LCSB'): # create user defaults = {'email': email, 'password': password} user, _ = User.objects.update_or_create(username=username, defaults=defaults) user.set_password(password) user.is_superuser = True user.is_staff = True user.is_admin = True user.save() # create worker defaults = {'first_name': first_name, 'last_name': last_name, 'email': email, 'unit': unit, 'specialization': specialization , 'user': user} worker, _ = Worker.objects.update_or_create(first_name=first_name, last_name=last_name, defaults=defaults) locations = Location.objects.all() worker.locations.set(locations) languages = Language.objects.all() worker.languages.set(languages) worker.save() # create workerStudyRole workerStudyRole, _ = WorkerStudyRole.objects.update_or_create(worker=worker, study_id=GLOBAL_STUDY_ID, role=ROLE_CHOICES_TECHNICIAN) workerStudyRole.save() logging.info('SuperUser and Worker {} created'.format(username)) return worker if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) if len(sys.argv) < 2: logging.warn('Please, execute the program as: python {} file_path.xlsx'.format(sys.argv[0])) sys.exit(1) file = sys.argv[1] if not os.path.isfile(file): logging.warn('Please, execute the program with a valid file path.') sys.exit(1) #create worker and super user pass1 = '' pass2 = None while pass1 != pass2: pass1 = getpass.getpass('Please type a password for the Admin user: ') pass2 = getpass.getpass('Please type your password again: ') if pass1 != pass2: print 'Password mismatch, please try again' lcsb_worker = createWorker(pass1) df = pd.read_excel(file, dtype=object) df = df.fillna('').astype(unicode) df.columns = [c.upper() for c in df.columns] # make transformations for column, function in converters.items(): logging.warn(column) df[column] = df[column].apply(function) # get visits columns regex = re.compile(r'\(V\d\)') # study = Study.objects.filter(id=GLOBAL_STUDY_ID)[0] #enable vouchers study.columns.voucher_types = True study.columns.vouchers = True study.nd_number_study_subject_regex = r'^PDP\d{4}$' study.columns.save() study.save() # visit_columns = filter(regex.search, df.columns) assessments = OrderedDict([('Cognitive Test', 180), ('Risk Factor', 120), ('Voucher Distribution', 120), ('Follow Up', 90)]) appointmentTypes = create_appointment_types(assessments) voucher_types_dict = OrderedDict([('Cognitive Activity', 'CA'), ('Neurofit', 'NF'), ('Mobilfit', 'MF'), ('Diet', 'D'), ('Consulte ORL', 'CORL'), ('Physical Activity', 'PA'), ('Individual Cognitive Training', 'IT'), ('Social', 'S'), ('Test', 'T')]) voucher_types = create_voucher_types(voucher_types_dict, study) # process each row for index, row in df.iterrows(): parse_row(index, row, visit_columns, appointmentTypes, voucher_types, lcsb_worker)