-
Carlos Vega authoredCarlos Vega authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
import_file.py 20.44 KiB
# coding=utf-8
import os
import getpass
import django
from django.conf import settings
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "smash.settings")
django.setup()
import pandas as pd
import numpy as np
import logging
import datetime
import re
from operator import itemgetter
from collections import OrderedDict, defaultdict
import sys
import string
from django.contrib.auth.models import User
from web.models.constants import VOUCHER_STATUS_IN_USE, SUBJECT_TYPE_CHOICES_PATIENT, GLOBAL_STUDY_ID, SEX_CHOICES, SEX_CHOICES_MALE, SEX_CHOICES_FEMALE
from web.algorithm import VerhoeffAlgorithm, LuhnAlgorithm
from web.utils import is_valid_social_security_number
from web.models import VoucherType, Voucher, Country, AppointmentTypeLink, AppointmentType, Study, Worker, Language, Subject, WorkerStudyRole, StudySubject, Location, FlyingTeam, Visit, Appointment, AppointmentType
from web.models.worker_study_role import ROLE_CHOICES_TECHNICIAN, WORKER_STAFF, ROLE_CHOICES_SECRETARY, ROLE_CHOICES_HEALTH_PARTNER, \
WORKER_HEALTH_PARTNER, ROLE_CHOICES_VOUCHER_PARTNER, WORKER_VOUCHER_PARTNER
# Name of the Location used for rows that are not flagged as flying-team rows.
DEFAULT_LOCATION = 'CHL'
# Location prefix stored with DEFAULT_LOCATION; also used as the screening-number prefix
# for those rows.
DEFAULT_LOCATION_PREFIX = 'P'
# Matches dates written as D.M.YYYY / DD.MM.YYYY so they can be pulled out of cells
# that also contain free text.
date_regex = re.compile(r'\d{1,2}\.\d{1,2}\.\d{4}')
def get_new_screening_number(screening_number_prefix):
    '''
    Build the next free screening number for the given prefix.

    Every StudySubject whose screening_number contains the prefix is inspected.
    That field may hold several numbers separated by ';'. For each entry that
    starts with the prefix, the text after the prefix and its one-character
    separator is read as an integer; entries that are not numeric are ignored.

    Returns '<prefix>-<NNN>', where NNN is the highest number found plus one,
    zero-padded to at least three digits ('<prefix>-001' when none is found).
    '''
    highest = 0
    matching = StudySubject.objects.filter(
        screening_number__contains=screening_number_prefix)
    # +1 skips the single separator character that follows the prefix
    suffix_start = len(screening_number_prefix) + 1
    for study_subject in matching:
        for raw_entry in study_subject.screening_number.split(";"):
            entry = raw_entry.strip()
            if not entry.startswith(screening_number_prefix):
                continue
            try:
                candidate = int(entry[suffix_start:])
            except ValueError:
                # non-numeric suffix: not a counter we can continue from
                continue
            highest = max(highest, candidate)
    next_number = str(highest + 1).zfill(3)
    return screening_number_prefix + '-' + next_number
def itembetter(items, lst):
    '''
    Return, as a list, the elements of `lst` found at the positions in `items`.

    items -- iterable of indices (or keys) valid for `lst`
    lst   -- any object supporting item access

    Unlike operator.itemgetter, the result is always a list, whatever the
    number of indices: an empty `items` gives [], a single index gives a
    one-element list.

    Raises IndexError/KeyError if an index is not valid for `lst`.
    '''
    return [lst[position] for position in items]
def indexof(element, l):
    '''
    Return the list of every position in `l` whose item equals `element`.
    The list is empty when there is no match.
    '''
    positions = []
    for position, candidate in enumerate(l):
        if candidate == element:
            positions.append(position)
    return positions
'''
# Things that *could* already be on the database:
- Language
- Country
- Location
- Flying Team
- Referrals (Health Partner)
- A subject with the same SS number, first name and last name
- A studySubject with the same ND number, subject
# Columns to be transformed to a standard format
- Gender
- Language
- Prefered writen language
- Country
- SS number
- Date of birth
- Date added (V1) # visits might have comments like (Tel)
- ... (V2)
- ... (V3)
- ... (V4)
- Voucher activity (remove cells that include None in any form and split by breakline)
- Voucher reference (split)
Boolean
- Deceased
- Postponed
- Resigned
- Excluded
- PDP 1.0
'''
'''
Column Converter Functions
'''
# converters
# Boolean:
# Deceased
# Postponed
# Resigned
# Excluded
# PDP 1.0
# Flying Team (FT)
def parse_voucher_reference(vr):
    '''
    Split a voucher-reference cell into a list with one entry per line.

    Surrounding whitespace of the whole cell is removed first. An empty
    (or whitespace-only) cell gives an empty list.
    '''
    stripped = vr.strip()
    if stripped == u'':
        return []
    return stripped.split('\n')
