Commit 09e732a6 authored by Sascha Herzinger

Merge branch 'jochemb/fractalis-transmart-microETL' into beta

parents cca25acb 75b10cfc
Pipeline #5451 passed with stages
in 36 minutes and 45 seconds
......@@ -35,7 +35,7 @@ class ETL(Task, metaclass=abc.ABCMeta):
@abc.abstractmethod
def produces(self) -> str:
"""This specifies the fractalis internal format that this ETL
produces. Can be one of: ['categorical', 'numerical']
produces. Can be one of: ['categorical', 'numerical', 'numerical_array']
"""
pass
......
"""Provides categorical concept ETL for tranSMART."""
import logging
from fractalis.data.etls.transmart.shared import create_etl_type, CATEGORICAL_FIELD
from pandas import DataFrame
from fractalis.data.etl import ETL
from fractalis.data.etls.transmart.shared import extract_data
logger = logging.getLogger(__name__)
class CategoricalETL(ETL):
"""CategoricalETL implements support for tranSMARTs 'categorical' type."""
name = 'transmart_categorical_etl'
produces = 'categorical'
@staticmethod
def can_handle(handler: str, descriptor: dict) -> bool:
return handler == 'transmart' and \
descriptor['data_type'] == 'categorical'
def extract(self, server: str, token: str, descriptor: dict) -> dict:
return extract_data(server=server, descriptor=descriptor, token=token)
def transform(self, raw_data: dict, descriptor: dict) -> DataFrame:
rows = []
for entry in raw_data['cells']:
idx = entry['dimensionIndexes'][2]
id = raw_data['dimensionElements']['patient'][idx]['inTrialId']
value = entry['numericValue']
rows.append([id, value])
df = DataFrame(rows, columns=['id', 'value'])
return df
# Concrete tranSMART ETL for 'categorical' concepts, built by the shared
# factory. Observation cells are read from CATEGORICAL_FIELD.
CategoricalETL = create_etl_type(
    field_name=CATEGORICAL_FIELD,
    produces_='categorical',
    name_='transmart_categorical_etl',
)
"""Provides highdim concept ETL for tranSMART."""
import logging
from fractalis.data.etls.transmart.shared import create_etl_type, NUMERICAL_FIELD
import requests
from pandas import DataFrame
from fractalis.data.etl import ETL
logger = logging.getLogger(__name__)
class HighdimETL(ETL):
"""HighdimETL implements support for tranSMARTs 'highdim' type."""
name = 'transmart_highdim_etl'
produces = 'highdim'
@staticmethod
def can_handle(handler: str, descriptor: dict) -> bool:
return handler == 'transmart' and descriptor['data_type'] == 'highdim'
def extract(self, server: str, token: str, descriptor: dict) -> dict:
r = requests.get(url='{}/v2/observations'.format(server),
params={
'constraint': '{{"type": "concept","path": "{}"}}'
''.format(descriptor["path"]),
'projection': 'log_intensity',
'type': 'autodetect'
},
headers={
'Accept': 'application/x-protobuf',
'Authorization': 'Bearer {}'.format(token)
},
timeout=2000)
if r.status_code != 200:
error = "Target server responded with " \
"status code {}.".format(r.status_code)
logger.error(error)
raise ValueError(error)
try:
pass # TODO
except Exception as e:
logger.exception(e)
raise ValueError("Got unexpected data format.")
def transform(self, raw_data: dict, descriptor: dict) -> DataFrame:
rows = []
for entry in raw_data['cells']:
idx = entry['dimensionIndexes'][2]
id = raw_data['dimensionElements']['patient'][idx]['inTrialId']
value = entry['numericValue']
rows.append([id, value])
df = DataFrame(rows, columns=['id', 'value'])
return df
# Concrete tranSMART ETL for high-dimensional concepts, built by the shared
# factory. It is registered as producing 'numerical_array'.
HighdimETL = create_etl_type(
    field_name=NUMERICAL_FIELD,
    produces_='numerical_array',
    name_='transmart_highdim_etl',
)
"""Provides numerical concept ETL for tranSMART."""
import logging
from fractalis.data.etls.transmart.shared import create_etl_type, NUMERICAL_FIELD
from pandas import DataFrame
from fractalis.data.etl import ETL
from fractalis.data.etls.transmart.shared import extract_data
logger = logging.getLogger(__name__)
class NumericalETL(ETL):
"""NumericalETL implements support for tranSMARTs 'numerical' type."""
name = 'transmart_numerical_etl'
produces = 'numerical'
@staticmethod
def can_handle(handler: str, descriptor: dict) -> bool:
return (handler == 'transmart' and
descriptor['data_type'] == 'numerical')
def extract(self, server: str, token: str, descriptor: dict) -> dict:
return extract_data(server=server, descriptor=descriptor, token=token)
def transform(self, raw_data: dict, descriptor: dict) -> DataFrame:
rows = []
for entry in raw_data['cells']:
idx = entry['dimensionIndexes'][2]
id = raw_data['dimensionElements']['patient'][idx]['inTrialId']
value = entry['numericValue']
rows.append([id, value])
df = DataFrame(rows, columns=['id', 'value'])
return df
# Concrete tranSMART ETL for 'numerical' concepts, built by the shared
# factory. Observation cells are read from NUMERICAL_FIELD.
NumericalETL = create_etl_type(
    field_name=NUMERICAL_FIELD,
    produces_='numerical',
    name_='transmart_numerical_etl',
)
......@@ -26,7 +26,7 @@ class TransmartHandler(ETLHandler):
@staticmethod
def make_label(descriptor: dict) -> str:
return descriptor['path']
return descriptor['label']
def _get_token_for_credentials(self, server: str, auth: dict) -> str:
try:
......
"""This module provides shared functionality to the transmart ETLs."""
import logging
import pandas as pd
from urllib.parse import unquote_plus
import requests
from fractalis.data.etl import ETL
logger = logging.getLogger(__name__)
NUMERICAL_FIELD = 'numericValue'
CATEGORICAL_FIELD = 'stringValue'
def extract_data(server: str, descriptor: dict, token: str) -> dict:
"""Extract data from transmart.
......@@ -14,24 +20,115 @@ def extract_data(server: str, descriptor: dict, token: str) -> dict:
:param descriptor: Dict describing the data to download.
:param token: The token used for authentication.
"""
params = dict(
constraint=descriptor['constraint'],
type='clinical'
)
if descriptor['data_type'] == 'numerical_array':
params['type'] = 'autodetect'
params['projection'] = 'log_intensity'
if 'biomarker_constraint' in descriptor:
params['biomarker_constraint'] = descriptor['biomarker_constraint']
r = requests.get(url='{}/v2/observations'.format(server),
params={
'constraint': '{{"type": "concept","path": "{}"}}'
''.format(descriptor["path"]),
'type': 'autodetect'
},
params=params,
headers={
'Accept': 'application/json',
'Authorization': 'Bearer {}'.format(token)
},
timeout=2000)
logger.info('URL called: {}'.format(
unquote_plus(r.url))
)
if r.status_code != 200:
error = "Target server responded with " \
"status code {}.".format(r.status_code)
error = "Target server responded with status code {}. Message: {}.".\
format(r.status_code, r.json())
logger.error(error)
raise ValueError(error)
try:
return r.json()
except Exception as e:
logger.exception(e)
raise ValueError("Got unexpected data format.")
def get_dimension_index(obs, dimension):
    """Return the position of a dimension among the dimension elements.

    The position is the index of ``dimension`` in the key order of
    ``obs['dimensionElements']``.

    :param obs: Observations dict containing a 'dimensionElements' mapping.
    :param dimension: Name of the dimension to look up.
    :return: Zero-based position of the dimension name.
    :raises ValueError: If ``dimension`` is not a key of the mapping.
    """
    dimension_names = list(obs['dimensionElements'])
    return dimension_names.index(dimension)
