# coding=utf-8
"""Import workers, voucher partners and subjects from an Excel workbook.

Usage: python <this script> file_path.xlsx [admin_password]

The workbook is expected to contain the sheets 'Workers', 'Voucher Partners'
and 'Subjects'. Django is configured at import time so that the project
models can be used from a standalone script.
"""
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))  # run script as it was on parent folder
import getpass
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "smash.settings")
django.setup()
from smash.local_settings import MEDIA_ROOT
from shutil import copyfile
import pandas as pd
import numpy as np
import logging
import datetime
from dateutil.relativedelta import relativedelta
import re
from operator import itemgetter
from collections import OrderedDict
from django.contrib.auth.models import User
from web.models.constants import VOUCHER_STATUS_IN_USE, SUBJECT_TYPE_CHOICES_PATIENT, GLOBAL_STUDY_ID, \
    SEX_CHOICES_MALE, SEX_CHOICES_FEMALE
from web.algorithm import VerhoeffAlgorithm, LuhnAlgorithm
from web.utils import is_valid_social_security_number
from web.models import VoucherType, Voucher, Country, AppointmentTypeLink, Study, Worker, Language, Subject, \
    WorkerStudyRole, StudySubject, Location, FlyingTeam, Visit, Appointment, AppointmentType
from web.models.worker_study_role import ROLE_CHOICES_TECHNICIAN, ROLE_CHOICES_SECRETARY, \
    ROLE_CHOICES_HEALTH_PARTNER, ROLE_CHOICES_DOCTOR, ROLE_CHOICES_VOUCHER_PARTNER, ROLE_CHOICES_NURSE, \
    ROLE_CHOICES_PSYCHOLOGIST, ROLE_CHOICES_PROJECT_MANAGER

DEFAULT_LOCATION = 'CHL'
DEFAULT_LOCATION_PREFIX = 'P'
# Matches dates such as 1.2.2018 or 01.02.2018 (no time component).
date_regex = re.compile(r'\d{1,2}\.\d{1,2}\.\d{4}')

language_base_dir = 'web/static/flags/'
language_flags = {
    'French': 'FR.png',
    'English': 'GB.png',
    'Finnish': 'FI.png',
    'Italian': 'IT.png',
    'Spanish': 'ES.png',
    'Portuguese': 'PT.png',
    'Luxembourgish': 'LU.png',
    'German': 'DE.png',
    'Dutch': 'NL.png'
}


def get_new_screening_number(screening_number_prefix):
    """Return the next free screening number for the given prefix.

    Scans every StudySubject whose screening number contains the prefix,
    takes the highest numeric suffix found after '<prefix>-' and returns
    '<prefix>-NNN' with that number incremented and zero-padded to 3 digits.
    """
    result_number = 0
    subjects = StudySubject.objects.filter(screening_number__contains=screening_number_prefix)
    for subject in subjects:
        # a subject may hold several screening numbers separated by ';'
        for screening_number in subject.screening_number.split(";"):
            screening_number = screening_number.strip()
            if screening_number.startswith(screening_number_prefix):
                # skip the prefix and the separator character that follows it
                number = screening_number[len(screening_number_prefix) + 1:]
                try:
                    result_number = max(result_number, int(number))
                except ValueError:
                    # suffix is not numeric: ignore this screening number
                    pass
    return screening_number_prefix + '-' + str(result_number + 1).zfill(3)


def itembetter(items, lst):
    """Like operator.itemgetter, but always return a list.

    itemgetter returns a bare element when a single index is given; this
    wrapper normalises that case to a one-element list.
    """
    if len(items) == 1:
        return [itemgetter(*items)(lst)]
    else:
        return list(itemgetter(*items)(lst))


def indexof(element, l):
    """Return the indices of every occurrence of element in l."""
    return [i for i, x in enumerate(l) if x == element]