def parse_voucher_type(vt):
    '''
    Split a voucher-activity cell into a list with one entry per line.

    A cell containing "none" in any letter case, anywhere in its text, is
    treated as empty. Empty cells give an empty list.
    '''
    if 'NONE' in vt.upper():
        return []
    cleaned = vt.strip()
    return cleaned.split('\n') if cleaned != u'' else []
def parse_boolean(boolean_Y_N):
    '''
    Interpret a Y/N spreadsheet cell.

    Return True only when the cell is the text 'y' or 'Y' (surrounding
    whitespace is ignored). NaN, empty cells and any other text return False.
    Values that cannot be handled as text (None, numbers, ...) are logged
    and also return False: this function never raises.
    '''
    try:
        if isinstance(boolean_Y_N, float) and np.isnan(boolean_Y_N):
            return False
        return boolean_Y_N.strip().upper() == 'Y'
    except Exception as e:
        # Deliberately broad: a malformed cell must not abort the import.
        # The exception is formatted directly because `e.message` does not
        # exist on Python 3 and would make this handler itself fail.
        logging.warning('parse_boolean failed for {}.'.format(boolean_Y_N))
        logging.warning('{} {}'.format(e, e.args))
        return False
# birth date
def parse_column_date_of_birth(date):
    '''
    Convert a date written as DD.MM.YYYY into the ISO form YYYY-MM-DD.

    Raises ValueError when `date` does not match the DD.MM.YYYY format.
    '''
    parsed = datetime.datetime.strptime(date, '%d.%m.%Y')
    return parsed.strftime('%Y-%m-%d')
# gender
# Maps the one-letter gender codes of the spreadsheet to the SEX_CHOICES_* constants.
gender_table = {'m': SEX_CHOICES_MALE, 'f': SEX_CHOICES_FEMALE}


def parse_column_gender(gender):
    '''
    Translate a gender cell ('m'/'f', any letter case, surrounding whitespace
    ignored) into the matching SEX_CHOICES_* constant.

    Returns None for unknown codes and for values that are not text.
    '''
    try:
        return gender_table[gender.strip().lower()]
    except (KeyError, AttributeError):
        # KeyError: unknown code; AttributeError: value is not a string.
        # Anything else (e.g. KeyboardInterrupt) is no longer swallowed.
        return None
# SS number
def parse_column_ss_number(ss):
    '''
    Normalise a social security number cell.

    Spaces are removed. If what remains is not made of digits only, None is
    returned. An 11-digit number is extended with the check digits computed
    by VerhoeffAlgorithm and LuhnAlgorithm over those 11 digits.

    The resulting number is then checked, but only for logging purposes:
    a wrong length or a failed is_valid_social_security_number check is
    reported at debug level and the number is returned anyway.
    '''
    digits = ss.replace(' ', '')
    if not digits.isdigit():
        return None

    if len(digits) == 11:
        logging.debug('Old SS number: |{}|'.format(digits))
        base = digits[:11]
        verhoeff_digit = VerhoeffAlgorithm.calculate_verhoeff_check_sum(base)
        luhn_digit = LuhnAlgorithm.calculate_luhn_checksum(base)
        digits = digits + verhoeff_digit + luhn_digit

    # old numbers are validated as well, after the check digits were appended
    if len(digits) != 13:
        logging.debug('Invalid SS number: (Length not valid) |{}|'.format(digits))
    elif not is_valid_social_security_number(digits):
        logging.debug('Invalid SS number: |{}|'.format(digits))
    return digits
