Skip to content
Snippets Groups Projects
Commit a296da02 authored by Piotr Gawron's avatar Piotr Gawron
Browse files

allow to import type from csv file

parent 279c93f0
No related branches found
No related tags found
1 merge request!305Resolve "Allow for adding modifying types of subjects"
......@@ -5,7 +5,7 @@ from typing import List, Type, Tuple
from django.db import models
from django.db.models import Field
from web.models import StudySubject, Subject, SubjectImportData, Language
from web.models import StudySubject, Subject, SubjectImportData, Language, SubjectType
from .etl_common import EtlCommon
from .subject_import_reader import SubjectImportReader
......@@ -56,16 +56,16 @@ class CsvSubjectImportReader(SubjectImportReader):
value = self.get_value_for_foreign_field(field, value)
if table == Subject:
old_val = getattr(study_subject.subject, field.name)
old_val = self.get_field_value(study_subject.subject, field)
setattr(study_subject.subject, field.name, self.get_new_value(old_val, value))
elif table == StudySubject:
old_val = getattr(study_subject, field.name)
old_val = self.get_field_value(study_subject, field)
setattr(study_subject, field.name, self.get_new_value(old_val, value))
else:
logger.warning("Don't know how to handle column " + column_name + " with data " + value)
@staticmethod
def get_value_for_foreign_field(field, value):
def get_value_for_foreign_field(field: Field, value: str):
if field.related_model == Language:
if value == "":
return None
......@@ -74,10 +74,25 @@ class CsvSubjectImportReader(SubjectImportReader):
if language is None:
language = Language.objects.create(name=value)
return language
elif field.related_model == SubjectType:
subject_type = SubjectType.objects.filter(name=value).first()
if subject_type is None:
subject_type = SubjectType.objects.all().first()
logger.warning(
"Subject type does not exist: '" + str(value) + "'. Changing to: '" + subject_type.name + "'")
return subject_type
else:
logger.warning("Don't know how to handle type " + str(field.related_model))
return None
@staticmethod
def get_field_value(model_object: models.Model, field: Field):
# for foreign keys we need to check if the key id is not none, otherwise for not nullable fields exception
# would be raised
if field.get_internal_type() == "ForeignKey" and getattr(model_object, field.name + "_id") is None:
return None
return getattr(model_object, field.name)
def get_table_and_field(self, column_name: str) -> Tuple[Type[models.Model], Field]:
return self.mappings.get(column_name, (None, None))
......@@ -86,7 +101,7 @@ class CsvSubjectImportReader(SubjectImportReader):
if field.get_internal_type() == "CharField" or \
field.get_internal_type() == "DateField" or \
field.get_internal_type() == "TextField" or \
(field.get_internal_type() == "ForeignKey" and field.related_model in (Language,)):
(field.get_internal_type() == "ForeignKey" and field.related_model in (Language, SubjectType)):
found = False
for mapping in self.import_data.column_mappings.all():
if mapping.table_name == object_type._meta.db_table and field.name == mapping.column_name:
......
first_name,last_name,participant_id,type
Piotr,Gawron,Cov-000001,PATIENT
Piotr,Gawron,Cov-000002,CONTROL
Piotr,Gawron,Cov-000003,
Piotr,Gawron,Cov-000004,UNKNOWN
......@@ -7,7 +7,8 @@ from django.test import TestCase
from web.importer import CsvSubjectImportReader, MsgCounterHandler
from web.models import SubjectImportData, EtlColumnMapping, StudySubject, Country
from web.models.constants import COUNTRY_AFGHANISTAN_ID
from web.tests.functions import get_resource_path, get_test_study, create_tns_column_mapping, create_location
from web.tests.functions import get_resource_path, get_test_study, create_tns_column_mapping, create_location, \
get_control_subject_type, get_patient_subject_type
logger = logging.getLogger(__name__)
......@@ -60,6 +61,16 @@ class TestCsvReader(TestCase):
self.assertIsNotNone(study_subjects[1].subject.default_written_communication_language)
self.assertIsNone(study_subjects[2].subject.default_written_communication_language)
def test_load_type(self):
self.subject_import_data.filename = get_resource_path('import_type.csv')
study_subjects = CsvSubjectImportReader(self.subject_import_data).load_data()
self.assertEqual(4, len(study_subjects))
self.assertEqual(get_patient_subject_type(), study_subjects[0].type)
self.assertEqual(get_control_subject_type(), study_subjects[1].type)
self.assertIsNotNone(study_subjects[2].type)
self.assertIsNotNone(study_subjects[3].type)
def test_load_data_for_tns(self):
self.subject_import_data = SubjectImportData.objects.create(study=get_test_study(),
date_format="%d/%m/%Y",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment