Review notes (free text ahead of the first file header; patch tools skip it):
- etl_common.py now owns its logger. Importing `logger` from subject_import_reader
  while subject_import_reader imports EtlCommon from etl_common is a circular import.
  Log records from EtlCommon are therefore emitted under the etl_common logger name.
- Imports use `web.models...` / relative `.etl_common`, matching the pre-move code and
  the sibling importer modules touched by this patch.
- remove_bom strips one character for str input (a decoded BOM is U+FEFF, one code point).
- get_new_date_value no longer raises TypeError when building the conflict warning
  (old_value is a datetime) or when new_value is None.
- Post-image blob ids on the `index` lines are those of the original commit.

diff --git a/smash/web/importer/csv_subject_import_reader.py b/smash/web/importer/csv_subject_import_reader.py
index e83325b40873bfaa774cb6aa08e0ac525e56c4b8..45271ab7a7101dd5cab2c459bb448d87e7bd2cd2 100644
--- a/smash/web/importer/csv_subject_import_reader.py
+++ b/smash/web/importer/csv_subject_import_reader.py
@@ -8,7 +8,8 @@ from django.db import models
 from django.db.models import Field
 
 from web.models import StudySubject, Subject, SubjectImportData
-from .subject_import_reader import SubjectImportReader, EtlCommon
+from .subject_import_reader import SubjectImportReader
+from .etl_common import EtlCommon
 
 logger = logging.getLogger(__name__)
 
diff --git a/smash/web/importer/csv_tns_visit_import_reader.py b/smash/web/importer/csv_tns_visit_import_reader.py
index b1d25f61de30cc8f062cdf914a34247b29f58af0..8d549ce270a32f00b07bb70973af0d0cbdf8002a 100644
--- a/smash/web/importer/csv_tns_visit_import_reader.py
+++ b/smash/web/importer/csv_tns_visit_import_reader.py
@@ -9,7 +9,7 @@ import pytz
 
 from web.models import StudySubject, Visit, Appointment, Location, AppointmentTypeLink, Subject
 from web.models.etl.visit_import import VisitImportData
-from .subject_import_reader import EtlCommon
+from .etl_common import EtlCommon
 from .warning_counter import MsgCounterHandler
 
 logger = logging.getLogger(__name__)
diff --git a/smash/web/importer/etl_common.py b/smash/web/importer/etl_common.py
new file mode 100644
index 0000000000000000000000000000000000000000..a7659c865841a2cb7d5f24deeb4b78a6cb025149
--- /dev/null
+++ b/smash/web/importer/etl_common.py
@@ -0,0 +1,103 @@
+import codecs
+import datetime
+import logging
+from typing import Type, Optional
+
+from django.db import models
+
+from web.models import Provenance
+from web.models.etl.etl import EtlData
+
+logger = logging.getLogger(__name__)
+
+
+class EtlCommon:
+    """Helpers shared by the import readers: merging cell values, writing Provenance rows, stripping a BOM."""
+
+    def __init__(self, import_data: EtlData):
+        self.etl_data = import_data
+
+    def get_new_date_value(self, old_value: Optional[datetime.datetime], column_name: str,
+                           new_value: Optional[str]) -> Optional[datetime.datetime]:
+        """Parse new_value with etl_data.date_format and merge it with old_value, warning on conflicts."""
+        if old_value is None or old_value == "":
+            try:
+                result = datetime.datetime.strptime(new_value, self.etl_data.date_format)
+            except (TypeError, ValueError):
+                # TypeError covers new_value=None, which strptime rejects before parsing
+                logger.warning("Invalid date: " + str(new_value))
+                result = old_value
+            return result
+        if new_value is None or new_value == "":
+            return old_value
+        logger.warning(
+            "Contradicting entries in csv file for column: " + column_name + "(" + new_value + "," + str(old_value) +
+            "). Latest value will be used")
+        return datetime.datetime.strptime(new_value, self.etl_data.date_format)
+
+    @staticmethod
+    def get_new_value(old_value: object, column_name: str, new_value: object) -> object:
+        """Pick between old and new cell values; on a real conflict warn and return new_value."""
+        if old_value is None or old_value == "":
+            return new_value
+        if new_value is None or new_value == "":
+            return old_value
+        if old_value == new_value:
+            return new_value
+        if type(new_value) == str and type(old_value) == str:
+            if new_value in old_value:
+                return old_value
+            if old_value in new_value:
+                return new_value
+        logger.warning(
+            "Contradicting entries in csv file for column: " + column_name + "(" + str(new_value) + "," + str(
+                old_value) + "). Latest value will be used")
+        return new_value
+
+    def create_provenance_and_change_data(self, object_to_change: models.Model, field_name: str, new_value: object,
+                                          object_type: Type[models.Model]) -> Optional[Provenance]:
+        """Set field_name on the object if the value differs and return the saved Provenance, else None."""
+        old_value = getattr(object_to_change, field_name)
+        if old_value != new_value:
+            setattr(object_to_change, field_name, new_value)
+            return self.create_provenance(field_name, new_value, object_to_change, object_type, old_value)
+        return None
+
+    def create_provenance(self, field_name: str, new_value: object, object_to_change: models.Model,
+                          object_type: Type[models.Model], old_value: object) -> Provenance:
+        """Create and save a Provenance row for one field change, authored by etl_data.import_worker."""
+        description = '{} changed from "{}" to "{}"'.format(field_name, old_value, new_value)
+        p = Provenance(modified_table=object_type._meta.db_table,
+                       modified_table_id=object_to_change.id,
+                       modification_author=self.etl_data.import_worker,
+                       previous_value=old_value,
+                       new_value=new_value,
+                       modification_description=description,
+                       modified_field=field_name,
+                       )
+        p.save()
+        return p
+
+    def create_provenance_for_new_object(self, object_type: Type[models.Model], new_object: models.Model) -> list:
+        """Save one Provenance per non-empty char/date/datetime/integer/boolean field of new_object."""
+        result = []
+        for field in object_type._meta.get_fields():
+            if field.get_internal_type() == "CharField" or \
+                    field.get_internal_type() == "DateField" or \
+                    field.get_internal_type() == "IntegerField" or \
+                    field.get_internal_type() == "DateTimeField" or \
+                    field.get_internal_type() == "BooleanField":
+                new_value = getattr(new_object, field.name)
+                if new_value is not None and new_value != "":
+                    p = self.create_provenance(field.name, new_value, new_object, object_type, '')
+                    result.append(p)
+        return result
+
+    @staticmethod
+    def remove_bom(line) -> str:
+        """Strip a leading UTF-8 byte order mark from a str or bytes line."""
+        if type(line) == str:
+            # a decoded BOM is the single code point U+FEFF, not three characters
+            return line[1:] if line.startswith('\ufeff') else line
+        else:
+            return line[3:] if line.startswith(codecs.BOM_UTF8) else line
diff --git a/smash/web/importer/subject_import_reader.py b/smash/web/importer/subject_import_reader.py
index 31dc5088b5bfc1928f679e0d6156c461fb24544d..6cb36b33c1f83b6a9f864b14f0df0a895e774e61 100644
--- a/smash/web/importer/subject_import_reader.py
+++ b/smash/web/importer/subject_import_reader.py
@@ -1,98 +1,13 @@
-import codecs
-import datetime
 import logging