# Language
# Maps the language abbreviations found in the spreadsheet cells to full language names.
language_table = {
'L': 'Luxembourgish',
'LT': 'Lithuanian',
'IT': 'Italian',
'F': 'French',
'D': 'German',
'E': 'English',
'P': 'Portuguese',
'A': 'Arabic',
'SP': 'Spanish',
'FIN': 'Finnish'
}
# Maps a full language name to a 2-tuple. The first element is what the row import stores
# as the Language locale. The second element is not read in this part of the file --
# presumably a country code; TODO confirm.
locale_table = {
'Luxembourgish': ('lb_LU', 'LU'),
'Lithuanian': ('lt_LT', 'LT'),
'Italian': ('it_IT', 'IT'),
'French': ('fr_FR', 'FR'),
'German': ('de_DE', 'DE'),
'English': ('en_GB', 'GB'),
'Portuguese': ('pt_PT', 'PT'),
'Arabic': ('ar_sa', None),
'Spanish': ('es_ES', 'ES') ,
'Finnish': ('fi_FI', 'FI')
}
# Table passed to translate() when cleaning a languages cell: keys are code points,
# a None value deletes the character, a string value replaces it.
language_translation_table = {
# deletions
ord(u')'): None,
ord(u'('): None,
ord(u' '): None,
# replacements
ord(u','): u';'
}
def apply_column_prefered_language(languages):
    '''
    Parse the cell exactly like apply_column_languages, but keep at most the
    first language of the result.
    '''
    parsed = apply_column_languages(languages)
    return parsed[:1]
def apply_column_languages(languages):
    '''
    Turn a languages cell into a numpy array of language names.

    The cell is upper-cased, cleaned with language_translation_table
    (parentheses and spaces removed, ',' turned into ';') and split on ';'.
    Abbreviations known to language_table are replaced by the full language
    name; unknown entries are kept in their cleaned form. Empty entries
    (e.g. from a trailing ';') are dropped so that no nameless language is
    produced.

    Empty cells and values that are not text (NaN, None, ...) are logged at
    debug level and give an empty array.
    '''
    # The text check must come before strip(): a NaN float has no strip().
    stripped = languages.strip() if hasattr(languages, 'strip') else ''
    if not stripped:
        logging.debug(
            'Parse Languages: Empty, NaN, or invalid Languages: |{}|'.format(languages))
        return np.array([])

    # type(u'') is `unicode` on Python 2 and `str` on Python 3; translate()
    # with a code-point dictionary needs that text type.
    text_type = type(u'')
    cleaned = text_type(stripped).upper().translate(language_translation_table)
    names = [language_table.get(token, token)
             for token in cleaned.split(';') if token]
    return np.array(names)
# Country
# Maps the country abbreviations found in the spreadsheet to full country names.
country_table = {
    'LUX': 'Luxembourg'
}


def apply_column_country(country):
    '''
    Replace a country abbreviation known to country_table by the full
    country name.

    Any other value is reported with a warning and returned unchanged.
    '''
    try:
        return country_table[country]
    except (KeyError, TypeError):
        # KeyError: unknown abbreviation; TypeError: unhashable cell value.
        logging.warning('Invalid Country: {}'.format(country))
        return country
'''
Instead of using the converters parameter of the read_excel method,
we opt to make the transformations later, since the read_excel method does not allow
converters that return a list.
'''
# Maps an upper-cased spreadsheet column name to the function applied to every cell
# of that column before the rows are imported.
converters = {
'DATE OF BIRTH': parse_column_date_of_birth,
'GENDER': parse_column_gender,
'SS NUMBER': parse_column_ss_number,
'COUNTRY': apply_column_country,
# the two language converters return arrays rather than scalars
'LANGUAGES': apply_column_languages,
'PREFERED WRITEN LANGUAGE': apply_column_prefered_language,
# Y/N flags
'DECEASED': parse_boolean,
'POSTPONED': parse_boolean,
'RESIGNED': parse_boolean,
'EXCLUDED': parse_boolean,
'PDP 1.0': parse_boolean,
'FLYING TEAM (FT)': parse_boolean,
# multi-line cells, split into lists
'VOUCHER ACTIVITY': parse_voucher_type,
'VOUCHER REFERENCE': parse_voucher_reference
}
# add voucher for subject
# Maps the partner code embedded in a voucher reference to the partner's name.
voucher_partners = {'ZIT': 'Zitha'}