# Things that *could* already be on the database:
#   - Language
#   - Country
#   - Location
#   - Flying Team
#   - Referrals (Health Partner)
#   - A subject with the same SS number, first name and last name
#   - A studySubject with the same ND number, subject
#
# Columns to be transformed to a standard format
#   - Gender
#   - Language
#   - Preferred written language
#   - Country
#   - SS number
#   - Date of birth
#   - Date added (V1)  # visits might have comments like (Tel)
#   - ... (V2)
#   - ... (V3)
#   - ... (V4)
#   - Voucher activity (remove cells that include None in any form and split by line break)
#   - Voucher reference (split)
#
# Boolean
#   - Deceased
#   - Postponed
#   - Resigned
#   - Excluded
#   - PDP 1.0

# Column Converter Functions
#
# converters
# Boolean:
#   Deceased
#   Postponed
#   Resigned
#   Excluded
#   PDP 1.0
#   Flying Team (FT)


def parse_voucher_reference(vr):
    """Split a voucher-reference cell by line break; empty cell gives []."""
    vr = vr.strip()
    return vr.split('\n') if vr != '' else []


def parse_voucher_type(vt):
    """Split a voucher-activity cell by line break.

    A cell containing 'none' in any letter case, or an empty cell, gives [].
    """
    vt = '' if 'NONE' in vt.upper() else vt
    vt = vt.strip()
    return vt.split('\n') if vt != '' else []


def parse_boolean(boolean_Y_N):
    '''
    Return True if 'y' or 'Y' is found.
    Otherwise return False even if it fails
    '''
    try:
        if isinstance(boolean_Y_N, float) and np.isnan(boolean_Y_N):
            return False
        return boolean_Y_N.upper() == 'Y'
    except Exception as e:
        # Deliberately best-effort: any unexpected cell type counts as False.
        # Exceptions have no .message attribute in Python 3, so format e itself.
        logging.warning('parse_boolean failed for {}.'.format(boolean_Y_N))
        logging.warning('{} {}'.format(e, e.args))
        return False


# birth date
def parse_column_date_of_birth(date):
    """Convert 'dd.mm.YYYY[ HH:MM]' to 'YYYY-mm-dd'; return None if unparsable."""
    for date_format in ('%d.%m.%Y %H:%M', '%d.%m.%Y'):
        try:
            return datetime.datetime.strptime(date, date_format).strftime('%Y-%m-%d')
        except (ValueError, TypeError):
            # ValueError: text does not match; TypeError: cell is not a string
            continue
    return None


# gender
gender_table = {'m': SEX_CHOICES_MALE, 'f': SEX_CHOICES_FEMALE}


def parse_column_gender(gender):
    """Map 'm'/'f' (any case) to the sex constants; anything else gives None."""
    try:
        return gender_table[gender.lower()]
    except (KeyError, AttributeError):
        return None


# SS number
def parse_column_ss_number(ss):
    """Normalise a social security number cell.

    Spaces are removed. Non-numeric values give None. An 11-digit (old
    format) number gets its two check digits appended. The result is
    returned even when validation fails; failures are only logged.
    """
    ss = ss.replace(' ', '')
    if not ss.isdigit():
        return None
    if len(ss) == 11:
        logging.debug('Old SS number: |{}|'.format(ss))
        eleventh_digit = VerhoeffAlgorithm.calculate_verhoeff_check_sum(ss[:11])
        twelfth_digit = LuhnAlgorithm.calculate_luhn_checksum(ss[:11])
        # str() keeps the concatenation valid whether the helpers return int or str
        ss = ss + str(eleventh_digit) + str(twelfth_digit)
    # we revalidate the old ss too after we append the digits
    if len(ss) == 13:
        if not is_valid_social_security_number(ss):
            logging.debug('Invalid SS number: |{}|'.format(ss))
    else:
        logging.debug('Invalid SS number: (Length not valid) |{}|'.format(ss))
    return ss


def parse_role(role_name):
    """Map a role name from the sheet to its ROLE_CHOICES_* constant.

    Raises KeyError on purpose when the role name is not recognised.
    """
    d = {
        'NURSE': ROLE_CHOICES_NURSE,
        'PSYCHOLOGIST': ROLE_CHOICES_PSYCHOLOGIST,
        'SECRETARY': ROLE_CHOICES_SECRETARY,
        'PROJECT MANAGER': ROLE_CHOICES_PROJECT_MANAGER,
        'DOCTOR': ROLE_CHOICES_DOCTOR,
        'HEALTH_PARTNER': ROLE_CHOICES_HEALTH_PARTNER,
        'VOUCHER_PARTNER': ROLE_CHOICES_VOUCHER_PARTNER
    }
    return d[role_name]  # crash if no valid role_name is given


# Language
language_table = {
    'L': 'Luxembourgish',
    'LT': 'Lithuanian',
    'IT': 'Italian',
    'I': 'Italian',
    'NL': 'Dutch',
    'F': 'French',
    'D': 'German',
    'G': 'German',
    'E': 'English',
    'P': 'Portuguese',
    'A': 'Arabic',
    'SP': 'Spanish',
    'S': 'Spanish',
    'FIN': 'Finnish'
}

locale_table = {
    'Luxembourgish': ('lb_LU', 'LU'),
    'Lithuanian': ('lt_LT', 'LT'),
    'Italian': ('it_IT', 'IT'),
    'French': ('fr_FR', 'FR'),
    'German': ('de_DE', 'DE'),
    'English': ('en_GB', 'GB'),
    'Portuguese': ('pt_PT', 'PT'),
    'Dutch': ('nl_BE', 'NL'),
    'Arabic': ('ar_sa', None),
    'Spanish': ('es_ES', 'ES'),
    'Finnish': ('fi_FI', 'FI')
}

language_translation_table = {
    # deletions
    ord(')'): None,
    ord('('): None,
    ord(' '): None,
    # replacements
    ord(','): ';'
}


def apply_column_prefered_language(languages):
    """Return at most the first language parsed from the cell."""
    return apply_column_languages(languages)[:1]


def apply_column_languages(languages):
    """Parse a languages cell into a numpy array of language names.

    The cell is upper-cased, stripped of spaces and parentheses, and split
    on ';' or ','. Known abbreviations are expanded through language_table;
    unknown tokens are kept as they are. NaN/float or empty cells give an
    empty array.
    """
    # The type check must come first: NaN cells are floats and have no .strip()
    if not isinstance(languages, float):
        cleaned = str(languages).upper().strip().translate(language_translation_table)
        if len(cleaned) > 0:
            new_list = [language_table.get(language, language)
                        for language in cleaned.split(';')
                        if len(language) > 0]
            return np.array(new_list)
    logging.debug(
        'Parse Languages: Empty, NaN, or invalid Languages: |{}|'.format(languages))
    return np.array([])


# Country
country_table = {
    'LUX': 'Luxembourg',
    'DE': 'Deutschland',
    'FRA': 'France'
}


def apply_column_country(country):
    """Expand a known country code; other values are returned unchanged.

    A warning is logged when the value is neither a known code nor one of
    the known full names.
    """
    try:
        return country_table[country]
    except (KeyError, TypeError):
        if country in list(country_table.values()):
            return country
        logging.warning('Invalid Country: {}'.format(country))
        return country


# Instead of using the converters parameter from read_excel method, we opt
# for making the transformations later since the read_excel method does not
# allow converters that return a list.
converters = {
    'DATE OF BIRTH': parse_column_date_of_birth,
    'GENDER': parse_column_gender,
    'SS NUMBER': parse_column_ss_number,
    'COUNTRY': apply_column_country,
    'LANGUAGES': apply_column_languages,
    'PREFERED WRITEN LANGUAGE': apply_column_prefered_language,
    'DECEASED': parse_boolean,
    'POSTPONED': parse_boolean,
    'RESIGNED': parse_boolean,
    'EXCLUDED': parse_boolean,
    'PDP 1.0': parse_boolean,
    'FLYING TEAM (FT)': parse_boolean,
    'VOUCHER ACTIVITY': parse_voucher_type,
    'VOUCHER REFERENCE': parse_voucher_reference
}

# add voucher for subject
voucher_partners = {}
voucher_partners['ZIT'] = 'Zitha'


def add_subject_vouchers(voucher_reference, referral, voucher_types, subject_nd_number):
    """Create or update the Voucher described by a voucher reference.

    The reference has five '-'-separated fields: ND number, issue date
    (YYYYmmdd), voucher partner code, voucher type code and a counter.
    The voucher partner Worker is created if needed. The voucher expires
    six months after its issue date. Returns the Voucher.

    Raises ValueError when the reference does not have exactly five fields
    or the date is malformed.
    """
    nd_number, date, voucher_partner, voucher_type, _ = voucher_reference.split('-')
    nd_number = nd_number.upper().replace('ND', 'PDP')
    issue_date = datetime.datetime.strptime(date, '%Y%m%d')
    expiry_date = issue_date + relativedelta(months=+6)
    partner_name = voucher_partners.get(voucher_partner, voucher_partner)
    usage_partner, created = Worker.objects.update_or_create(
        name=partner_name,
        last_name=partner_name,
        voucher_partner_code=voucher_partner[0].upper())
    usage_partner.roles.update(name=ROLE_CHOICES_VOUCHER_PARTNER)
    # create workerStudyRole
    WorkerStudyRole.objects.update_or_create(
        worker=usage_partner, study_id=GLOBAL_STUDY_ID, name=ROLE_CHOICES_VOUCHER_PARTNER)
    usage_partner.voucher_types.set(list(voucher_types.values()))
    usage_partner.save()
    if created:
        logging.warning('New Voucher Partner created: {}'.format(voucher_partner))
    vt = VoucherType.objects.get(code=voucher_type)
    if nd_number != subject_nd_number:
        logging.warning('voucher reference nd_number is not the same! {} != {}'.format(nd_number, subject_nd_number))
    # the row's ND number wins over the one embedded in the reference
    study_subject = StudySubject.objects.get(nd_number=subject_nd_number)
    voucher, created = Voucher.objects.update_or_create(
        number=voucher_reference, issue_date=issue_date, expiry_date=expiry_date,
        voucher_type=vt, study_subject=study_subject, status=VOUCHER_STATUS_IN_USE,
        usage_partner=usage_partner, issue_worker=referral)
    logging.warning('New Voucher added: {}'.format(voucher_reference))
    return voucher


# create voucher types
voucher_type_codes = set()


def create_voucher_code(description):
    """Derive a short voucher-type code from a description.

    Parenthesised text and lower-case vowels are removed, and the first two
    remaining words are capitalised and joined. If the code is already in
    voucher_type_codes, a numeric suffix ('_2', '_3', ...) is appended or
    incremented until the code is unused. The caller is responsible for
    adding the returned code to voucher_type_codes.
    """
    code = re.sub(r'\([^)]*\)', '', description.strip())
    code = re.sub(r'[aeiou]+', '', code.lower()).strip()
    code = ''.join([e.capitalize() for e in code.split(' ')[:2]])
    while code in voucher_type_codes:
        match = re.match(r'^(.*)_(\d+)$', code)
        if match is None:
            code = code + '_2'
        else:
            # keep the stem and the underscore, bump only the counter
            code = '{}_{}'.format(match.group(1), int(match.group(2)) + 1)
    return code


def create_voucher_types(voucher_types_dict, study):
    """Create/update a VoucherType per (description -> code) entry.

    Returns a dict mapping each description to its VoucherType.
    """
    voucher_types = {}
    for name, code in voucher_types_dict.items():
        voucher_type, _ = VoucherType.objects.update_or_create(
            code=code, description=name, study=study)
        voucher_types[name] = voucher_type
    return voucher_types


# create appointment types
def create_appointment_types(assessments):
    """Create/update an AppointmentType per (description -> duration) entry.

    The code is built from the upper-case letters of the description
    (e.g. 'Cognitive Test' -> 'CT'). Returns the list in input order.
    """
    appointmentTypes = []
    for name, duration in assessments.items():
        # join the characters: filter() yields an iterator in Python 3, not a str
        code = ''.join(filter(str.isupper, name))
        appointmentType, _ = AppointmentType.objects.update_or_create(
            code=code, default_duration=duration, description=name)
        appointmentType.save()
        appointmentTypes.append(appointmentType)
    return appointmentTypes