def get_dimension_element(obs, dimension, index):
    """Return one element of a dimension from the observations.

    :param obs: Observations dict containing a 'dimensionElements' mapping.
    :param dimension: Name of the dimension whose elements are searched.
    :param index: Position of the wanted element within that dimension.
    :return: The element stored at ``index`` for ``dimension``.
    """
    elements = obs['dimensionElements'][dimension]
    return elements[index]
def transform_clinical(raw_data: dict, value_field: str) -> pd.DataFrame:
    """Turn clinical observations into an id/feature/value data frame.

    :param raw_data: Observations dict with 'cells' and 'dimensionElements'.
    :param value_field: Key of the cell entry that holds the observed value.
    :return: DataFrame with the columns 'id', 'feature' and 'value'. 'id' is
        the patient's 'inTrialId'; 'feature' is filled with the name of the
        value column for every row.
    """
    patient_position = get_dimension_index(raw_data, 'patient')
    records = [
        [
            get_dimension_element(
                raw_data,
                'patient',
                cell['dimensionIndexes'][patient_position],
            )['inTrialId'],
            cell[value_field],
        ]
        for cell in raw_data['cells']
    ]
    frame = pd.DataFrame(records, columns=['id', 'value'])
    # The feature label is the name of the second column, read before the
    # new column is inserted in front of it.
    frame.insert(1, 'feature', frame.columns[1])
    return frame
