import hashlib
import logging
from os import fsencode

import pyexcel

from .export_utils import (
    get_names_from_string,
    get_partners_from_daisy,
    get_value_list_from_row,
    is_data,
    is_study,
    is_submission,
    process_possible_date,
    process_yes_no_answer,
    process_yes_no_dontknow_answer,
)


class DishXlsExporter:
    """Convert a DISH Excel workbook into a nested dataset dictionary.

    Every sheet except the first (index 0) is classified with ``is_study``,
    ``is_data`` or ``is_submission`` and its cells are copied into the
    ``studies``, ``data_declarations``/``legal_bases`` or top-level
    name/project/contacts entries of the result.  Cells are addressed as
    ``sheet[row, column]``; column 1 holds the answer and column 0 the
    question label that is reused as a note.
    """

    # (row of the legal-basis answer, personal data code) for data sheets.
    _LEGAL_BASIS_ROWS = (
        (12, "Standard"),
        (13, "Standard"),
        (16, "Special"),
        (17, "Special"),
    )

    # (answer row, use class, note row) for yes / no / don't-know questions.
    # A "don't know" (None) answer produces no entry at all.
    _TRI_STATE_RESTRICTIONS = (
        (24, 'RS-[XX]', 25),
        (26, 'GS-[XX]', 27),
        (28, 'IS', 29),
        (30, 'TS-[XX]', 31),
    )

    def __init__(self):
        """Configure file logging and load the known institutions.

        Builds two lookup tables keyed by lower-cased institution name and
        lower-cased acronym, both mapping to the institution's name.
        """
        logging.basicConfig(
            filename='export_dishxls.log',
            level=logging.DEBUG,
            format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S',
        )
        self.inst_dict = {}
        self.inst_ac_dict = {}
        for inst in get_partners_from_daisy():
            name = inst.get('name')
            self.inst_dict[name.lower()] = name
            acronym = inst.get('acronym')
            if acronym:
                self.inst_ac_dict[acronym.lower()] = name
        self.predefined_data_types = {
            "Omics data",
            "Genotype data",
            "Whole genome sequencing",
            "Exome sequencing",
            "Genomics variant array",
            "RNASeq",
            "Genetic and derived genetic data",
            "Transcriptome array",
            "Methylation array",
            "MicroRNA array",
            "Metabolomics",
            "Metagenomics",
            "Proteomics",
            "Other omics data",
            "Clinical Imaging",
            "Cell Imaging",
            "Human subject data",
            "Clinical data",
            "Lifestyle data",
            "Socio Economic Data",
            "Environmental Data",
            "Other Phenotype data",
            "Other",
        }

    def export_submission(self, full_file_path):
        """Read the workbook at *full_file_path* and return its dataset dict.

        Raises:
            ValueError: if no sheet name contains ``"_Help"``, which is how
                a DISH workbook is recognised.
        """
        logging.info('Processing start for ----> %s', full_file_path)
        book = pyexcel.get_book(file_name=full_file_path)
        sheet_names = book.sheet_names()
        if not any("_Help" in name for name in sheet_names):
            raise ValueError("{} not a valid DISH Excel file".format(full_file_path))

        dataset_dict = {
            "source": book.filename,
            "contacts": [],
            "data_declarations": [],
            "studies": [],
            "legal_bases": [],
        }
        # Sheet 0 is deliberately never inspected; processing starts at 1.
        for idx in range(1, book.number_of_sheets()):
            sheet = book.sheet_by_index(idx)
            logging.info('Processing sheet ----> %s', sheet_names[idx])
            if is_study(sheet):
                dataset_dict["studies"].append(self._build_study(sheet))
            elif is_data(sheet):
                dataset_dict["legal_bases"].extend(self._build_legal_bases(sheet))
                # .get(): the submission sheet that supplies "project" may
                # come later in the workbook or be missing altogether.
                dataset_dict["data_declarations"].append(
                    self._build_data_declaration(sheet, dataset_dict.get("project"))
                )
            elif is_submission(sheet):
                self._apply_submission(sheet, dataset_dict)
        logging.info('Processing end for ----> %s', full_file_path)
        return dataset_dict

    def _contact(self, full_name, role, email, institution):
        """Build one contact entry; the name is split by get_names_from_string."""
        names = get_names_from_string(full_name)
        return {
            "first_name": names[0],
            "last_name": names[1],
            "role": role,
            "email": email,
            "affiliations": [self.process_institution(institution)],
        }

    def _build_study(self, sheet):
        """Build the study (cohort) entry from a study sheet."""
        cohort_dict = {
            'name': sheet[1, 1].strip(),
            'description': sheet[2, 1] + ' ' + sheet[6, 1] + ' ' + sheet[16, 1],
            'has_ethics_approval': process_yes_no_answer(sheet[4, 1]),
            "ethics_approval_notes": sheet[5, 1],
            "url": sheet[3, 1],
            "contacts": [
                self._contact(sheet[8, 1], sheet[11, 1], sheet[9, 1], sheet[10, 1]),
            ],
        }
        # The second contact is added only when both name and role are given.
        if sheet[12, 1] and sheet[15, 1]:
            cohort_dict["contacts"].append(
                self._contact(sheet[12, 1], sheet[15, 1], sheet[13, 1], sheet[14, 1])
            )
        return cohort_dict

    def _build_legal_bases(self, sheet):
        """Return the legal-basis entries declared on a data sheet."""
        # Same stripped title as the declaration, so the reference matches it.
        title = sheet[1, 1].strip()
        legal_bases = []
        for row, personal_data_code in self._LEGAL_BASIS_ROWS:
            if sheet[row, 1]:
                legal_bases.append({
                    "data_declarations": [title],
                    "personal_data_codes": [personal_data_code],
                    "legal_basis_codes": [self.extract_lb_code(sheet[row, 1])],
                    "legal_basis_notes": sheet[row, 0],
                })
        return legal_bases

    def _build_data_declaration(self, sheet, project):
        """Build one data declaration entry from a data sheet.

        *project* is used as the note of the project-specific (PS) use
        restriction; it may be ``None`` when no project is known yet.
        """
        datadec_dict = {
            'title': sheet[1, 1].strip(),
            'source_study': sheet[2, 1].strip(),
            "data_types": [],
        }
        datadec_dict["data_type_notes"] = sheet[7, 1]
        known_types, unknown_type_notes = self.process_data_types(
            get_value_list_from_row(sheet, 6)
        )
        datadec_dict["data_types"].extend(known_types)
        if unknown_type_notes:
            datadec_dict["data_type_notes"] += " " + unknown_type_notes
        if sheet[10, 1]:
            datadec_dict["data_type_notes"] += " Notes on samples: " + sheet[10, 1]
        # If it involves samples, add this as a data type.
        if process_yes_no_answer(sheet[9, 1]):
            datadec_dict["data_types"].append('Samples')
        if sheet[8, 1]:
            datadec_dict["de_identification"] = self.process_deidentification(sheet[8, 1])
        datadec_dict["consent_status"] = self.process_consent_status(sheet[32, 1])
        datadec_dict["consent_status_description"] = sheet[33, 1]
        if sheet[20, 1]:
            datadec_dict['subjects_category'] = sheet[20, 1].replace(' & ', '_and_')
        if sheet[21, 1]:
            datadec_dict['has_special_subjects'] = process_yes_no_dontknow_answer(sheet[21, 1])
        # Equality rather than identity/truthiness is kept on purpose: the
        # helper's exact return type is not visible from this module.
        if datadec_dict.get('has_special_subjects') == True and sheet[22, 1]:  # noqa: E712
            datadec_dict['special_subject_notes'] = sheet[22, 1]

        self._add_use_restrictions(sheet, datadec_dict, project)
        self._add_access_info(sheet, datadec_dict)
        return datadec_dict

    @staticmethod
    def _restriction(use_class, class_note, constrained, restriction_note=None):
        """Build one use-restriction entry.

        ``use_restriction_note`` is present only on CONSTRAINTS entries.
        """
        entry = {
            'use_class': use_class,
            'use_restriction_rule': "CONSTRAINTS" if constrained else "NO_CONSTRAINTS",
            'use_class_note': class_note,
        }
        if constrained:
            entry['use_restriction_note'] = restriction_note
        return entry

    def _add_use_restrictions(self, sheet, datadec_dict, project):
        """Set ``use_restrictions`` (and possibly ``storage_end_date``).

        Note cells are read only for constrained answers, so rows that are
        not needed are never accessed.
        """
        use_restrictions = []

        for answer_row, use_class, note_row in self._TRI_STATE_RESTRICTIONS:
            answer = process_yes_no_dontknow_answer(sheet[answer_row, 1])
            if answer:
                use_restrictions.append(self._restriction(
                    use_class, sheet[answer_row, 0], True, sheet[note_row, 1]))
            elif answer is not None:
                use_restrictions.append(self._restriction(
                    use_class, sheet[answer_row, 0], False))

        # Project-specific use: the note is the submission's project.
        if process_yes_no_answer(sheet[35, 1]):
            use_restrictions.append(self._restriction('PS', sheet[35, 0], True, project))
        else:
            use_restrictions.append(self._restriction('PS', sheet[35, 0], False))

        # Storage time limit: the end date is also stored on the declaration.
        if process_yes_no_answer(sheet[36, 1]):
            storage_end_date = process_possible_date(sheet[37, 1])
            datadec_dict["storage_end_date"] = storage_end_date
            use_restrictions.append(
                self._restriction('TS-[XX]', sheet[36, 0], True, storage_end_date))
        else:
            use_restrictions.append(self._restriction('TS-[XX]', sheet[36, 0], False))

        for answer_row, use_class in ((38, 'PUB'), (40, 'RTN')):
            if process_yes_no_answer(sheet[answer_row, 1]):
                use_restrictions.append(self._restriction(
                    use_class, sheet[answer_row, 0], True, sheet[answer_row + 1, 1]))
            else:
                use_restrictions.append(self._restriction(
                    use_class, sheet[answer_row, 0], False))

        # Free-text "other" restriction: the answer itself is the note.
        if sheet[42, 1]:
            use_restrictions.append(
                self._restriction('Other', sheet[42, 0], True, sheet[42, 1]))

        if process_yes_no_answer(sheet[47, 1]):
            use_restrictions.append(
                self._restriction('IP', sheet[47, 0], True, sheet[48, 1]))
        else:
            # The class note is the column 0 label, as for every other class.
            use_restrictions.append(self._restriction('IP', sheet[47, 0], False))

        datadec_dict['use_restrictions'] = use_restrictions

    @staticmethod
    def _add_access_info(sheet, datadec_dict):
        """Set ``access_procedure`` and ``access_category`` on the declaration.

        The open/registered classification is only an initial interpretation
        and should be further curated in the catalog.
        """
        datadec_dict["access_procedure"] = ""
        if sheet[45, 1] and ('not' in sheet[45, 1]):
            if sheet[44, 1] and ('no' in sheet[44, 1]):
                datadec_dict["access_category"] = "open-access"
                datadec_dict["access_procedure"] = "No additional form is needed to request access."
            else:
                datadec_dict["access_category"] = "registered-access"
                datadec_dict["access_procedure"] = "Additional form is needed to request access."
        else:
            datadec_dict["access_category"] = "controlled-access"
            datadec_dict["access_procedure"] = sheet[46, 1]

    def _apply_submission(self, sheet, dataset_dict):
        """Copy name, project and contacts from a submission sheet.

        Every contact on this sheet is affiliated with the institution in
        row 7.
        """
        dataset_dict["name"] = sheet[2, 1].strip()
        dataset_dict["project"] = sheet[5, 1].strip()
        institution = sheet[7, 1]
        dataset_dict["contacts"].extend([
            self._contact(sheet[9, 1], sheet[11, 1], sheet[10, 1].strip(), institution),
            self._contact(sheet[12, 1], "Legal_Representative",
                          sheet[13, 1].strip(), institution),
            self._contact(sheet[14, 1], "Data_Protection_Officer",
                          sheet[15, 1].strip(), institution),
        ])
        # The additional contact is added only when name and role are given.
        if sheet[16, 1] and sheet[18, 1]:
            dataset_dict["contacts"].append(
                self._contact(sheet[16, 1], sheet[18, 1], sheet[17, 1].strip(), institution)
            )

    def get_hash_for_path(self, path):
        """Return the MD5 digest of *path* as a decimal string.

        A fresh digest is used on every call, so the result depends only on
        *path* and not on paths hashed earlier with the same exporter.
        """
        digest = hashlib.md5(fsencode(path)).hexdigest()
        return str(int(digest, 16))

    def process_data_types(self, xls_data_type_list):
        """Split spreadsheet data types into known types and free-text notes.

        Returns:
            A ``(types, notes)`` tuple.  ``types`` holds the predefined
            names with spaces replaced by underscores; ``notes`` is the
            unrecognised names, each followed by a newline.  Blank entries
            are ignored.
        """
        result = []
        unknown = []
        for raw_name in xls_data_type_list:
            type_name = raw_name.strip()
            if not type_name:
                continue
            if type_name in self.predefined_data_types:
                result.append(type_name.replace(" ", "_"))
            else:
                unknown.append(type_name + '\n')
        return (result, ''.join(unknown))

    def process_deidentification(self, deid_str):
        """Map a de-identification answer to its code, or None if unrecognised.

        Matching is by case-sensitive substring: ``'seu'`` is checked first,
        then ``'non'``.
        """
        if 'seu' in deid_str:
            return 'pseudonymized'
        if 'non' in deid_str:
            return 'anonymized'
        return None

    def process_consent_status(self, consent_str):
        """Map a consent-status answer to its code, or None if unrecognised.

        Matching is by case-sensitive substring: ``'et'`` is checked first,
        then ``'om'``.
        """
        if 'et' in consent_str:
            return 'heterogeneous'
        if 'om' in consent_str:
            return 'homogeneous'
        return None

    def process_institution(self, institution_str):
        """Resolve an institution name or acronym to its known name.

        Acronyms take precedence over names; the lookup is case-insensitive.
        An unknown institution is logged as an error and returned unchanged;
        an empty value yields ``""``.
        """
        if not institution_str:
            return ""
        key = institution_str.lower()
        known_name = self.inst_ac_dict.get(key) or self.inst_dict.get(key)
        if known_name:
            return known_name
        logging.error('Unknown institution -- > %s', institution_str)
        return institution_str

    def extract_lb_code(self, lb_value):
        """Return the text between the first ``(`` and the last ``)``.

        Raises:
            ValueError: if *lb_value* lacks an opening or closing parenthesis.
        """
        op = lb_value.find('(')
        cp = lb_value.rfind(')')
        if op == -1 or cp == -1:
            raise ValueError(
                "No parenthesised legal basis code in {!r}".format(lb_value)
            )
        return lb_value[op + 1:cp]