def add_subject_vouchers(voucher_reference, referral, voucher_types):
    '''
    Create (or reuse) the Voucher described by a voucher reference.

    voucher_reference -- text made of five '-'-separated parts:
                         ND number, issue date (YYYYMMDD), partner code,
                         voucher type code and a trailing part that is not used
    referral          -- worker stored as the voucher's issue_worker
    voucher_types     -- mapping whose values are set as the voucher types
                         of the usage partner

    The usage partner Worker and its WorkerStudyRole are created when
    missing. The voucher expires 365 days after its issue date and is stored
    with status VOUCHER_STATUS_IN_USE.

    Returns the Voucher.
    Raises ValueError when the reference does not have five parts or its
    date part is not a YYYYMMDD date.
    '''
    nd_number, date, voucher_partner, voucher_type, _ = voucher_reference.split('-')
    nd_number = nd_number.upper().replace('ND', 'PDP')
    issue_date = datetime.datetime.strptime(date, '%Y%m%d')
    expiry_date = issue_date + datetime.timedelta(days=365)

    usage_partner, created = Worker.objects.update_or_create(
        name=voucher_partners.get(voucher_partner, voucher_partner))
    usage_partner.roles.update(role=ROLE_CHOICES_VOUCHER_PARTNER)
    # create workerStudyRole
    WorkerStudyRole.objects.update_or_create(
        worker=usage_partner, study_id=GLOBAL_STUDY_ID, role=ROLE_CHOICES_VOUCHER_PARTNER)
    usage_partner.voucher_types.set(voucher_types.values())
    usage_partner.save()
    if created:
        logging.warning('New Voucher Partner created: {}'.format(voucher_partner))

    vt = VoucherType.objects.get(code=voucher_type)
    study_subject = StudySubject.objects.get(nd_number=nd_number)
    voucher, created = Voucher.objects.update_or_create(
        number=voucher_reference, issue_date=issue_date,
        expiry_date=expiry_date, voucher_type=vt, study_subject=study_subject,
        status=VOUCHER_STATUS_IN_USE, usage_partner=usage_partner, issue_worker=referral)
    # Only announce vouchers that did not exist yet, like every other
    # "New ... added" message of the import.
    if created:
        logging.warning('New Voucher added: {}'.format(voucher_reference))
    return voucher
# create voucher types
def create_voucher_types(voucher_types_dict, study):
    '''
    Make sure a VoucherType exists for every entry of `voucher_types_dict`.

    voucher_types_dict -- mapping of description -> code
    study              -- study the voucher types are attached to

    Returns a dict mapping each description to its VoucherType.
    '''
    by_description = {}
    for description, code in voucher_types_dict.items():
        lookup = VoucherType.objects.update_or_create(
            code=code, description=description, study=study)
        # update_or_create returns (object, created); only the object is kept
        by_description[description] = lookup[0]
    return by_description
# create appointment types
def create_appointment_types(assessments):
    '''
    Make sure an AppointmentType exists for every entry of `assessments`.

    assessments -- mapping of description -> default duration

    The code of each appointment type is made of the upper-case letters of
    its description (e.g. 'Cognitive Test' -> 'CT').

    Returns the AppointmentType objects as a list, in the iteration order of
    `assessments`.
    '''
    appointment_types = []
    for name, duration in assessments.items():
        # Joined explicitly: filter() over a string yields a lazy filter
        # object on Python 3, and str.isupper rejects unicode on Python 2.
        code = ''.join(char for char in name if char.isupper())
        # update_or_create already saves the object; no extra save() needed
        appointment_type, _ = AppointmentType.objects.update_or_create(
            code=code, default_duration=duration, description=name)
        appointment_types.append(appointment_type)
    return appointment_types