def transform_highdim(raw_data: dict):
    """Turn high-dimensional observations into an id/value/feature frame.

    :param raw_data: Observations dict with 'cells' and 'dimensionElements'
        that include the 'assay' and 'biomarker' dimensions.
    :return: DataFrame with the columns 'id' (the assay's 'sampleCode'),
        'value' (the cell's NUMERICAL_FIELD entry) and 'feature' (the
        biomarker's 'label').
    """
    assay_position = get_dimension_index(raw_data, 'assay')
    biomarker_position = get_dimension_index(raw_data, 'biomarker')

    def to_record(cell):
        # Resolve the assay first and the biomarker second, then read the
        # three output fields from the resolved elements and the cell.
        assay = get_dimension_element(
            raw_data, 'assay', cell['dimensionIndexes'][assay_position])
        biomarker = get_dimension_element(
            raw_data, 'biomarker',
            cell['dimensionIndexes'][biomarker_position])
        return [assay['sampleCode'], cell[NUMERICAL_FIELD], biomarker['label']]

    records = [to_record(cell) for cell in raw_data['cells']]
    return pd.DataFrame(records, columns=['id', 'value', 'feature'])
def create_etl_type(name_, produces_, field_name):
    """
    Create a ETL task class based on a specific input type.

    :param name_: task name for registry.
    :param produces_: output type. It is also the ``data_type`` value a
        descriptor must carry for the created class to handle it.
    :param field_name: name of cell in observation (numericValue, stringValue)
    :return: ETL task class
    """

    class TransmartETL(ETL):
        """ETL for tranSMART observations of one specific output type."""

        name = name_
        produces = produces_

        @staticmethod
        def can_handle(handler: str, descriptor: dict) -> bool:
            """Tell whether this ETL is responsible for the descriptor.

            :param handler: Name of the handler that received the request.
            :param descriptor: Dict describing the data to download.
            :return: True for the 'transmart' handler when the descriptor's
                'data_type' equals the output type of this ETL.
            """
            return (handler == 'transmart' and
                    descriptor['data_type'] == produces_)

        def extract(self, server: str, token: str, descriptor: dict) -> dict:
            """Download the raw observations via the shared extract_data.

            :param server: The server from which to download the data.
            :param token: The token used for authentication.
            :param descriptor: Dict describing the data to download.
            :return: Whatever extract_data returns for these arguments.
            """
            return extract_data(server=server, descriptor=descriptor,
                                token=token)

        def transform(self, raw_data: dict,
                      descriptor: dict) -> pd.DataFrame:
            """Convert raw observations into the fractalis data frame.

            :param raw_data: Observations as returned by extract().
            :param descriptor: Dict describing the data (not read here).
            :return: DataFrame built by the transform matching the output
                type of this ETL.
            :raises ValueError: If the output type has no known transform.
            """
            if self.produces in ('numerical', 'categorical'):
                return transform_clinical(raw_data, field_name)
            if self.produces == 'numerical_array':
                return transform_highdim(raw_data)
            # Without this the method fell through and returned None, which
            # contradicts the declared return type and hides the bad
            # configuration from the caller.
            error = "Cannot transform data for unsupported output " \
                    "type '{}'.".format(self.produces)
            logger.error(error)
            raise ValueError(error)

    return TransmartETL
