Newer
Older
import django
from django.conf import settings
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "smash.settings")
django.setup()
from smash.local_settings import MEDIA_ROOT
from shutil import copyfile
import pandas as pd
import numpy as np
import logging
import datetime

Carlos Vega
committed
from dateutil.relativedelta import relativedelta
import re
from operator import itemgetter
from collections import OrderedDict, defaultdict
import sys
from django.contrib.auth.models import User
from web.models.constants import VOUCHER_STATUS_IN_USE, SUBJECT_TYPE_CHOICES_PATIENT, GLOBAL_STUDY_ID, SEX_CHOICES, SEX_CHOICES_MALE, SEX_CHOICES_FEMALE
from web.algorithm import VerhoeffAlgorithm, LuhnAlgorithm
from web.utils import is_valid_social_security_number
from web.models import VoucherType, Voucher, Country, AppointmentTypeLink, AppointmentType, Study, Worker, Language, Subject, WorkerStudyRole, StudySubject, Location, FlyingTeam, Visit, Appointment, AppointmentType
from web.models.worker_study_role import ROLE_CHOICES_TECHNICIAN, WORKER_STAFF, ROLE_CHOICES_SECRETARY, ROLE_CHOICES_HEALTH_PARTNER, \
WORKER_HEALTH_PARTNER, ROLE_CHOICES_VOUCHER_PARTNER, WORKER_VOUCHER_PARTNER
# Name and screening-number prefix used for the default location.
DEFAULT_LOCATION = 'CHL'
DEFAULT_LOCATION_PREFIX = 'P'

# Directory holding the flag images, and the image file used per language name.
language_base_dir = 'web/static/flags/'
language_flags = {
    'French': 'FR.png',
    'English': 'GB.png',
    'Finnish': 'FI.png',
    'Italian': 'IT.png',
    'Spanish': 'ES.png',
    'Portuguese': 'PT.png',
    'Luxembourgish': 'LU.png',
    'German': 'DE.png',
    'Dutch': 'NL.png',
}
def get_new_screening_number(screening_number_prefix):
    '''
    Build the next screening number for ``screening_number_prefix``.

    Every StudySubject whose ``screening_number`` contains the prefix is
    inspected. That field may hold several numbers separated by ';'. For each
    entry starting with the prefix, the text after the prefix and one
    separator character is read as an integer; entries whose suffix is not
    numeric are ignored. The result is ``<prefix>-<highest + 1>`` with the
    number zero-padded to three digits ('<prefix>-001' when nothing matches).
    '''
    # Skip the prefix itself plus the single separator character after it.
    suffix_start = len(screening_number_prefix) + 1
    highest = 0
    matching_subjects = StudySubject.objects.filter(
        screening_number__contains=screening_number_prefix)
    for study_subject in matching_subjects:
        for raw_entry in study_subject.screening_number.split(";"):
            entry = raw_entry.strip()
            if not entry.startswith(screening_number_prefix):
                continue
            try:
                value = int(entry[suffix_start:])
            except ValueError:
                # Non-numeric suffix: not a number we can count from.
                continue
            highest = max(highest, value)
    return '-'.join([screening_number_prefix, str(highest + 1).zfill(3)])
def itembetter(items, lst):
    '''
    Return, as a list, the elements of ``lst`` found at the indices (or keys)
    listed in ``items``, in the order given by ``items``.

    Unlike ``operator.itemgetter``, the result is always a list: one index
    gives a one-element list and an empty ``items`` gives an empty list
    (``itemgetter()`` with no arguments raises TypeError).
    '''
    return [lst[item] for item in items]
def indexof(element, l):
    '''
    Return the positions of every item of ``l`` that equals ``element``,
    in ascending order (an empty list when there is no match).
    '''
    positions = []
    for position, candidate in enumerate(l):
        if candidate == element:
            positions.append(position)
    return positions