-from typing import List, Type, Optional
+from typing import List
 
-from django.db import models
-
-from web.models import SubjectImportData, Provenance
-from web.models.etl.etl import EtlData
+from web.models import SubjectImportData
 from web.models.study_subject import StudySubject
+from .etl_common import EtlCommon
 
 logger = logging.getLogger(__name__)
 
 
-class EtlCommon:
-    def __init__(self, import_data: EtlData):
-        self.etl_data = import_data
-
-    def get_new_date_value(self, old_value: datetime, column_name: str, new_value: str) -> datetime:
-        if old_value is None or old_value == "":
-            try:
-                result = datetime.datetime.strptime(new_value, self.etl_data.date_format)
-            except ValueError:
-                logger.warning("Invalid date: " + new_value)
-                result = old_value
-            return result
-        if new_value is None or new_value == "":
-            return old_value
-        logger.warning(
-            "Contradicting entries in csv file for column: " + column_name + "(" + new_value + "," + old_value +
-            "). Latest value will be used")
-        return datetime.datetime.strptime(new_value, self.etl_data.date_format)
-
-    @staticmethod
-    def get_new_value(old_value: object, column_name: str, new_value: object) -> object:
-        if old_value is None or old_value == "":
-            return new_value
-        if new_value is None or new_value == "":
-            return old_value
-        if old_value == new_value:
-            return new_value
-        if type(new_value) == str and type(old_value) == str:
-            if new_value in old_value:
-                return old_value
-            if old_value in new_value:
-                return new_value
-        logger.warning(
-            "Contradicting entries in csv file for column: " + column_name + "(" + str(new_value) + "," + str(
-                old_value) + "). Latest value will be used")
-        return new_value
-
-    def create_provenance_and_change_data(self, object_to_change: models.Model, field_name: str, new_value: object,
-                                          object_type: Type[models.Model]) -> Optional[Provenance]:
-        old_value = getattr(object_to_change, field_name)
-        if old_value != new_value:
-            setattr(object_to_change, field_name, new_value)
-            return self.create_provenance(field_name, new_value, object_to_change, object_type, old_value)
-        return None
-
-    def create_provenance(self, field_name: str, new_value: object, object_to_change: models.Model,
-                          object_type: Type[models.Model], old_value: object) -> Provenance:
-        description = '{} changed from "{}" to "{}"'.format(field_name, old_value, new_value)
-        p = Provenance(modified_table=object_type._meta.db_table,
-                       modified_table_id=object_to_change.id,
-                       modification_author=self.etl_data.import_worker,
-                       previous_value=old_value,
-                       new_value=new_value,
-                       modification_description=description,
-                       modified_field=field_name,
-                       )
-        p.save()
-        return p
-
-    def create_provenance_for_new_object(self, object_type: Type[models.Model], new_object: models.Model) -> list:
-        result = []
-        for field in object_type._meta.get_fields():
-            if field.get_internal_type() == "CharField" or \
-                    field.get_internal_type() == "DateField" or \
-                    field.get_internal_type() == "IntegerField" or \
-                    field.get_internal_type() == "DateTimeField" or \
-                    field.get_internal_type() == "BooleanField":
-                new_value = getattr(new_object, field.name)
-                if new_value is not None and new_value != "":
-                    p = self.create_provenance(field.name, new_value, new_object, object_type, '')
-                    result.append(p)
-        return result
-
-    @staticmethod
-    def remove_bom(line) -> str:
-        if type(line) == str:
-            return line[3:] if line.encode('utf8').startswith(codecs.BOM_UTF8) else line
-        else:
-            return line[3:] if line.startswith(codecs.BOM_UTF8) else line
-
-
 class SubjectImportReader(EtlCommon):
     def __init__(self, import_data: SubjectImportData):
         super().__init__(import_data)