# Import one spreadsheet row: create (or reuse) the Language, Country, Location,
# FlyingTeam, referral Worker, Subject and StudySubject records for it, register the
# row's vouchers, and store its dated visit columns as one Visit with Appointments.
#
# Parameters:
#   index            -- row index; not used in the body.
#   row              -- the row, accessed by upper-cased column name. The language
#                       columns are used as sequences, the Y/N columns as booleans and
#                       'VOUCHER REFERENCE' as a list of references.
#   visit_columns    -- names of the columns that hold the visit dates.
#   appointmentTypes -- sequence of AppointmentType objects; indexed with the positions
#                       of the dates found in the visit columns.
#   voucher_types    -- mapping whose values are set as the voucher types of the
#                       StudySubject (and passed on to add_subject_vouchers).
#
# Nothing is returned; the effects are database writes and log messages.
# NOTE(review): the indentation of this function is not visible here, so these comments
# do not state which statements are nested under the `if created:` checks.
def parse_row(index, row, visit_columns, appointmentTypes, voucher_types):
# Languages
# If only one of the two language columns is filled, it is copied into the other one.
if len(row['LANGUAGES']) == 0 and len(row['PREFERED WRITEN LANGUAGE']) == 0:
logging.warn('No Languages available')
elif len(row['LANGUAGES']) == 0 and len(row['PREFERED WRITEN LANGUAGE']) > 0:
row['LANGUAGES'] = row['PREFERED WRITEN LANGUAGE']
elif len(row['LANGUAGES']) > 0 and len(row['PREFERED WRITEN LANGUAGE']) == 0:
row['PREFERED WRITEN LANGUAGE'] = row['LANGUAGES']
languages = []
for language in row['LANGUAGES']:
# the locale is the first element of the locale_table entry, or None when unknown
lang, created = Language.objects.get_or_create(
name=language, locale=locale_table.get(language,(None, None))[0])
languages.append(lang)
if created:
logging.warn('New Language added: {}'.format(language))
lang.save()
# Only the first preferred language is considered.
# NOTE(review): pref_lang is only assigned inside this loop; when the preferred-language
# column is still empty at this point, its later use in the Subject defaults looks like
# it would fail with an unbound name -- confirm.
for language in row['PREFERED WRITEN LANGUAGE'][:1]:
pref_lang, created = Language.objects.get_or_create(name=language
,locale=locale_table.get(language,(None, None))[0])
if created:
logging.warn(
'New Language (from Prefered) added: {}'.format(language))
pref_lang.save()
# Country
country = row['COUNTRY']
country, created = Country.objects.get_or_create(name=country)
if created:
logging.warn('New Country added: {}'.format(row['COUNTRY']))
country.save()
# Location and Flying Team
# If no FT, then default location is CHL
ft = None
location = None
prefix = None
if not row['FLYING TEAM (FT)']:
prefix = DEFAULT_LOCATION_PREFIX
location, created = Location.objects.get_or_create(
name=DEFAULT_LOCATION, prefix=prefix)
if created:
logging.warn('New location added: {}'.format(DEFAULT_LOCATION))
location.save()
else:
# flying-team rows use the 'Flying Team' location with prefix 'F'
prefix = 'F'
location, created = Location.objects.get_or_create(
name='Flying Team', prefix=prefix)
if created:
logging.warn('New location added: Flying Team')
location.save()
# Create Flying Team
ft, created = FlyingTeam.objects.get_or_create(
place=row['LOCATION OF FT'])
if created:
logging.warn('New Flying Team added: {}'.format(
row['LOCATION OF FT']))
ft.save()
# Health Partner
# create health partner (Referral)
health_partner, created = Worker.objects.get_or_create(name=row['REFERRAL'])
health_partner.roles.update(role=ROLE_CHOICES_HEALTH_PARTNER)
# create workerStudyRole
workerStudyRole, _ = WorkerStudyRole.objects.update_or_create(
worker=health_partner, study_id=GLOBAL_STUDY_ID, role=ROLE_CHOICES_HEALTH_PARTNER)
health_partner.save()
if created:
logging.warn('New Health Partner added: {}'.format(row['REFERRAL']))
# Subject: looked up by SS number, first name and last name; the remaining fields in
# `defaults` are only used when a new Subject is created.
subject, created = Subject.objects.get_or_create(social_security_number=row['SS NUMBER'],
first_name=row['FIRST NAME'],
last_name=row['LAST NAME'],
defaults={
'social_security_number': row['SS NUMBER'],
'first_name': row['FIRST NAME'],
'last_name': row['LAST NAME'],
'sex': row['GENDER'],
'phone_number': row['PHONE NUMBER 1'],
'phone_number_2': row['PHONE NUMBER 2'],
'email': row['E-MAIL'],
'date_born': row['DATE OF BIRTH'],
'address': row['ADDRESS'],
'postal_code': row['POSTAL CODE'],
'city': row['CITY'],
'country': country,
'dead': row['DECEASED'],
'default_written_communication_language': pref_lang
})
subject.languages.set(languages)
subject.save()
if created:
logging.warn('New Subject added with SS number: {}'.format(row['SS NUMBER']))
# StudySubject
study = Study.objects.filter(id=GLOBAL_STUDY_ID)[0]
# the ND number is stored upper-cased with 'ND' replaced by 'PDP'
nd_number = row['ND NUMBER'].upper().replace('ND', 'PDP')
# Looked up by subject and ND number; `defaults` only applies to a new StudySubject.
# The defaults dictionary is built before the lookup, so a screening number is computed
# and 'DATE ADDED (V1)' is parsed as DD.MM.YYYY on every call.
# NOTE(review): 'DATE ADDED (V1)' is parsed with the strict date-of-birth parser here,
# while the visit dates below are extracted with date_regex -- confirm that this cell
# never carries extra text.
studySubject, created = StudySubject.objects.get_or_create(subject=subject, nd_number=nd_number,
defaults={
'subject': subject,
'study': study,
'postponed': row['POSTPONED'],
'nd_number': nd_number,
'resigned': row['RESIGNED'],
'resign_reason': row['REASON'],
'type': SUBJECT_TYPE_CHOICES_PATIENT,
'excluded': row['EXCLUDED'],
'exclude_reason': row['REASON.1'],
'previously_in_study': row['PDP 1.0'],
'comments': row['COMMENT'],
'default_location': location,
'flying_team': ft,
'screening_number': get_new_screening_number(prefix),
'date_added': parse_column_date_of_birth(row['DATE ADDED (V1)'])
})
#all study subjects can have all voucher types
studySubject.voucher_types.set(voucher_types.values())
studySubject.save()
if created:
logging.warn('New StudySubject added with ND number: {}'.format(nd_number))
#VOUCHERS
# every reference of the row is registered with the referral as issuing worker
voucher_references = row['VOUCHER REFERENCE']
for voucher_reference in voucher_references:
voucher = add_subject_vouchers(voucher_reference, health_partner, voucher_types)
# Visits
# Consider all visits as part of the same visit with multiple appointments
appointments = []
appointment = None
'''
map(date_regex.findall gets all the dates in the strings ignoring comments such as Tel
sum(Ans, []) flattens the resulting list from the map since each findall returns a list
map to convert string to datetime
'''
# NOTE(review): visit_dates is iterated several times below (sorted, set, indexof), which
# relies on map() returning a list; and with no date in any visit column the [0, -1]
# lookup looks like it would raise IndexError -- confirm both.
visit_dates = map(lambda x: datetime.datetime.strptime(
x, '%d.%m.%Y'), sum(map(date_regex.findall, row[visit_columns].values), []))
# get first and last elements of the sorted element
datetime_begin, datetime_end = itemgetter(*[0, -1])(sorted(visit_dates))
datetime_begin = datetime_begin.strftime('%Y-%m-%d')
datetime_end = datetime_end.strftime('%Y-%m-%d')
# one Visit spanning the earliest to the latest date of the row, created as finished
visit, created = Visit.objects.get_or_create(
subject=studySubject, datetime_begin=datetime_begin, datetime_end=datetime_end, defaults={
'is_finished': True})
if created:
logging.warn('New Visit added for ND number {} starting on {}'.format(
nd_number, datetime_begin))
appointment_types = appointmentTypes[:len(set(visit_dates))] #in this case appointment types are incremental
visit.appointment_types.set(appointment_types)
visit.save()
'''
If there are two Vx with the same date we put together the appointment types in the same appointment
'''
for visit_date in set(visit_dates):
datetime_when = visit_date.strftime('%Y-%m-%d')
# get the indices of each occurrence of the date and use them to get
# the appointment types
appointment_types = itembetter(
indexof(visit_date, visit_dates), appointmentTypes)
# creatre appointment
# the appointment length is the sum of the default durations of its appointment types
appointment, _ = Appointment.objects.update_or_create(
visit=visit, length=sum(
[a.default_duration for a in appointment_types]),
flying_team=ft, location=location,
status=Appointment.APPOINTMENT_STATUS_FINISHED, datetime_when=datetime_when)
# the appointment types of one day are scheduled back to back, starting at 09:00
date_when = visit_date.replace(
hour=9, minute=0, second=0, microsecond=0)
for appointment_type in appointment_types:
# NOTE(review): a new AppointmentTypeLink is constructed and saved unconditionally here;
# importing the same row twice presumably duplicates the links -- confirm.
app_type_link = AppointmentTypeLink(
appointment=appointment, date_when=date_when,
appointment_type=appointment_type)
date_when += datetime.timedelta(
minutes=appointment_type.default_duration)
app_type_link.save()
def createWorker(password, email='', username='admin', first_name='LCSB', last_name='Admin',
                 specialization='Technician', unit='LCSB'):
    '''
    Create (or update) a superuser account and the Worker linked to it.

    password       -- clear-text password; stored only in hashed form
    email          -- e-mail address of both the user and the worker
    username       -- login name of the user
    first_name, last_name, specialization, unit -- Worker fields; the Worker
                      is looked up by first and last name

    The user is flagged as superuser and staff. The Worker is assigned every
    existing Location and Language and receives the technician role for the
    global study.
    '''
    # create user
    # The clear-text password is intentionally not part of `defaults`:
    # update_or_create would write it to the password column as-is before
    # set_password() replaces it with the hash.
    user, _ = User.objects.update_or_create(
        username=username, defaults={'email': email})
    user.set_password(password)
    user.is_superuser = True
    user.is_staff = True
    user.is_admin = True
    user.save()

    # create worker
    worker_defaults = {
        'first_name': first_name,
        'last_name': last_name,
        'email': email,
        'unit': unit,
        'specialization': specialization,
        'user': user,
    }
    worker, _ = Worker.objects.update_or_create(
        first_name=first_name, last_name=last_name, defaults=worker_defaults)
    worker.locations.set(Location.objects.all())
    worker.languages.set(Language.objects.all())
    worker.save()

    # create workerStudyRole
    WorkerStudyRole.objects.update_or_create(
        worker=worker, study_id=GLOBAL_STUDY_ID, role=ROLE_CHOICES_TECHNICIAN)
    logging.info('SuperUser and Worker {} created'.format(username))