def create_languages(languages_cell):
    """Get or create a Language for every name in languages_cell.

    For languages listed in language_flags the flag image is copied into
    MEDIA_ROOT and attached to the Language. Returns the Language list.
    """
    languages = []
    if not os.path.isdir(MEDIA_ROOT):
        os.makedirs(MEDIA_ROOT)
    for language in languages_cell:
        logging.warning('iteration {} {}'.format(language, languages_cell))
        lang, created = Language.objects.get_or_create(
            name=language, locale=locale_table.get(language, (None, None))[0])
        if created:
            logging.warning('New Language added: {}'.format(language))
            lang.save()
        if language in language_flags:
            src = os.path.join(language_base_dir, language_flags[language])
            basename = os.path.basename(src)
            dst = os.path.join(MEDIA_ROOT, basename)
            logging.warning('Copying file {} to {}'.format(src, dst))
            copyfile(src, dst)
            lang.image = basename
            lang.save()
        languages.append(lang)
    return languages


def parse_visit_date(date_string):
    """Parse 'dd.mm.YYYY HH:MM' or 'dd.mm.YYYY' into a datetime."""
    try:
        return datetime.datetime.strptime(date_string, '%d.%m.%Y %H:%M')
    except ValueError:
        return datetime.datetime.strptime(date_string, '%d.%m.%Y')