......@@ -12,6 +12,10 @@ from fractalis.data.etls.transmart.etl_categorical import CategoricalETL
class TestCategoricalETL:
etl = CategoricalETL()
descriptor = dict(
constraint='',
data_type='categorical'
)
def test_correct_handler(self):
assert self.etl.can_handle(handler='transmart',
......@@ -31,7 +35,7 @@ class TestCategoricalETL:
content_type='application/json')
with pytest.raises(ValueError) as e:
self.etl.extract(server='http://foo.bar',
token='', descriptor={'path': ''})
token='', descriptor=self.descriptor)
assert '[400]' in e
def test_extract_raises_readable_if_not_json(self):
......@@ -42,7 +46,7 @@ class TestCategoricalETL:
content_type='application/json')
with pytest.raises(ValueError) as e:
self.etl.extract(server='http://foo.bar',
token='', descriptor={'path': ''})
token='', descriptor=self.descriptor)
assert 'unexpected data' in e
def test_extract_works_for_valid_input(self):
......@@ -52,12 +56,12 @@ class TestCategoricalETL:
status=200,
content_type='application/json')
raw_data = self.etl.extract(server='http://foo.bar',
token='', descriptor={'path': ''})
token='', descriptor=self.descriptor)
assert isinstance(raw_data, dict)
def test_transform_valid_input_correct_output(self):
body = {
"cells": [{"inlineDimensions": ["292278994-08-16T23:00:00Z", None, "@"], "dimensionIndexes": [0, 0, 0, None, 0, None, None], "numericValue": 52.0}], # noqa: 501
"cells": [{"inlineDimensions": ["292278994-08-16T23:00:00Z", None, "@"], "dimensionIndexes": [0, 0, 0, None, 0, None, None], "stringValue": 'FOO'}], # noqa: 501
"dimensionElements": {"patient": [{"id": 1000421548, "deathDate": None, "birthDate": None, "race": None, "maritalStatus": None, "inTrialId": "3052", "age": 52, "trial": "GSE4382", "sexCd": None, "sex": "unknown", "religion": None}]} # noqa: E501
}
with responses.RequestsMock() as response:
......@@ -66,8 +70,8 @@ class TestCategoricalETL:
status=200,
content_type='application/json')
raw_data = self.etl.extract(server='http://foo.bar',
token='', descriptor={'path': ''})
df = self.etl.transform(raw_data=raw_data, descriptor={'path': ''})
assert df.shape == (1, 2)
assert df.values.tolist() == [['3052', 52.0]]
assert list(df) == ['id', 'value']
token='', descriptor=self.descriptor)
df = self.etl.transform(raw_data=raw_data, descriptor=self.descriptor)
assert df.shape == (1, 3)
assert df.values.tolist() == [['3052', 'value', 'FOO']]
assert list(df) == ['id', 'feature', 'value']
......@@ -16,9 +16,9 @@ class TestHighdimlETL:
def test_correct_handler(self):
assert self.etl.can_handle(handler='transmart',
descriptor={'data_type': 'highdim'})
descriptor={'data_type': 'numerical_array'})
assert not self.etl.can_handle(handler='ada',
descriptor={'data_type': 'highdim'})
descriptor={'data_type': 'numerical_array'})
assert not self.etl.can_handle(handler='ada',
descriptor={'data_type': 'categorical'})
assert not self.etl.can_handle(handler='ada',
......
......@@ -12,6 +12,10 @@ from fractalis.data.etls.transmart.etl_numerical import NumericalETL
class TestNumericalETL:
etl = NumericalETL()
descriptor = dict(
constraint='',
data_type='numerical'
)
def test_correct_handler(self):
assert self.etl.can_handle(handler='transmart',
......@@ -31,7 +35,7 @@ class TestNumericalETL:
content_type='application/json')
with pytest.raises(ValueError) as e:
self.etl.extract(server='http://foo.bar',
token='', descriptor={'path': ''})
token='', descriptor=self.descriptor)
assert '[400]' in e
def test_extract_raises_readable_if_not_json(self):
......@@ -42,7 +46,7 @@ class TestNumericalETL:
content_type='application/json')
with pytest.raises(ValueError) as e:
self.etl.extract(server='http://foo.bar',
token='', descriptor={'path': ''})
token='', descriptor=self.descriptor)
assert 'unexpected data' in e
def test_extract_works_for_valid_input(self):
......@@ -52,7 +56,7 @@ class TestNumericalETL:
status=200,
content_type='application/json')
raw_data = self.etl.extract(server='http://foo.bar',
token='', descriptor={'path': ''})
token='', descriptor=self.descriptor)
assert isinstance(raw_data, dict)
def test_transform_valid_input_correct_output(self):
......@@ -65,9 +69,10 @@ class TestNumericalETL:
body=json.dumps(body),
status=200,
content_type='application/json')
raw_data = self.etl.extract(server='http://foo.bar',
token='', descriptor={'path': ''})
df = self.etl.transform(raw_data=raw_data, descriptor={'path': ''})
assert df.shape == (1, 2)
assert df.values.tolist() == [['3052', 52.0]]
assert list(df) == ['id', 'value']
token='', descriptor=self.descriptor)
df = self.etl.transform(raw_data=raw_data, descriptor=self.descriptor)
assert df.shape == (1, 3)
assert df.values.tolist() == [['3052', 'value', 52.0]]
assert list(df) == ['id', 'feature', 'value']
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment