Newer
Older
import django
from django.conf import settings
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "smash.settings")
django.setup()
import pandas as pd
import numpy as np
import logging
import datetime

Carlos Vega
committed
from dateutil.relativedelta import relativedelta
import re
from operator import itemgetter
from collections import OrderedDict, defaultdict
import sys
from django.contrib.auth.models import User
from web.models.constants import VOUCHER_STATUS_IN_USE, SUBJECT_TYPE_CHOICES_PATIENT, GLOBAL_STUDY_ID, SEX_CHOICES, SEX_CHOICES_MALE, SEX_CHOICES_FEMALE
from web.algorithm import VerhoeffAlgorithm, LuhnAlgorithm
from web.utils import is_valid_social_security_number
from web.models import VoucherType, Voucher, Country, AppointmentTypeLink, AppointmentType, Study, Worker, Language, Subject, WorkerStudyRole, StudySubject, Location, FlyingTeam, Visit, Appointment, AppointmentType
from web.models.worker_study_role import ROLE_CHOICES_TECHNICIAN, WORKER_STAFF, ROLE_CHOICES_SECRETARY, ROLE_CHOICES_HEALTH_PARTNER, \
WORKER_HEALTH_PARTNER, ROLE_CHOICES_VOUCHER_PARTNER, WORKER_VOUCHER_PARTNER
DEFAULT_LOCATION = 'CHL'
DEFAULT_LOCATION_PREFIX = 'P'
def get_new_screening_number(screening_number_prefix):
result_number = 0
subjects = StudySubject.objects.filter(screening_number__contains=screening_number_prefix)
for subject in subjects:
screening_numbers = subject.screening_number.split(";")
for screening_number in screening_numbers:
screening_number = screening_number.strip()
if screening_number.startswith(screening_number_prefix):
number = screening_number[len(screening_number_prefix)+1:]
try:
result_number = max(result_number, int(number))
except ValueError:
pass
return screening_number_prefix + '-' + str(result_number + 1).zfill(3)
def itembetter(items, lst):
if len(items) == 1:
return [itemgetter(*items)(lst)]
else:
return list(itemgetter(*items)(lst))
def indexof(element, l):
return [i for i, x in enumerate(l) if x == element]
- Language
- Country
- Location
- Flying Team
- Referals (Health Partner)
- A subject with the same SS number, first name and last name
- A studySubject with the same ND number, subject
# Columns to be transformed to a standard format
- Gender
- Language
- Prefered writen language
- Voucher activity (remove cells that include None in any form and split by breakline)
- Voucher reference (split)
Boolean
- Deceased
- Postponed
- Resigned
- Excluded
'''
'''
Column Converter Functions
'''
# converters
# Boolean:
# Deceased
# Postponed
# Resigned
# Excluded
# PDP 1.0
# Flying Team (FT)
def parse_voucher_reference(vr):
vr = vr.strip() #strip spaces
return vr.split('\n') if vr != u'' else [] #if empty string then return empty list, otherwise split by break line
def parse_voucher_type(vt):
vt = '' if 'NONE' in vt.upper() else vt #if vt includes none in any form, then return empty
vt = vt.strip() #strip spaces
return vt.split('\n') if vt != u'' else [] #if empty string then return empty list, otherwise split by break line
def parse_boolean(boolean_Y_N):
'''
Return True if 'y' or 'Y' is found.
Otherwise return False even if it fails
'''
try:
if isinstance(boolean_Y_N, float) and np.isnan(boolean_Y_N):
return False
elif boolean_Y_N.upper() == 'Y':
return True
else:
return False
except Exception as e:
logging.warn('parse_boolean failed for {}.'.format(boolean_Y_N))
logging.warn('{} {}'.format(e.message, e.args))
return False
def parse_column_date_of_birth(date):
return datetime.datetime.strptime(date, '%d.%m.%Y').strftime('%Y-%m-%d')
# gender
gender_table = {'m': SEX_CHOICES_MALE, 'f': SEX_CHOICES_FEMALE}
def parse_column_gender(gender):
try:
return gender_table[gender.lower()]
except:
return None
def parse_column_ss_number(ss):
ss = ss.replace(' ', '')
if not ss.isdigit():
return None
if len(ss) == 11:
logging.debug('Old SS number: |{}|'.format(ss))
eleventh_digit = VerhoeffAlgorithm.calculate_verhoeff_check_sum(ss[
:11])
twelfth_digit = LuhnAlgorithm.calculate_luhn_checksum(ss[:11])
ss = ss + eleventh_digit + twelfth_digit
# we revalidate the old ss too after we append the digits
if len(ss) == 13:
if not is_valid_social_security_number(ss):
logging.debug('Invalid SS number: |{}|'.format(ss))
else:
logging.debug('Invalid SS number: (Length not valid) |{}|'.format(ss))
return ss
language_table = {
'L': 'Luxembourgish',
'LT': 'Lithuanian',
'IT': 'Italian',
'F': 'French',
'D': 'German',
'E': 'English',
'P': 'Portuguese',
'A': 'Arabic',
locale_table = {
'Luxembourgish': ('lb_LU', 'LU'),
'Lithuanian': ('lt_LT', 'LT'),
'Italian': ('it_IT', 'IT'),
'French': ('fr_FR', 'FR'),
'German': ('de_DE', 'DE'),
'English': ('en_GB', 'GB'),
'Portuguese': ('pt_PT', 'PT'),
'Arabic': ('ar_sa', None),
'Spanish': ('es_ES', 'ES') ,
'Finnish': ('fi_FI', 'FI')
}
language_translation_table = {
# deletions
ord(u')'): None,
ord(u'('): None,
ord(u' '): None,
# replacements
ord(u','): u';'
}
def apply_column_prefered_language(languages):
return apply_column_languages(languages)[:1]
def apply_column_languages(languages):
languages = languages.strip()
if type(languages) != float and len(languages) > 0:
# replacements and transformations
languages = unicode(languages).upper().translate(
language_translation_table)
new_list = []
for language in languages.split(';'):
if language in language_table:
language = language_table[language]
new_list.append(language)
return np.array(new_list)
else:
logging.debug(
'Parse Languages: Empty, NaN, or invalid Languages: |{}|'.format(languages))
# Country
country_table = {
'LUX': 'Luxembourg'
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def apply_column_country(country):
try:
return country_table[country]
except:
logging.warn('Invalid Country: {}'.format(country))
return country
'''
Instead of using the converters parameter from read_excel method,
we opt for make the transformations later since the read_excel method does not allow
converters that return a list.
'''
converters = {
'DATE OF BIRTH': parse_column_date_of_birth,
'GENDER': parse_column_gender,
'SS NUMBER': parse_column_ss_number,
'COUNTRY': apply_column_country,
'LANGUAGES': apply_column_languages,
'PREFERED WRITEN LANGUAGE': apply_column_prefered_language,
'DECEASED': parse_boolean,
'POSTPONED': parse_boolean,
'RESIGNED': parse_boolean,
'EXCLUDED': parse_boolean,
'PDP 1.0': parse_boolean,
'FLYING TEAM (FT)': parse_boolean,
'VOUCHER ACTIVITY': parse_voucher_type,
'VOUCHER REFERENCE': parse_voucher_reference
# add voucher for subject
voucher_partners = {}
voucher_partners['ZIT'] = 'Zitha'
def add_subject_vouchers(voucher_reference, referral, voucher_types):
nd_number, date, voucher_partner, voucher_type, num = voucher_reference.split('-')
nd_number = nd_number.upper().replace('ND', 'PDP')

Carlos Vega
committed
expiry_date = issue_date + relativedelta(months=+6)

Carlos Vega
committed
name=voucher_partners.get(voucher_partner, voucher_partner), last_name=voucher_partners.get(voucher_partner, voucher_partner), voucher_partner_code=voucher_partner[0].upper())
usage_partner.roles.update(role=ROLE_CHOICES_VOUCHER_PARTNER)
# create workerStudyRole
workerStudyRole, _ = WorkerStudyRole.objects.update_or_create(worker=usage_partner,
study_id=GLOBAL_STUDY_ID, role=ROLE_CHOICES_VOUCHER_PARTNER)
usage_partner.voucher_types.set(voucher_types.values())
usage_partner.save()
if created:
logging.warn('New Voucher Partner created: {}'.format(voucher_partner))
vt = VoucherType.objects.get(code=voucher_type)
study_subject = StudySubject.objects.get(nd_number=nd_number)
voucher, created = Voucher.objects.update_or_create(number=voucher_reference, issue_date=issue_date,
expiry_date=expiry_date, voucher_type=vt, study_subject=study_subject,
status=VOUCHER_STATUS_IN_USE, usage_partner=usage_partner, issue_worker=referral)
logging.warn('New Voucher added: {}'.format(voucher_reference))
return voucher
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# create voucher types
def create_voucher_types(voucher_types_dict, study):
voucher_types = {}
for name, code in voucher_types_dict.items():
voucher_type, _ = VoucherType.objects.update_or_create(code=code, description=name, study=study)
voucher_types[name] = voucher_type
return voucher_types
# create appointment types
def create_appointment_types(assessments):
appointmentTypes = []
for name, duration in assessments.items():
code = filter(str.isupper, name)
appointmentType, _ = AppointmentType.objects.update_or_create(
code=code, default_duration=duration, description=name)
appointmentType.save()
appointmentTypes.append(appointmentType)
return appointmentTypes
def parse_row(index, row, visit_columns, appointmentTypes, voucher_types):
# Languages
if len(row['LANGUAGES']) == 0 and len(row['PREFERED WRITEN LANGUAGE']) == 0:
logging.warn('No Languages available')
elif len(row['LANGUAGES']) == 0 and len(row['PREFERED WRITEN LANGUAGE']) > 0:
row['LANGUAGES'] = row['PREFERED WRITEN LANGUAGE']
elif len(row['LANGUAGES']) > 0 and len(row['PREFERED WRITEN LANGUAGE']) == 0:
row['PREFERED WRITEN LANGUAGE'] = row['LANGUAGES']
languages = []
for language in row['LANGUAGES']:
lang, created = Language.objects.get_or_create(
name=language, locale=locale_table.get(language,(None, None))[0])
languages.append(lang)
if created:
logging.warn('New Language added: {}'.format(language))
lang.save()
for language in row['PREFERED WRITEN LANGUAGE'][:1]:
pref_lang, created = Language.objects.get_or_create(name=language
,locale=locale_table.get(language,(None, None))[0])
if created:
logging.warn(
'New Language (from Prefered) added: {}'.format(language))
pref_lang.save()
# Country
country = row['COUNTRY']
country, created = Country.objects.get_or_create(name=country)
if created:
logging.warn('New Country added: {}'.format(row['COUNTRY']))
country.save()
# Location and Flying Team
prefix = None
prefix = DEFAULT_LOCATION_PREFIX
name=DEFAULT_LOCATION, prefix=prefix)
if created:
logging.warn('New location added: {}'.format(DEFAULT_LOCATION))
location.save()
else:
prefix = 'F'
name='Flying Team', prefix=prefix)
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
if created:
logging.warn('New location added: Flying Team')
location.save()
# Create Flying Team
ft, created = FlyingTeam.objects.get_or_create(
place=row['LOCATION OF FT'])
if created:
logging.warn('New Flying Team added: {}'.format(
row['LOCATION OF FT']))
ft.save()
# Health Partner
# create health partner (Referral)
health_partner, created = Worker.objects.get_or_create(name=row['REFERRAL'])
health_partner.roles.update(role=ROLE_CHOICES_HEALTH_PARTNER)
# create workerStudyRole
workerStudyRole, _ = WorkerStudyRole.objects.update_or_create(
worker=health_partner, study_id=GLOBAL_STUDY_ID, role=ROLE_CHOICES_HEALTH_PARTNER)
health_partner.save()
if created:
logging.warn('New Health Partner added: {}'.format(row['REFERRAL']))
subject, created = Subject.objects.get_or_create(social_security_number=row['SS NUMBER'],
first_name=row['FIRST NAME'],
last_name=row['LAST NAME'],
defaults={
'social_security_number': row['SS NUMBER'],
'first_name': row['FIRST NAME'],
'last_name': row['LAST NAME'],
'sex': row['GENDER'],
'phone_number': row['PHONE NUMBER 1'],
'phone_number_2': row['PHONE NUMBER 2'],
'email': row['E-MAIL'],
'date_born': row['DATE OF BIRTH'],
'address': row['ADDRESS'],
'postal_code': row['POSTAL CODE'],
'city': row['CITY'],
'country': country,
'dead': row['DECEASED'],
'default_written_communication_language': pref_lang
})
subject.languages.set(languages)
subject.save()
if created:
logging.warn('New Subject added with SS number: {}'.format(row['SS NUMBER']))
# StudySubject
study = Study.objects.filter(id=GLOBAL_STUDY_ID)[0]
nd_number = row['ND NUMBER'].upper().replace('ND', 'PDP')
studySubject, created = StudySubject.objects.get_or_create(subject=subject, nd_number=nd_number,
defaults={
'subject': subject,
'study': study,
'postponed': row['POSTPONED'],
'nd_number': nd_number,
'resigned': row['RESIGNED'],
'resign_reason': row['REASON'],
'type': SUBJECT_TYPE_CHOICES_PATIENT,
'excluded': row['EXCLUDED'],
'exclude_reason': row['REASON.1'],
'previously_in_study': row['PDP 1.0'],
'comments': row['COMMENT'],
'default_location': location,
'flying_team': ft,
'screening_number': get_new_screening_number(prefix),
'date_added': parse_column_date_of_birth(row['DATE ADDED (V1)'])
})
#all study subjects can have all voucher types
studySubject.voucher_types.set(voucher_types.values())
studySubject.save()
if created:
logging.warn('New StudySubject added with ND number: {}'.format(nd_number))
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
#VOUCHERS
voucher_references = row['VOUCHER REFERENCE']
for voucher_reference in voucher_references:
voucher = add_subject_vouchers(voucher_reference, health_partner, voucher_types)
# Visits
# Consider all visits as part of the same visit with multiple appointments
appointments = []
appointment = None
'''
map(date_regex.findall gets all the dates in the strings ignoring comments such as Tel
sum(Ans, []) flattens the resulting list from the map since each findall returns a list
map to convert string to datetime
'''
visit_dates = map(lambda x: datetime.datetime.strptime(
x, '%d.%m.%Y'), sum(map(date_regex.findall, row[visit_columns].values), []))
# get first and last elements of the sorted element
datetime_begin, datetime_end = itemgetter(*[0, -1])(sorted(visit_dates))
datetime_begin = datetime_begin.strftime('%Y-%m-%d')
datetime_end = datetime_end.strftime('%Y-%m-%d')
visit, created = Visit.objects.get_or_create(
subject=studySubject, datetime_begin=datetime_begin, datetime_end=datetime_end, defaults={
'is_finished': True})
if created:
logging.warn('New Visit added for ND number {} starting on {}'.format(
nd_number, datetime_begin))
appointment_types = appointmentTypes[:len(set(visit_dates))] #in this case appointment types are incremental
visit.appointment_types.set(appointment_types)
visit.save()
'''
If there are two Vx with the same date we put together the appointment types in the same appointment
'''
for visit_date in set(visit_dates):
datetime_when = visit_date.strftime('%Y-%m-%d')
# get the indices of each occurrence of the date and use them to get
# the appointment types
appointment_types = itembetter(
indexof(visit_date, visit_dates), appointmentTypes)
# creatre appointment
appointment, _ = Appointment.objects.update_or_create(
visit=visit, length=sum(
[a.default_duration for a in appointment_types]),
flying_team=ft, location=location,
status=Appointment.APPOINTMENT_STATUS_FINISHED, datetime_when=datetime_when)
date_when = visit_date.replace(
hour=9, minute=0, second=0, microsecond=0)
for appointment_type in appointment_types:
app_type_link = AppointmentTypeLink(
appointment=appointment, date_when=date_when,
appointment_type=appointment_type)
date_when += datetime.timedelta(
minutes=appointment_type.default_duration)
app_type_link.save()
def createWorker(password, email='', username='admin', first_name='LCSB', last_name='Admin',
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
specialization='Technician', unit='LCSB'):
# create user
defaults = {'email': email, 'password': password}
user, _ = User.objects.update_or_create(username=username, defaults=defaults)
user.set_password(password)
user.is_superuser = True
user.is_staff = True
user.is_admin = True
user.save()
# create worker
defaults = {'first_name': first_name, 'last_name': last_name,
'email': email, 'unit': unit, 'specialization': specialization
, 'user': user}
worker, _ = Worker.objects.update_or_create(first_name=first_name,
last_name=last_name, defaults=defaults)
locations = Location.objects.all()
worker.locations.set(locations)
languages = Language.objects.all()
worker.languages.set(languages)
worker.save()
# create workerStudyRole
workerStudyRole, _ = WorkerStudyRole.objects.update_or_create(worker=worker,
study_id=GLOBAL_STUDY_ID, role=ROLE_CHOICES_TECHNICIAN)
logging.info('SuperUser and Worker {} created'.format(username))
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
if len(sys.argv) < 2:
logging.warn('Please, execute the program as: python {} file_path.xlsx'.format(sys.argv[0]))
sys.exit(1)
file = sys.argv[1]
if not os.path.isfile(file):
logging.warn('Please, execute the program with a valid file path.')
sys.exit(1)
df = pd.read_excel(file, dtype=object)
df = df.fillna('').astype(unicode)
df.columns = [c.upper() for c in df.columns]
# make transformations
for column, function in converters.items():
logging.warn(column)
df[column] = df[column].apply(function)
# get visits columns
regex = re.compile(r'\(V\d\)')
study = Study.objects.filter(id=GLOBAL_STUDY_ID)[0]
#enable vouchers
study.columns.voucher_types = True
study.columns.vouchers = True
study.nd_number_study_subject_regex = r'^PDP\d{4}$'
study.columns.save()
study.save()
#
visit_columns = filter(regex.search, df.columns)
assessments = OrderedDict([('Cognitive Test', 180), ('Risk Factor', 120),
('Voucher Distribution', 120), ('Follow Up', 90)])
appointmentTypes = create_appointment_types(assessments)
voucher_types_dict = OrderedDict([('Cognitive Activity', 'CA'), ('Neurofit', 'NF'), ('Mobilfit', 'MF'), ('Diet', 'D'),
('Consulte ORL', 'CORL'), ('Physical Activity', 'PA'), ('Individual Cognitive Training', 'IT'), ('Social', 'S'), ('Test', 'T')])
voucher_types = create_voucher_types(voucher_types_dict, study)
# process each row
for index, row in df.iterrows():
parse_row(index, row, visit_columns, appointmentTypes, voucher_types)
#create worker and super user
pass1 = ''
pass2 = None
while pass1 != pass2:
pass1 = getpass.getpass('Please type a password for the Admin user: ')
pass2 = getpass.getpass('Please type your password again: ')
if pass1 != pass2:
print 'Password mismatch, please try again'
createWorker(pass1)