def parse_row(index, row, visit_columns, appointmentTypes, voucher_types, lcsb_worker):
    """Import one (already converted) row of the 'Subjects' sheet.

    Creates or reuses the languages, country, location/flying team, health
    partner, Subject, StudySubject and vouchers, then a single finished
    Visit spanning all visit dates with one Appointment per distinct date.
    Rows without any visit date are imported up to the vouchers and the
    visit part is skipped with a warning.
    """
    # Languages
    row_languages = row['LANGUAGES']
    row_pref_languages = row['PREFERED WRITEN LANGUAGE']
    if len(row_languages) == 0 and len(row_pref_languages) == 0:
        logging.warning('No Languages available for row {} {}'.format(index, row['FIRST NAME'] + row['LAST NAME']))
    elif len(row_languages) == 0:
        row_languages = row_pref_languages
    elif len(row_pref_languages) == 0:
        row_pref_languages = row_languages

    languages = create_languages(row_languages)
    pref_languages = create_languages(row_pref_languages[:1])
    if len(pref_languages) > 0:
        pref_lang = pref_languages[0]
    elif len(languages) > 0:
        pref_lang = languages[0]
    else:
        # no language at all: store no preference rather than an empty list
        pref_lang = None

    # Country
    country, created = Country.objects.get_or_create(name=row['COUNTRY'])
    if created:
        logging.warning('New Country added: {}'.format(row['COUNTRY']))
        country.save()

    # Location and Flying Team
    # If no FT, then default location is CHL
    ft = None
    if not row['FLYING TEAM (FT)']:
        prefix = DEFAULT_LOCATION_PREFIX
        location, created = Location.objects.get_or_create(
            name=DEFAULT_LOCATION, prefix=prefix)
        if created:
            logging.warning('New location added: {}'.format(DEFAULT_LOCATION))
            location.save()
    else:
        prefix = 'F'
        location, created = Location.objects.get_or_create(
            name='Flying Team', prefix=prefix)
        if created:
            logging.warning('New location added: Flying Team')
            location.save()
        # Create Flying Team
        ft, created = FlyingTeam.objects.get_or_create(
            place=row['LOCATION OF FT'])
        if created:
            logging.warning('New Flying Team added: {}'.format(
                row['LOCATION OF FT']))
            ft.save()

    # Health Partner
    # create health partner (Referral)
    health_partner = None
    if row['REFERRAL'].strip() != '':
        logging.warning('Trying to get or create Worker: {}'.format(row['REFERRAL']))
        health_partner, created = Worker.objects.get_or_create(name=row['REFERRAL'])
        health_partner.roles.update(name=ROLE_CHOICES_HEALTH_PARTNER)
        # create workerStudyRole
        WorkerStudyRole.objects.update_or_create(
            worker=health_partner, study_id=GLOBAL_STUDY_ID, name=ROLE_CHOICES_HEALTH_PARTNER)
        health_partner.save()
        if created:
            logging.warning('New Health Partner added: {}'.format(row['REFERRAL']))

    ss_number = row['SS NUMBER'] if row['SS NUMBER'] is not None else ''
    subject, created = Subject.objects.get_or_create(
        social_security_number=ss_number,
        first_name=row['FIRST NAME'],
        last_name=row['LAST NAME'],
        defaults={
            'social_security_number': ss_number,
            'first_name': row['FIRST NAME'],
            'last_name': row['LAST NAME'],
            'sex': row['GENDER'],
            'phone_number': row['PHONE NUMBER 1'],
            'phone_number_2': row['PHONE NUMBER 2'],
            'email': row['E-MAIL'],
            'date_born': row['DATE OF BIRTH'],
            'address': row['ADDRESS'],
            'postal_code': row['POSTAL CODE'],
            'city': row['CITY'],
            'country': country,
            'dead': row['DECEASED'],
            'default_written_communication_language': pref_lang
        })
    subject.languages.set(languages)
    subject.save()
    if created:
        logging.warning('New Subject added with SS number: {}'.format(ss_number))

    # StudySubject
    study = Study.objects.filter(id=GLOBAL_STUDY_ID)[0]
    nd_number = row['ND NUMBER'].upper().replace('ND', 'PDP')
    studySubject, created = StudySubject.objects.get_or_create(
        subject=subject,
        nd_number=nd_number,
        defaults={
            'subject': subject,
            'study': study,
            'postponed': row['POSTPONED'],
            'nd_number': nd_number,
            'resigned': row['RESIGNED'],
            'resign_reason': row['REASON'],
            'type': SUBJECT_TYPE_CHOICES_PATIENT,
            'excluded': row['EXCLUDED'],
            'exclude_reason': row['REASON.1'],
            'comments': row['COMMENT'],
            'default_location': location,
            'flying_team': ft,
            'screening_number': get_new_screening_number(prefix),
            'date_added': parse_column_date_of_birth(row['DATE ADDED (V1)'])
        })
    # all study subjects can have all voucher types
    studySubject.voucher_types.set(list(voucher_types.values()))
    studySubject.save()
    if created:
        logging.warning('New StudySubject added with ND number: {}'.format(nd_number))

    # VOUCHERS
    for voucher_reference in row['VOUCHER REFERENCE']:
        add_subject_vouchers(voucher_reference, health_partner, voucher_types, nd_number)

    # Visits
    # Consider all visits as part of the same visit with multiple appointments.
    # findall extracts every date from each visit cell, ignoring comments such
    # as "Tel"; the per-cell lists are flattened in column order, so position i
    # in visit_dates corresponds to appointmentTypes[i].
    visit_dates = [parse_visit_date(date_string)
                   for cell in row[visit_columns].values
                   for date_string in date_regex.findall(cell)]
    if not visit_dates:
        logging.warning('No visit dates found for ND number {}; no visit created'.format(nd_number))
        return
    distinct_dates = sorted(set(visit_dates))

    # first and last visit dates delimit the visit
    datetime_begin = distinct_dates[0].strftime('%Y-%m-%d')
    datetime_end = distinct_dates[-1].strftime('%Y-%m-%d')

    visit, created = Visit.objects.get_or_create(
        subject=studySubject,
        datetime_begin=datetime_begin,
        datetime_end=datetime_end,
        defaults={'is_finished': True})
    if created:
        logging.warning('New Visit added for ND number {} starting on {}'.format(
            nd_number, datetime_begin))

    # in this case appointment types are incremental
    visit.appointment_types.set(appointmentTypes[:len(distinct_dates)])
    visit.save()

    # If there are two Vx with the same date we put together the appointment
    # types in the same appointment.
    starting_hour = 9
    # sorted so that the starting hours are assigned deterministically
    for visit_date in distinct_dates:
        if visit_date.hour == 0:
            # no time of day available: schedule from 9:00, one hour later per date
            datetime_when = visit_date.replace(hour=starting_hour, minute=0, second=0, microsecond=0)
            starting_hour += 1
        else:
            datetime_when = visit_date

        # get the indices of each occurrence of the date and use them to get
        # the appointment types
        appointment_types = itembetter(
            indexof(visit_date, visit_dates), appointmentTypes)

        # create appointment
        appointment, _ = Appointment.objects.update_or_create(
            visit=visit,
            length=sum([a.default_duration for a in appointment_types]),
            flying_team=ft,
            location=location,
            status=Appointment.APPOINTMENT_STATUS_FINISHED,
            datetime_when=datetime_when,
            worker_assigned=lcsb_worker)

        # appointment types are chained back to back starting at 9:00
        date_when = visit_date.replace(
            hour=9, minute=0, second=0, microsecond=0)
        for appointment_type in appointment_types:
            app_type_link = AppointmentTypeLink(
                appointment=appointment,
                date_when=date_when,
                appointment_type=appointment_type,
                worker=lcsb_worker)
            date_when += datetime.timedelta(
                minutes=appointment_type.default_duration)
            app_type_link.save()