'''
Objects that are looked up or created for every imported row:
- Language
- Country
- Location
- Flying Team
- Referrals (Health Partner)
- A subject with the same SS number, first name and last name
- A studySubject with the same ND number, subject
# Columns to be transformed to a standard format
- Gender
- Language
- Preferred written language
- Voucher activity (remove cells that include None in any form and split by line break)
- Voucher reference (split by line break)
Boolean
- Deceased
- Postponed
- Resigned
- Excluded
'''
'''
Column Converter Functions
'''
# converters
# Boolean:
# Deceased
# Postponed
# Resigned
# Excluded
# PDP 1.0
# Flying Team (FT)
def parse_voucher_reference(vr):
    '''
    Split a voucher-reference cell into one reference per line.

    Surrounding whitespace is stripped first; an empty cell gives an empty
    list, anything else is split on line breaks.
    '''
    cleaned = vr.strip()
    if cleaned == u'':
        return []
    return cleaned.split('\n')
def parse_voucher_type(vt):
    '''
    Split a voucher-activity cell into one entry per line.

    A cell containing 'none' anywhere (any letter case) is treated as empty.
    Otherwise surrounding whitespace is stripped and the text is split on
    line breaks; an empty cell gives an empty list.
    '''
    if 'NONE' in vt.upper():
        # The whole cell is discarded, not only the line mentioning "none".
        return []
    cleaned = vt.strip()
    if cleaned == u'':
        return []
    return cleaned.split('\n')
def parse_boolean(boolean_Y_N):
    '''
    Return True if the value is 'y' or 'Y' (surrounding whitespace ignored).
    Otherwise return False, even if the value cannot be parsed at all
    (NaN, None, numbers, ...); unparsable values are logged as warnings.
    '''
    # Empty spreadsheet cells can arrive as float NaN.
    if isinstance(boolean_Y_N, float) and np.isnan(boolean_Y_N):
        return False
    try:
        return boolean_Y_N.strip().upper() == 'Y'
    except (AttributeError, TypeError) as e:
        # Not string-like. Report it, but keep the best-effort contract.
        # str(e) is used because e.message does not exist on Python 3.
        logging.warning('parse_boolean failed for {}.'.format(boolean_Y_N))
        logging.warning('{} {}'.format(e, e.args))
        return False
def parse_column_date_of_birth(date):
    '''
    Convert a 'DD.MM.YYYY' date string into 'YYYY-MM-DD'.

    Raises ValueError (from ``strptime``) when ``date`` does not match the
    'DD.MM.YYYY' format.
    '''
    parsed = datetime.datetime.strptime(date, '%d.%m.%Y')
    return parsed.strftime('%Y-%m-%d')
# gender
gender_table = {'m': SEX_CHOICES_MALE, 'f': SEX_CHOICES_FEMALE}
def parse_column_gender(gender):
    '''
    Map a gender cell ('m' / 'f', any letter case, surrounding whitespace
    ignored) to the matching value in ``gender_table``.

    Returns None for any other value, including non-string input.
    '''
    try:
        return gender_table[gender.strip().lower()]
    except (AttributeError, KeyError):
        # AttributeError: not a string; KeyError: unknown gender code.
        return None
def parse_column_ss_number(ss):
    '''
    Normalise a social security number cell.

    Spaces are removed. If what remains is not purely digits, None is
    returned. An 11-digit ("old") number is extended with a Verhoeff and a
    Luhn check digit, both computed over those 11 digits. The resulting
    number is then checked and any problem (wrong length, failed validation)
    is only logged at debug level: the number is returned either way.
    '''
    number = ss.replace(' ', '')
    if not number.isdigit():
        return None
    if len(number) == 11:
        logging.debug('Old SS number: |{}|'.format(number))
        base = number[:11]
        # NOTE(review): the concatenation below assumes both helpers return
        # strings - confirm against web.algorithm.
        verhoeff_digit = VerhoeffAlgorithm.calculate_verhoeff_check_sum(base)
        luhn_digit = LuhnAlgorithm.calculate_luhn_checksum(base)
        number = number + verhoeff_digit + luhn_digit
    # we revalidate the old ss too after we append the digits
    if len(number) != 13:
        logging.debug('Invalid SS number: (Length not valid) |{}|'.format(number))
    elif not is_valid_social_security_number(number):
        logging.debug('Invalid SS number: |{}|'.format(number))
    return number