# Script entry point: import the Excel file given as first argument, then create the
# admin user/worker after asking for its password.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    if len(sys.argv) < 2:
        logging.warning('Please, execute the program as: python {} file_path.xlsx'.format(sys.argv[0]))
        sys.exit(1)
    # named input_path so the `file` builtin is not shadowed
    input_path = sys.argv[1]
    if not os.path.isfile(input_path):
        logging.warning('Please, execute the program with a valid file path.')
        sys.exit(1)

    # Every cell is read as text; empty cells become ''.
    # type(u'') is `unicode` on Python 2 and `str` on Python 3.
    df = pd.read_excel(input_path, dtype=object)
    df = df.fillna('').astype(type(u''))
    df.columns = [c.upper() for c in df.columns]

    # make transformations
    for column, function in converters.items():
        logging.warning(column)
        df[column] = df[column].apply(function)

    # get visits columns
    regex = re.compile(r'\(V\d\)')

    study = Study.objects.filter(id=GLOBAL_STUDY_ID)[0]
    # enable vouchers
    study.columns.voucher_types = True
    study.columns.vouchers = True
    study.nd_number_study_subject_regex = r'^PDP\d{4}$'
    study.columns.save()
    study.save()

    # Materialised as a list: it is reused for every row, which a lazy
    # filter object would not allow.
    visit_columns = list(filter(regex.search, df.columns))

    assessments = OrderedDict([('Cognitive Test', 180), ('Risk Factor', 120),
                               ('Voucher Distribution', 120), ('Follow Up', 90)])
    appointmentTypes = create_appointment_types(assessments)

    voucher_types_dict = OrderedDict([
        ('Cognitive Activity', 'CA'), ('Neurofit', 'NF'), ('Mobilfit', 'MF'), ('Diet', 'D'),
        ('Consulte ORL', 'CORL'), ('Physical Activity', 'PA'),
        ('Individual Cognitive Training', 'IT'), ('Social', 'S'), ('Test', 'T')])
    voucher_types = create_voucher_types(voucher_types_dict, study)

    # process each row
    for index, row in df.iterrows():
        parse_row(index, row, visit_columns, appointmentTypes, voucher_types)

    # create worker and super user
    # keep asking until the password and its confirmation match
    while True:
        pass1 = getpass.getpass('Please type a password for the Admin user: ')
        pass2 = getpass.getpass('Please type your password again: ')
        if pass1 == pass2:
            break
        # parenthesised form prints the same text on Python 2 and Python 3
        print('Password mismatch, please try again')
    createWorker(pass1)