def createWorker(password, email='', username='admin', first_name='LCSB', last_name='Admin',
                 specialization='Technician', unit='LCSB', languages=None,
                 role_name=ROLE_CHOICES_TECHNICIAN, name='', comment=None,
                 is_superuser=True, is_admin=True):
    """Create or update a Worker, and its Django User when username is given.

    The worker is assigned every Location, the given languages (or every
    Language when none are given) and a WorkerStudyRole with role_name for
    the global study. Returns the Worker.
    """
    user = None
    if username is not None:
        # create user; the password is only stored through set_password so
        # that the raw value is never written to the database
        user, _ = User.objects.update_or_create(username=username, defaults={'email': email})
        user.set_password(password)
        user.is_superuser = is_superuser
        user.is_staff = True
        user.is_admin = is_admin
        user.save()
        logging.info('SuperUser and Worker {} created'.format(username))

    # create worker
    defaults = {'first_name': first_name, 'last_name': last_name, 'email': email,
                'unit': unit, 'specialization': specialization, 'user': user,
                'comment': comment}
    # only the identifying fields that were provided are used for the lookup
    worker_args = {}
    if first_name is not None:
        worker_args['first_name'] = first_name
    if last_name is not None:
        worker_args['last_name'] = last_name
    if name is not None:
        worker_args['name'] = name
    worker_args['defaults'] = defaults
    worker, created = Worker.objects.update_or_create(**worker_args)
    if created:
        logging.warning('Added worker. Name: {} first_name: {} last_name: {}'.format(name, first_name, last_name))

    worker.locations.set(Location.objects.all())
    if languages is None or len(languages) == 0:
        languages = Language.objects.all()
    worker.languages.set(languages)
    worker.save()

    # create workerStudyRole
    workerStudyRole, _ = WorkerStudyRole.objects.update_or_create(
        worker=worker, study_id=GLOBAL_STUDY_ID, name=role_name)
    workerStudyRole.save()
    return worker