language_table = {
'L': 'Luxembourgish',
'LT': 'Lithuanian',
'IT': 'Italian',
'F': 'French',
'D': 'German',
'E': 'English',
'P': 'Portuguese',
'A': 'Arabic',
# Language name -> 2-tuple. The first item is passed as the ``locale`` when a
# Language row is created; the second item is not read by the code visible
# in this file.
locale_table = {
    'Luxembourgish': ('lb_LU', 'LU'),
    'Lithuanian': ('lt_LT', 'LT'),
    'Italian': ('it_IT', 'IT'),
    'French': ('fr_FR', 'FR'),
    'German': ('de_DE', 'DE'),
    'English': ('en_GB', 'GB'),
    'Portuguese': ('pt_PT', 'PT'),
    'Arabic': ('ar_sa', None),
    'Spanish': ('es_ES', 'ES'),
    'Finnish': ('fi_FI', 'FI'),
}
# Translation table for ``unicode.translate``, keyed by code point.
# deletions: parentheses and spaces map to None
language_translation_table = dict.fromkeys(map(ord, u')( '))
# replacements: commas become the ';' separator
language_translation_table[ord(u',')] = u';'
def apply_column_prefered_language(languages):
    '''
    Parse a languages cell with ``apply_column_languages`` and keep only the
    first language.

    Returns an array with at most one element. An empty array is returned
    when no language could be parsed, so callers can always take ``len()``
    of the result.
    '''
    parsed = apply_column_languages(languages)
    if parsed is None:
        # Nothing parsable: slicing None would raise TypeError.
        return np.array([])
    return parsed[:1]
def apply_column_languages(languages):
    '''
    Parse a languages cell into a numpy array of language names.

    The text is upper-cased, cleaned with ``language_translation_table``
    (parentheses and spaces removed, ',' turned into ';') and split on ';'.
    Codes found in ``language_table`` are replaced by the full language name;
    unknown codes are kept as they are. Empty tokens (for example from a
    trailing separator) are dropped.

    Returns an empty array, never None, for empty or NaN input, so callers
    can safely take ``len()`` of the result or slice it.
    '''
    # NaN cells are floats and have no .strip(); test for them first.
    if isinstance(languages, float) or len(languages.strip()) == 0:
        logging.debug(
            'Parse Languages: Empty, NaN, or invalid Languages: |{}|'.format(languages))
        return np.array([])
    # replacements and transformations
    # (translate() with a dict table needs a unicode string on Python 2)
    normalized = unicode(languages.strip()).upper().translate(
        language_translation_table)
    return np.array([language_table.get(code, code)
                     for code in normalized.split(';') if code])
# Country
country_table = {
'LUX': 'Luxembourg'
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def apply_column_country(country):
    '''
    Translate a country code through ``country_table``.

    A value that is not in the table is logged as a warning and returned
    unchanged.
    '''
    try:
        return country_table[country]
    except (KeyError, TypeError):
        # KeyError: unknown code; TypeError: unhashable value.
        logging.warning('Invalid Country: {}'.format(country))
        return country
'''
Instead of using the converters parameter of the read_excel method,
we apply the transformations afterwards, since read_excel does not allow
converters that return a list.
'''
converters = {
'DATE OF BIRTH': parse_column_date_of_birth,
'GENDER': parse_column_gender,
'SS NUMBER': parse_column_ss_number,
'COUNTRY': apply_column_country,
'LANGUAGES': apply_column_languages,
'PREFERED WRITEN LANGUAGE': apply_column_prefered_language,
'DECEASED': parse_boolean,
'POSTPONED': parse_boolean,
'RESIGNED': parse_boolean,
'EXCLUDED': parse_boolean,
'PDP 1.0': parse_boolean,
'FLYING TEAM (FT)': parse_boolean,
'VOUCHER ACTIVITY': parse_voucher_type,
'VOUCHER REFERENCE': parse_voucher_reference
# add voucher for subject
# Voucher partner code (as it appears in a voucher reference) -> partner name.
voucher_partners = {'ZIT': 'Zitha'}
def add_subject_vouchers(voucher_reference, referral, voucher_types):
nd_number, date, voucher_partner, voucher_type, num = voucher_reference.split('-')
nd_number = nd_number.upper().replace('ND', 'PDP')

Carlos Vega
committed
expiry_date = issue_date + relativedelta(months=+6)

Carlos Vega
committed
name=voucher_partners.get(voucher_partner, voucher_partner), last_name=voucher_partners.get(voucher_partner, voucher_partner), voucher_partner_code=voucher_partner[0].upper())
usage_partner.roles.update(role=ROLE_CHOICES_VOUCHER_PARTNER)
# create workerStudyRole
workerStudyRole, _ = WorkerStudyRole.objects.update_or_create(worker=usage_partner,
study_id=GLOBAL_STUDY_ID, role=ROLE_CHOICES_VOUCHER_PARTNER)
usage_partner.voucher_types.set(voucher_types.values())
usage_partner.save()
if created:
logging.warn('New Voucher Partner created: {}'.format(voucher_partner))
vt = VoucherType.objects.get(code=voucher_type)
study_subject = StudySubject.objects.get(nd_number=nd_number)
voucher, created = Voucher.objects.update_or_create(number=voucher_reference, issue_date=issue_date,
expiry_date=expiry_date, voucher_type=vt, study_subject=study_subject,
status=VOUCHER_STATUS_IN_USE, usage_partner=usage_partner, issue_worker=referral)
logging.warn('New Voucher added: {}'.format(voucher_reference))
return voucher
# create voucher types
def create_voucher_types(voucher_types_dict, study):
    '''
    Make sure a VoucherType exists for every entry of ``voucher_types_dict``.

    ``voucher_types_dict`` maps a description (name) to a code. Each pair is
    passed to ``VoucherType.objects.update_or_create`` together with
    ``study``. Returns a dict mapping each name to its VoucherType object.
    '''
    def _ensure(description, code):
        # update_or_create returns (object, created); only the object is kept.
        return VoucherType.objects.update_or_create(
            code=code, description=description, study=study)[0]

    return {description: _ensure(description, code)
            for description, code in voucher_types_dict.items()}
# create appointment types
def create_appointment_types(assessments):
    '''
    Make sure an AppointmentType exists for every entry of ``assessments``.

    ``assessments`` maps a description (name) to a default duration. The
    appointment code is made of the upper-case letters of the name
    (e.g. 'Cognitive Test' -> 'CT'). Returns the AppointmentType objects as
    a list, in the iteration order of ``assessments``.
    '''
    appointment_types = []
    for name, duration in assessments.items():
        # ''.join(...) gives a string on both Python 2 and 3 and accepts
        # unicode names; filter(str.isupper, name) does neither.
        code = ''.join(char for char in name if char.isupper())
        # update_or_create already persists the row; no extra save() needed.
        appointment_type, _ = AppointmentType.objects.update_or_create(
            code=code, default_duration=duration, description=name)
        appointment_types.append(appointment_type)
    return appointment_types
def parse_row(index, row, visit_columns, appointmentTypes, voucher_types, lcsb_worker):
# Languages
if len(row['LANGUAGES']) == 0 and len(row['PREFERED WRITEN LANGUAGE']) == 0:
logging.warn('No Languages available')
elif len(row['LANGUAGES']) == 0 and len(row['PREFERED WRITEN LANGUAGE']) > 0:
row['LANGUAGES'] = row['PREFERED WRITEN LANGUAGE']
elif len(row['LANGUAGES']) > 0 and len(row['PREFERED WRITEN LANGUAGE']) == 0:
row['PREFERED WRITEN LANGUAGE'] = row['LANGUAGES']
languages = []
for language in row['LANGUAGES']:
lang, created = Language.objects.get_or_create(
name=language, locale=locale_table.get(language,(None, None))[0])
languages.append(lang)
if created:
logging.warn('New Language added: {}'.format(language))
if language in language_flags:
src = os.path.join(language_base_dir,
language_flags[language])
basename = os.path.basename(src)
dst = os.path.join(MEDIA_ROOT, basename)
copyfile(src, dst)
# .save(basename, File(open(dst, 'rb'))) #SimpleUploadedFile(name=path, content=open(path, 'rb').read(), content_type='image/png')
lang.image = basename
lang.save()
pref_lang, created = Language.objects.get_or_create(name=language
,locale=locale_table.get(language,(None, None))[0])
if created:
logging.warn(
'New Language (from Prefered) added: {}'.format(language))
pref_lang.save()
# Country
country = row['COUNTRY']
country, created = Country.objects.get_or_create(name=country)
if created:
logging.warn('New Country added: {}'.format(row['COUNTRY']))
country.save()
# Location and Flying Team
prefix = None
prefix = DEFAULT_LOCATION_PREFIX
name=DEFAULT_LOCATION, prefix=prefix)
if created:
logging.warn('New location added: {}'.format(DEFAULT_LOCATION))
location.save()
else:
prefix = 'F'
name='Flying Team', prefix=prefix)
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
if created:
logging.warn('New location added: Flying Team')
location.save()
# Create Flying Team
ft, created = FlyingTeam.objects.get_or_create(
place=row['LOCATION OF FT'])
if created:
logging.warn('New Flying Team added: {}'.format(
row['LOCATION OF FT']))
ft.save()
# Health Partner
# create health partner (Referral)
health_partner, created = Worker.objects.get_or_create(name=row['REFERRAL'])
health_partner.roles.update(role=ROLE_CHOICES_HEALTH_PARTNER)
# create workerStudyRole
workerStudyRole, _ = WorkerStudyRole.objects.update_or_create(
worker=health_partner, study_id=GLOBAL_STUDY_ID, role=ROLE_CHOICES_HEALTH_PARTNER)
health_partner.save()
if created:
logging.warn('New Health Partner added: {}'.format(row['REFERRAL']))
subject, created = Subject.objects.get_or_create(social_security_number=row['SS NUMBER'],
first_name=row['FIRST NAME'],
last_name=row['LAST NAME'],
defaults={
'social_security_number': row['SS NUMBER'],
'first_name': row['FIRST NAME'],
'last_name': row['LAST NAME'],
'sex': row['GENDER'],
'phone_number': row['PHONE NUMBER 1'],
'phone_number_2': row['PHONE NUMBER 2'],
'email': row['E-MAIL'],
'date_born': row['DATE OF BIRTH'],
'address': row['ADDRESS'],
'postal_code': row['POSTAL CODE'],
'city': row['CITY'],
'country': country,
'dead': row['DECEASED'],
'default_written_communication_language': pref_lang
})
subject.languages.set(languages)
subject.save()
if created:
logging.warn('New Subject added with SS number: {}'.format(row['SS NUMBER']))
# StudySubject
study = Study.objects.filter(id=GLOBAL_STUDY_ID)[0]
nd_number = row['ND NUMBER'].upper().replace('ND', 'PDP')
studySubject, created = StudySubject.objects.get_or_create(subject=subject, nd_number=nd_number,
defaults={
'subject': subject,
'study': study,
'postponed': row['POSTPONED'],
'nd_number': nd_number,
'resigned': row['RESIGNED'],
'resign_reason': row['REASON'],
'type': SUBJECT_TYPE_CHOICES_PATIENT,
'excluded': row['EXCLUDED'],
'exclude_reason': row['REASON.1'],
'previously_in_study': row['PDP 1.0'],
'comments': row['COMMENT'],
'default_location': location,
'flying_team': ft,
'screening_number': get_new_screening_number(prefix),
'date_added': parse_column_date_of_birth(row['DATE ADDED (V1)'])
})
#all study subjects can have all voucher types
studySubject.voucher_types.set(voucher_types.values())
studySubject.save()
if created:
logging.warn('New StudySubject added with ND number: {}'.format(nd_number))
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
#VOUCHERS
voucher_references = row['VOUCHER REFERENCE']
for voucher_reference in voucher_references:
voucher = add_subject_vouchers(voucher_reference, health_partner, voucher_types)
# Visits
# Consider all visits as part of the same visit with multiple appointments
appointments = []
appointment = None
'''
map(date_regex.findall gets all the dates in the strings ignoring comments such as Tel
sum(Ans, []) flattens the resulting list from the map since each findall returns a list
map to convert string to datetime
'''
visit_dates = map(lambda x: datetime.datetime.strptime(
x, '%d.%m.%Y'), sum(map(date_regex.findall, row[visit_columns].values), []))
# get first and last elements of the sorted element
datetime_begin, datetime_end = itemgetter(*[0, -1])(sorted(visit_dates))
datetime_begin = datetime_begin.strftime('%Y-%m-%d')
datetime_end = datetime_end.strftime('%Y-%m-%d')
visit, created = Visit.objects.get_or_create(
subject=studySubject, datetime_begin=datetime_begin, datetime_end=datetime_end, defaults={
'is_finished': True})
if created:
logging.warn('New Visit added for ND number {} starting on {}'.format(
nd_number, datetime_begin))
appointment_types = appointmentTypes[:len(set(visit_dates))] #in this case appointment types are incremental
visit.appointment_types.set(appointment_types)
visit.save()
'''
If there are two Vx with the same date we put together the appointment types in the same appointment
'''
datetime_when = visit_date.replace(hour=starting_hour, minute=0, second=0, microsecond=0)
starting_hour+=1
# get the indices of each occurrence of the date and use them to get
# the appointment types
appointment_types = itembetter(
indexof(visit_date, visit_dates), appointmentTypes)
# creatre appointment
appointment, _ = Appointment.objects.update_or_create(
visit=visit, length=sum([a.default_duration for a in appointment_types]),

Carlos Vega
committed
status=Appointment.APPOINTMENT_STATUS_FINISHED, datetime_when=datetime_when,
worker_assigned=lcsb_worker)
date_when = visit_date.replace(
hour=9, minute=0, second=0, microsecond=0)
for appointment_type in appointment_types:
app_type_link = AppointmentTypeLink(
appointment=appointment, date_when=date_when,
appointment_type=appointment_type, worker=lcsb_worker)
date_when += datetime.timedelta(
minutes=appointment_type.default_duration)
app_type_link.save()
def createWorker(password, email='', username='admin', first_name='LCSB', last_name='Admin',
                 specialization='Technician', unit='LCSB'):
    '''
    Create (or update) the admin Django user and its matching Worker.

    The user is looked up by ``username`` and made a superuser and staff
    member; its password is stored only through ``set_password``. The worker
    is looked up by first and last name, linked to the user, assigned every
    existing Location and Language, and given the technician role in the
    global study.

    Returns the Worker object, which the caller passes on as the worker
    responsible for the imported appointments.
    '''
    # create user
    # The password is deliberately kept out of ``defaults``: update_or_create
    # would otherwise write it to the password field as plain text before
    # set_password() replaces it with a hash.
    user, _ = User.objects.update_or_create(username=username, defaults={'email': email})
    user.set_password(password)
    user.is_superuser = True
    user.is_staff = True
    # NOTE(review): is_admin is only set as an attribute here; whether the
    # User model persists it is not visible from this file - confirm.
    user.is_admin = True
    user.save()
    # create worker
    defaults = {'first_name': first_name, 'last_name': last_name,
                'email': email, 'unit': unit, 'specialization': specialization,
                'user': user}
    worker, _ = Worker.objects.update_or_create(first_name=first_name,
                                                last_name=last_name, defaults=defaults)
    locations = Location.objects.all()
    worker.locations.set(locations)
    languages = Language.objects.all()
    worker.languages.set(languages)
    worker.save()
    # create workerStudyRole
    workerStudyRole, _ = WorkerStudyRole.objects.update_or_create(
        worker=worker, study_id=GLOBAL_STUDY_ID, role=ROLE_CHOICES_TECHNICIAN)
    workerStudyRole.save()
    logging.info('SuperUser and Worker {} created'.format(username))
    return worker
