Commit 42653ef1 authored by Pinar Alper

schemas extended to cater for all fields in Data Information Sheet DISH

parent 8e442d95
__pycache__/
# Visual Studio Code
.vscode
project_venv
# Elixir LU metadata utilities
## Development environment setup
Create a virtual environment:
```bash
mkdir project_venv
python3 -m venv project_venv
source ./project_venv/bin/activate
```
Install dependencies (the single quotes around `.[dev]` are required if you are using `zsh`):
```bash
pip install -e '.[dev]'
```
## Testing
Run tests with:
```bash
python setup.py test
```
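A quick way to exercise the DISH importer from Python (a minimal sketch; the module path and the workbook file name below are assumptions, not part of the documented API):

```python
# Minimal sketch: the module path and the workbook name are assumptions; adjust
# them to your checkout. Note that DishXlsExporter() fetches the partner list
# from DAISY at construction time.
from metadata_tools.importxls.export_dishxls import DishXlsExporter

exporter = DishXlsExporter()
submission = exporter.export_submission('dish-example.xlsx')  # hypothetical DISH workbook
print(submission.get('name'), len(submission['data_declarations']))
```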
## Current Version
import pkg_resources
import json
import hashlib
from os import fsencode
from metadata_tools.importxls.export_utils import get_lines_from_string
class DatasetExporter:
def __init__(self):
with open(pkg_resources.resource_filename('metadata_tools', 'resources/elu_institutions.json'), encoding='utf-8') as institutions_file:
institutions = json.loads(institutions_file.read())
self.institution_dict = {}
for inst in institutions:
self.institution_dict[inst.get('institution_name')] = inst.get('elu_accession')
self.predefined_data_types = set([
"Omics data",
"Genotype data",
"Whole genome sequencing",
"Exome sequencing",
"Genomics variant array",
"RNASeq",
"Genetic and derived genetic data",
"Transcriptome array",
"Methylation array",
"MicroRNA array",
"Metabolomics",
"Metagenomics",
"Proteomics",
"Other omics data",
"Clinical Imaging",
"Cell Imaging",
"Human subject data",
"Clinical data",
"Lifestyle data",
"Socio Economic Data",
"Environmental Data",
"Other Phenotype data",
"Other"
])
self.predefined_storage_types = set([
'hpc_chaos_home',
'hpc_chaos_project',
'hpc_gaia_home',
'hpc_gaia_project',
'hpc_gaia_work',
'hpc_iris_home',
'hpc_iris_project',
'hpc_scratch_personal',
'hpc_scratch_project',
'hpc_isilon',
'atlas_personal',
'atlas_project',
'hpc_backup_chaos',
'hpc_backup_gaia',
'bertha',
'certon_block',
'lcsb_group_server',
'lcsb_desktop',
'lcsb_laptop',
'personal_laptop',
'Owncloud',
'External Storage (e.g. Hard disk, DVD)',
'Other'
])
    def get_hash_for_path(self, path):
        # Hash with a fresh digest on every call; reusing a shared md5 object
        # would make each result depend on all previously hashed paths.
        return str(int(hashlib.md5(fsencode(path)).hexdigest(), 16))
    def lookup_institution_accession(self, institution_name):
        if institution_name not in self.institution_dict:
            print('Undefined institution --> {}'.format(institution_name))
            return None
        return self.institution_dict[institution_name]
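    # Split raw spreadsheet values into recognised data types (spaces become
    # underscores) and a free-text notes string for anything unrecognised.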
def process_data_types(self, xls_data_type_list):
result = []
data_type_notes = ''
for type_name in xls_data_type_list:
type_name = type_name.strip()
if type_name:
if type_name in self.predefined_data_types:
result.append(type_name.replace(" ", "_"))
else:
data_type_notes += type_name + '\n'
return (result, data_type_notes)
    def is_storage_resource(self, resource):
        if resource in self.predefined_storage_types:
            return True
        print('Unknown storage resource --> {}'.format(resource))
        return False
def get_storage_location(self, resource, path, category):
result = {}
if self.is_application(path):
result['storage_resource'] = 'application'
elif resource in self.predefined_storage_types:
result['storage_resource'] = resource
else:
result['storage_resource'] = 'Other'
        result['locations'] = get_lines_from_string(path)
result['category'] = category
return result
def get_samples_storage(self, sample_location):
return [{'storage_resource':'sample-storage', 'locations':[sample_location], 'category':'master'}]
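    # Paths mentioning 'transmart' or 'redcap' are treated as application-hosted storage.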
    def is_application(self, path):
        return ("transmart" in path.lower()) or ("redcap" in path.lower())
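    # Each share entry is either 'Institution' or 'Institution;free-text note'.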
def process_share_list(self, shares):
share_list = []
        for shr in shares:
            if ";" not in shr:
                # Look up once; fall back to recording the raw text as a note.
                accession = self.lookup_institution_accession(shr.strip())
                if accession:
                    share_list.append({'share_inst': accession})
                else:
                    share_list.append({'share_notes': shr})
            else:
                # Split on the first ';' only, so notes may themselves contain ';'.
                infos = shr.split(";", 1)
                share_list.append({'share_inst': self.lookup_institution_accession(infos[0].strip()),
                                   'share_notes': infos[1].strip()})
return share_list
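    # Locations arrive flattened into one row of (resource, path) pairs; a single
    # leftover value is interpreted as either a bare resource or a bare path.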
def build_storage_locations(self, locations_list, category):
result = []
if len(locations_list) % 2 != 0 and len(locations_list) > 0:
if len(locations_list) == 1:
if self.is_storage_resource(locations_list[0]):
result.append(self.get_storage_location(locations_list[0],'<missing_info>',category))
else:
result.append(self.get_storage_location('Other',locations_list[0],category))
else:
raise ValueError('Uneven Master Data Location Row')
elif len(locations_list) % 2 == 0 and len(locations_list) > 0:
s = 0
e = len(locations_list) // 2
while s < e:
if self.is_storage_resource(locations_list[s * 2]):
result.append(self.get_storage_location(locations_list[s * 2], locations_list[s*2+1], category))
else:
                    # Unknown resource name: fall back to 'Other' and keep the path
                    # (a string, as get_storage_location expects).
                    result.append(self.get_storage_location('Other', locations_list[s * 2 + 1], category))
s += 1
return result
import logging
import pyexcel
import hashlib
from os import fsencode
from .export_utils import get_partners_from_daisy, process_possible_date, process_yes_no_dontknow_answer, get_value_list_from_row, is_data, is_study, is_submission, process_yes_no_answer, get_names_from_string
class DishXlsExporter:
def __init__(self):
logging.basicConfig(filename='export_dishxls.log', level=logging.DEBUG)
institutions = get_partners_from_daisy()
self.inst_dict = {}
self.inst_ac_dict = {}
        for inst in institutions:
            self.inst_dict[inst.get('name').lower()] = inst.get('name')
            if inst.get('acronym'):
                self.inst_ac_dict[inst.get('acronym').lower()] = inst.get('name')
self.predefined_data_types = set([
"Omics data",
"Genotype data",
"Whole genome sequencing",
"Exome sequencing",
"Genomics variant array",
"RNASeq",
"Genetic and derived genetic data",
"Transcriptome array",
"Methylation array",
"MicroRNA array",
"Metabolomics",
"Metagenomics",
"Proteomics",
"Other omics data",
"Clinical Imaging",
"Cell Imaging",
"Human subject data",
"Clinical data",
"Lifestyle data",
"Socio Economic Data",
"Environmental Data",
"Other Phenotype data",
"Other"
])
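    # Walk every sheet of the DISH workbook and assemble one submission dictionary
    # from its study, data and submission sheets. Iteration starts at index 1
    # because sheet 0 is assumed to be the '_Help' sheet.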
def export_submission(self, full_file_path):
idx = 1
logging.info('Processing start for ----> {}'.format(full_file_path))
book = pyexcel.get_book(file_name=full_file_path)
is_dish = any("_Help" in elem for elem in book.sheet_names())
if is_dish:
dataset_dict = {
"source": book.filename,
"contacts": [],
"data_declarations": [],
"studies": [],
"legal_bases": []
}
while idx < book.number_of_sheets():
sheet = book.sheet_by_index(idx)
logging.info('Processing sheet ----> {}'.format(book.sheet_names()[idx]))
if is_study(sheet):
                cohort_dict = {'name': sheet[1, 1],
                               'description': sheet[2, 1] + ' ' + sheet[6, 1],
                               'has_ethics_approval': process_yes_no_answer(sheet[4, 1]),
                               'ethics_approval_notes': sheet[5, 1],
                               'url': sheet[3, 1],
                               'contacts': [{'first_name': get_names_from_string(sheet[8, 1])[0],
                                             'last_name': get_names_from_string(sheet[8, 1])[1],
                                             'role': sheet[11, 1],
                                             'email': sheet[9, 1],
                                             'affiliations': [self.process_institution(sheet[10, 1])]
                                             }]
                               }
                if sheet[12, 1] and sheet[15, 1]:
                    # Append in place: list.append returns None, so its result
                    # must not be assigned back to cohort_dict["contacts"].
                    cohort_dict["contacts"].append({'first_name': get_names_from_string(sheet[12, 1])[0],
                                                    'last_name': get_names_from_string(sheet[12, 1])[1],
                                                    'role': sheet[15, 1],
                                                    'email': sheet[13, 1],
                                                    'affiliations': [self.process_institution(sheet[14, 1])]
                                                    })
dataset_dict["studies"].append(cohort_dict)
elif is_data(sheet):
                datadec_dict = {'title': sheet[1, 1],
                                'source_study': sheet[2, 1],
                                'data_types': []}
                data_type_info = self.process_data_types(get_value_list_from_row(sheet, 6))
                datadec_dict['data_types'].extend(data_type_info[0])
                datadec_dict['data_type_notes'] = sheet[7, 1] + ' ' + data_type_info[1] + ' Notes on samples: ' + sheet[10, 1]
                # If it involves samples, add 'Samples' as a data type.
if process_yes_no_answer(sheet[9, 1]):
datadec_dict["data_types"].append('Samples')
if sheet[8, 1]:
datadec_dict["de_identification"] = self.process_deidentification(sheet[8,1])
datadec_dict["consent_status"] = self.process_consent_status(sheet[32,1])
datadec_dict["consent_status_description"] = sheet[33, 1]
if sheet[20, 1]:
datadec_dict['subject_categories'] = sheet[20, 1].replace(' & ', '_and_')
                # Rows 12-13 hold legal bases for standard personal data;
                # rows 16-17 hold legal bases for special-category data.
                for row, pd_code in ((12, 'Standard'), (13, 'Standard'), (16, 'Special'), (17, 'Special')):
                    if sheet[row, 1]:
                        lb_code = self.extract_lb_code(sheet[row, 1])
                        dataset_dict["legal_bases"].append({"data_declarations": [sheet[1, 1]],
                                                            "personal_data_codes": [pd_code],
                                                            "legal_basis_codes": [lb_code],
                                                            "legal_basis_notes": sheet[row, 0]
                                                            })
if sheet[21, 1]:
datadec_dict['has_special_subjects'] = process_yes_no_dontknow_answer(
sheet[21, 1])
                if datadec_dict.get('has_special_subjects') is True and sheet[22, 1]:
                    datadec_dict['special_subject_notes'] = sheet[22, 1]
use_restrictions = []
                # Rows 24-31 hold four yes/no/don't-know questions, each with its
                # note in the row below; a None answer means the row is skipped.
                for row, use_class in ((24, 'RS[XX]'), (26, 'GS[XX]'), (28, 'IS'), (30, 'TS-[XX]')):
                    answer = process_yes_no_dontknow_answer(sheet[row, 1])
                    if answer is not None:
                        use_restrictions.append({'use_class': use_class,
                                                 'use_restriction_rule': "CONSTRAINTS" if answer else "NO_CONSTRAINTS",
                                                 'use_class_note': sheet[row + 1, 1]})
                if process_yes_no_answer(sheet[35, 1]):
                    # The submission sheet may not have been processed yet, so the
                    # project name is fetched defensively.
                    use_restrictions.append({'use_class': 'PS',
                                             'use_restriction_rule': "CONSTRAINTS",
                                             'use_class_note': sheet[35, 0] + " PROJECT: " + dataset_dict.get("project", "")})
                else:
                    use_restrictions.append({'use_class': 'PS',
                                             'use_restriction_rule': "NO_CONSTRAINTS",
                                             'use_class_note': sheet[35, 0]})
if process_yes_no_answer(sheet[36, 1]):
datadec_dict["storage_end_date"] = process_possible_date(sheet[37, 1])
use_restrictions.append({'use_class': 'TS-[XX]',
'use_restriction_rule': "CONSTRAINTS",
'use_class_note': process_possible_date(sheet[37, 1])})
else:
use_restrictions.append({'use_class': 'TS-[XX]',
'use_restriction_rule': "NO_CONSTRAINTS",
'use_class_note': process_possible_date(sheet[37, 1])})
if process_yes_no_answer(sheet[38, 1]):
use_restrictions.append({'use_class': 'PUB',
'use_restriction_rule': "CONSTRAINTS",
'use_class_note': sheet[39, 1]})
else:
use_restrictions.append({'use_class': 'PUB',
'use_restriction_rule': "NO_CONSTRAINTS",
'use_class_note': sheet[39, 1]})
if process_yes_no_answer(sheet[42, 1]):
use_restrictions.append({'use_class': 'Other',
'use_restriction_rule': "CONSTRAINTS",
'use_class_note': sheet[42, 1]})
if process_yes_no_answer(sheet[47, 1]):
use_restrictions.append({'use_class': 'IP',
'use_restriction_rule': "CONSTRAINTS",
'use_class_note': sheet[48, 1]})
else:
use_restrictions.append({'use_class': 'IP',
'use_restriction_rule': "NO_CONSTRAINTS",
'use_class_note': sheet[48, 1]})
datadec_dict['use_restrictions'] = use_restrictions
datadec_dict["access_procedure"] = ""
if sheet[45, 1] and ('not' in sheet[45, 1]):
if sheet[44, 1] and ('no' in sheet[44, 1]):
datadec_dict["access_category"] = "open-access"
datadec_dict["access_procedure"] = datadec_dict["access_procedure"] + "Researchers need to sign an access request form."
else:
datadec_dict["access_category"] = "registered-access"
else:
datadec_dict["access_category"] = "controlled-access"
datadec_dict["access_procedure"] = datadec_dict["access_procedure"] + sheet[46, 1]
dataset_dict["data_declarations"].append(datadec_dict)
elif is_submission(sheet):
dataset_dict["name"] = sheet[2, 1]
dataset_dict["project"] = sheet[5, 1]
dataset_dict["contacts"].extend([{"first_name": get_names_from_string(sheet[9, 1])[0],
"last_name":get_names_from_string(sheet[9, 1])[1],
"role": sheet[11,1],
"email":sheet[10,1],
"affiliations": [self.process_institution(sheet[7,1])]
},
{"first_name": get_names_from_string(sheet[12, 1])[0],
"last_name":get_names_from_string(sheet[12, 1])[1],
"role": "Legal_Representative",
"email":sheet[10,1],
"affiliations": [self.process_institution(sheet[7,1])]
},
{"first_name": get_names_from_string(sheet[14, 1])[0],
"last_name":get_names_from_string(sheet[14, 1])[1],
"role": "Data_Protection_Officer",
"email":sheet[10,1],
"affiliations": [self.process_institution(sheet[7,1])]
}])
                if sheet[16, 1] and sheet[18, 1]:
                    dataset_dict["contacts"].append({"first_name": get_names_from_string(sheet[16, 1])[0],
                                                     "last_name": get_names_from_string(sheet[16, 1])[1],
                                                     "role": sheet[18, 1],
                                                     "email": sheet[10, 1],
                                                     "affiliations": [self.process_institution(sheet[7, 1])]
                                                     })
else:
pass
idx += 1
logging.info('Processing end for ----> {}'.format(full_file_path))
return dataset_dict
else:
raise ValueError("{} not a valid DISH Excel file".format(full_file_path))
    def get_hash_for_path(self, path):
        # Hash with a fresh digest on every call; reusing a shared md5 object
        # would make each result depend on all previously hashed paths.
        return str(int(hashlib.md5(fsencode(path)).hexdigest(), 16))
def process_data_types(self, xls_data_type_list):
result = []
data_type_notes = ''
for type_name in xls_data_type_list:
type_name = type_name.strip()
if type_name:
if type_name in self.predefined_data_types:
result.append(type_name.replace(" ", "_"))
else:
data_type_notes += type_name + '\n'
return (result, data_type_notes)
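    # Substring matching on the answer text: 'seu' matches 'pseudonymized',
    # 'non' matches 'anonymized'; anything else returns None implicitly.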
def process_deidentification(self, deid_str):
if 'seu' in deid_str:
return 'pseudonymized'
elif 'non' in deid_str:
return 'anonymized'
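    # 'et' matches 'heterogeneous', 'om' matches 'homogeneous'; otherwise None.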
def process_consent_status(self, consent_str):
if 'et' in consent_str:
return 'heterogeneous'
elif 'om' in consent_str:
return 'homogeneous'
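    # Resolve an institution string against the DAISY partner list, first by
    # acronym and then by full name; unknown values are logged and returned as-is.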
def process_institution(self, institution_str):
if institution_str:
if self.inst_ac_dict.get(institution_str.lower()):
return self.inst_ac_dict.get(institution_str.lower())
elif self.inst_dict.get(institution_str.lower()):
return self.inst_dict.get(institution_str.lower())
else:
                logging.error('Unknown institution --> {}'.format(institution_str))
return institution_str
else:
return ""
def extract_lb_code(self, lb_value):
op = lb_value.index('(')
cp = lb_value.rindex(')')
return lb_value[op+1:cp]
from datetime import datetime
import datetime as dt
import json
import urllib
from json import dumps
from urllib.error import HTTPError, URLError
from socket import timeout
import logging
def process_yes_no_answer(answer):
"""
    Convert a yes/no answer to a boolean; empty answers are treated as no.
    """

@@ -29,15 +36,26 @@ def process_yes_no_dontknow_answer(answer):
return None
def is_data_sheet(fname):
return fname.startswith('from-repository') or fname.startswith('from-collaborator') or fname.startswith(
'own-cohort')
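# The sheet kind (study / data / submission) is declared in cell A2, i.e. sheet[1, 0].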
def is_data(sheet):
    header = sheet[1, 0] if sheet[1, 0] else ''
    logging.info('Is data sheet ----> {}'.format(header))
    return 'data' in header.lower()

def is_study(sheet):
    header = sheet[1, 0] if sheet[1, 0] else ''
    logging.info('Is study sheet ----> {}'.format(header))
    return 'study' in header.lower()

def is_submission(sheet):
    header = sheet[1, 0] if sheet[1, 0] else ''
    logging.info('Is submission sheet ----> {}'.format(header))
    return 'submission' in header.lower()
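# Collect the non-empty values of a row, skipping the label column (column 0).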
def get_value_list_from_row(sheet, row_idx):
result = []
vals = sheet.row[row_idx]
    data_vals = vals[1:]
for val in data_vals:
if val:
result.append(val)
@@ -85,4 +103,30 @@ def get_lines_from_string(a_string):
for line in line_list:
if line:
result.append(line)
    return result

def get_partners_from_daisy():