def main():
    """Run the import: admin user, workers, voucher partners, then subjects."""
    logging.basicConfig(level=logging.DEBUG)
    if len(sys.argv) < 2:
        logging.warning('Please, execute the program as: python {} file_path.xlsx'.format(sys.argv[0]))
        sys.exit(1)

    file_path = sys.argv[1]
    if not os.path.isfile(file_path):
        logging.warning('Please, execute the program with a valid file path.')
        sys.exit(1)

    admin_password = None
    if len(sys.argv) >= 3:
        admin_password = sys.argv[2]

    # create worker and super user
    if admin_password is None:
        pass1 = ''
        pass2 = None
        while pass1 != pass2:
            pass1 = getpass.getpass('Please type a password for the Admin user: ')
            pass2 = getpass.getpass('Please type your password again: ')
            if pass1 != pass2:
                print('Password mismatch, please try again')
    else:
        pass1 = admin_password
    lcsb_worker = createWorker(pass1)

    # STUDY
    study = Study.objects.filter(id=GLOBAL_STUDY_ID)[0]
    # enable vouchers
    study.columns.voucher_types = True
    study.columns.vouchers = True
    study.nd_number_study_subject_regex = r'^PDP\d{4}$'
    study.columns.save()
    study.save()

    # READ WORKERS SHEET
    df = pd.read_excel(file_path, dtype=object, sheet_name='Workers')
    # convert column name to upper case
    df.columns = [c.upper() for c in df.columns]
    # convert languages
    df['LANGUAGES'] = df['LANGUAGES'].apply(apply_column_languages)
    df['ROLE'] = df['ROLE'].apply(parse_role)

    for _, row in df.iterrows():
        languages = create_languages(row['LANGUAGES'])
        createWorker(row['PASSWORD'], email=row['MAIL'], username=row['USERNAME'],
                     first_name=row['FIRST NAME'], last_name=row['LAST NAME'],
                     specialization=row['ROLE'], role_name=row['ROLE'], unit='CHL',
                     languages=languages, is_superuser=False, is_admin=False)

    # READ VOUCHER PARTNER SHEET
    df = pd.read_excel(file_path, dtype=object, sheet_name='Voucher Partners')
    # convert column name to upper case
    df.columns = [c.upper() for c in df.columns]
    logging.warning('COLUMNS IN VOUCHER_PARTNER: {}'.format(df.columns))
    df['ROLE'] = df['ROLE'].apply(parse_role)

    logging.warning('Voucher Partners')
    for _, row in df.iterrows():
        logging.warning('Voucher Partner: {}'.format(row['NAME']))
        worker = createWorker(None, email='', username=None, first_name='', last_name='',
                              specialization=row['ROLE'], role_name=row['ROLE'], unit='CHL',
                              languages=[], name=row['NAME'], comment=row.get('COMMENT', ''))
        # an empty OFFER cell is read as a float NaN
        if not isinstance(row['OFFER'], float):
            partner_voucher_types = []
            for offer in row['OFFER'].split(';'):
                code = create_voucher_code(offer)
                voucher_type_codes.add(code)
                voucher_type, created = VoucherType.objects.update_or_create(
                    description=offer.title(), study=study, code=code)
                if created:
                    logging.warning('Voucher type created: {} ({})'.format(offer, code))
                partner_voucher_types.append(voucher_type)
            worker.voucher_types.set(partner_voucher_types)
            worker.save()

    # READ FIRST SHEET
    df = pd.read_excel(file_path, dtype=object, sheet_name='Subjects')
    df = df.fillna('').astype(str)
    df.columns = [c.upper().strip() for c in df.columns]
    logging.warning('COLUMNS IN Subjects: {}'.format(df.columns))
    # make transformations
    for column, function in converters.items():
        logging.warning(column)
        df[column] = df[column].apply(function)
    # get visits columns, i.e. those whose name contains (V1), (V2), ...
    regex = re.compile(r'\(V\d\)')
    visit_columns = list(filter(regex.search, df.columns))

    assessments = OrderedDict([('Cognitive Test', 180), ('Risk Factor', 120),
                               ('Voucher Distribution', 120), ('Follow Up', 90)])
    appointmentTypes = create_appointment_types(assessments)

    voucher_types_dict = OrderedDict([('Cognitive Activity', 'CA'), ('Neurofit', 'NF'), ('Mobilfit', 'MF'),
                                      ('Diet', 'D'), ('Consulte ORL', 'CORL'), ('Physical Activity', 'PA'),
                                      ('Individual Cognitive Training', 'IT'), ('Social', 'S'), ('Test', 'T')])
    voucher_types = create_voucher_types(voucher_types_dict, study)

    # process each row
    for index, row in df.iterrows():
        parse_row(index, row, visit_columns, appointmentTypes, voucher_types, lcsb_worker)


if __name__ == '__main__':
    main()