if __name__ == '__main__':
    # Script entry point: python <script> file_path.xlsx
    # Creates the admin user/worker, loads the spreadsheet, normalises its
    # columns with ``converters`` and imports it row by row.

    # Local import: getpass is needed for the password prompt below and is
    # not among the imports visible at the top of this file.
    import getpass

    logging.basicConfig(level=logging.DEBUG)
    if len(sys.argv) < 2:
        logging.warning('Please, execute the program as: python {} file_path.xlsx'.format(sys.argv[0]))
        sys.exit(1)
    # Named excel_path rather than ``file`` to avoid shadowing the builtin.
    excel_path = sys.argv[1]
    if not os.path.isfile(excel_path):
        logging.warning('Please, execute the program with a valid file path.')
        sys.exit(1)
    # create worker and super user
    # Start with two different values so the prompt runs at least once.
    pass1 = ''
    pass2 = None
    while pass1 != pass2:
        pass1 = getpass.getpass('Please type a password for the Admin user: ')
        pass2 = getpass.getpass('Please type your password again: ')
        if pass1 != pass2:
            # Single-argument call form prints the same on Python 2 and 3.
            print('Password mismatch, please try again')
    lcsb_worker = createWorker(pass1)
    df = pd.read_excel(excel_path, dtype=object)
    # Empty cells become '' and every value becomes text before conversion.
    df = df.fillna('').astype(unicode)
    df.columns = [c.upper() for c in df.columns]
    # make transformations
    for column, function in converters.items():
        logging.warning(column)
        df[column] = df[column].apply(function)
    # get visits columns
    regex = re.compile(r'\(V\d\)')
    study = Study.objects.filter(id=GLOBAL_STUDY_ID)[0]
    # enable vouchers
    study.columns.voucher_types = True
    study.columns.vouchers = True
    study.nd_number_study_subject_regex = r'^PDP\d{4}$'
    study.columns.save()
    study.save()
    # Columns whose header contains '(V<digit>)'. Built as a real list so it
    # can be used as a column indexer on Python 2 and 3 alike.
    visit_columns = [column for column in df.columns if regex.search(column)]
    assessments = OrderedDict([('Cognitive Test', 180), ('Risk Factor', 120),
                               ('Voucher Distribution', 120), ('Follow Up', 90)])
    appointmentTypes = create_appointment_types(assessments)
    voucher_types_dict = OrderedDict([
        ('Cognitive Activity', 'CA'), ('Neurofit', 'NF'), ('Mobilfit', 'MF'), ('Diet', 'D'),
        ('Consulte ORL', 'CORL'), ('Physical Activity', 'PA'),
        ('Individual Cognitive Training', 'IT'), ('Social', 'S'), ('Test', 'T')])
    voucher_types = create_voucher_types(voucher_types_dict, study)
    # process each row
    for index, row in df.iterrows():
        parse_row(index, row, visit_columns, appointmentTypes, voucher_types, lcsb_worker)