Commit 0efa34aa authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

Improved API for ada to save dictionary request

parent b76890eb
Pipeline #2164 passed with stage
in 5 minutes and 19 seconds
......@@ -4,7 +4,6 @@ import os
import abc
import json
import logging
from typing import List
from celery import Task
from pandas import DataFrame
......@@ -29,24 +28,6 @@ class ETL(Task, metaclass=abc.ABCMeta):
"""Used by celery to identify this task by name."""
pass
@property
@abc.abstractmethod
def _handler(self) -> str:
"""Used by self.can_handle to check whether the current implementation
belongs to a certain handler. This is necessary to avoid conflicts with
other ETL with identical self.name field.
"""
pass
@property
@abc.abstractmethod
def _accepts(self) -> List[str]:
"""Used by self.can_handle to check whether the current implementation
can handle the given data type. One ETL can handle multiple data types,
therefor this is a list.
"""
pass
@property
@abc.abstractmethod
def produces(self) -> str:
......@@ -55,32 +36,33 @@ class ETL(Task, metaclass=abc.ABCMeta):
"""
pass
@staticmethod
@abc.abstractmethod
def can_handle(handler: str, descriptor: dict) -> bool:
    """Decide whether this ETL implementation is responsible for the
    given request.
    :param handler: Describes the handler. E.g.: transmart, ada
    :param descriptor: Describes the data that we want to download.
    :return: True if implementation can handle given parameters.
    """
    pass
@classmethod
def factory(cls, handler: str, descriptor: dict) -> 'ETL':
    """Return an instance of the implementation ETL that can handle the
    given parameters.
    :param handler: Describes the handler. E.g.: transmart, ada
    :param descriptor: Describes the data that we want to download.
    :return: An instance of an implementation of ETL that returns True for
    can_handle()
    """
    from . import ETL_REGISTRY
    # PEP 8: loop variables are lower case; the first ETL that claims
    # the handler/descriptor combination wins.
    for etl_task in ETL_REGISTRY:
        if etl_task.can_handle(handler, descriptor):
            return etl_task()
    raise NotImplementedError(
        "No ETL implementation found for handler '{}' and descriptor '{}'"
        .format(handler, descriptor))
@abc.abstractmethod
def extract(self, server: str, token: str, descriptor: dict) -> object:
......@@ -93,10 +75,12 @@ class ETL(Task, metaclass=abc.ABCMeta):
pass
@abc.abstractmethod
def transform(self, raw_data: object) -> DataFrame:
def transform(self, raw_data: object, descriptor: dict) -> DataFrame:
"""Transform the data into a pandas.DataFrame with a naming according to
the Fractalis standard format.
:param raw_data: The data to transform.
:param raw_data: The return value of extract().
:param descriptor: The data descriptor, sometimes needed
for transformation
"""
pass
......@@ -130,7 +114,7 @@ class ETL(Task, metaclass=abc.ABCMeta):
logger.info("(E)xtracting data from server '{}'.".format(server))
raw_data = self.extract(server, token, descriptor)
logger.info("(T)ransforming data to Fractalis format.")
data_frame = self.transform(raw_data)
data_frame = self.transform(raw_data, descriptor)
if not isinstance(data_frame, DataFrame):
error = "transform() must return 'pandas.DataFrame', " \
"but returned '{}' instead.".format(type(data_frame))
......
......@@ -48,24 +48,29 @@ class ETLHandler(metaclass=abc.ABCMeta):
self._token = self._get_token_for_credentials(
server, auth['user'], auth['passwd'])
@staticmethod
@abc.abstractmethod
def make_label(descriptor):
"""Create a label for the given data descriptor. This label will be
used to display the loaded data in the front end.
:param descriptor: Describes the data and is used to download them.
:return The label (not necessarily unique) to the data."""
pass
@classmethod
def create_redis_entry(cls, task_id: str, file_path: str,
descriptor: dict, data_type: str) -> None:
"""Creates an entry in Redis that reflects all meta data surrounding the
downloaded data. E.g. data type, file system location, ...
:param task_id: Id associated with the loaded data.
:param file_path: Location of the data on the file system
:param descriptor: Describes the data and is used to download them
:param data_type: The fractalis internal data type of the loaded data
:param file_path: Location of the data on the file system.
:param descriptor: Describes the data and is used to download them.
:param data_type: The fractalis internal data type of the loaded data.
"""
try:
label = descriptor['label']
except KeyError:
label = str(descriptor)
data_state = {
'task_id': task_id,
'file_path': file_path,
'label': label,
'label': cls.make_label(descriptor),
'descriptor': descriptor,
'data_type': data_type,
'loaded': False
......@@ -88,8 +93,7 @@ class ETLHandler(metaclass=abc.ABCMeta):
for descriptor in descriptors:
task_id = str(uuid4())
file_path = os.path.join(data_dir, task_id)
etl = ETL.factory(handler=self._handler,
data_type=descriptor['data_type'])
etl = ETL.factory(handler=self._handler, descriptor=descriptor)
self.create_redis_entry(task_id, file_path,
descriptor, etl.produces)
kwargs = dict(server=self._server, token=self._token,
......
......@@ -10,7 +10,7 @@ def make_cookie(token: str) -> dict:
def get_field(server: str, data_set: str,
cookie: dict, projections: List[str]) -> dict:
cookie: dict, projections: List[str]) -> List[dict]:
r = requests.get(url='{}/studies/records/findCustom'.format(server),
headers={'Accept': 'application/json'},
params={
......@@ -29,21 +29,6 @@ def get_field(server: str, data_set: str,
return field_data
def get_dictionary(server: str, data_set: str,
                   descriptor: dict, cookie: dict) -> dict:
    """Fetch the field dictionary (metadata) for the projected field.
    :param server: Base URL of the Ada server.
    :param data_set: Id of the data set the field belongs to.
    :param descriptor: Data descriptor; 'projection' names the field.
    :param cookie: Session cookie used for authentication.
    :return: The parsed dictionary, or None if the request failed.
    """
    r = requests.get(url='{}/studies/dictionary/get/{}'
                     .format(server, descriptor['projection']),
                     headers={'Accept': 'application/json'},
                     params={'dataSet': data_set},
                     cookies=cookie)
    # best-effort lookup: a non-200 response yields None instead of raising
    # (original code carried a dead `pass` statement in this branch)
    return r.json() if r.status_code == 200 else None
def prepare_ids(data: List[dict]) -> List[dict]:
new_data = []
for row in data:
......@@ -54,14 +39,14 @@ def prepare_ids(data: List[dict]) -> List[dict]:
return new_data
def name_to_label(data: List[dict], descriptor: dict) -> List[dict]:
    """Rename the projected field in every row from its internal name to
    its human readable dictionary label. Modifies the rows in place.
    :param data: List of row dicts as returned by the Ada API.
    :param descriptor: Data descriptor; descriptor['dictionary'] is expected
    to carry 'name' and 'label' entries.
    :return: The (modified) data, or the unchanged data if no complete
    dictionary is available.
    """
    try:
        # hoist the nested lookups instead of repeating them per row
        dictionary = descriptor['dictionary']
        name = dictionary['name']
        label = dictionary['label']
    except (KeyError, TypeError):
        # no dictionary or an incomplete one: keep the internal field names
        return data
    for row in data:
        if name in row:
            row[label] = row.pop(name)
    return data
\ No newline at end of file
"""Provides BooleanETL for Ada."""
from typing import List
from pandas import DataFrame
from fractalis.data.etl import ETL
......@@ -10,26 +12,25 @@ class BooleanETL(ETL):
"""DateETL implements support for Adas 'Boolean' type"""
name = 'ada_boolean_etl'
_handler = 'ada'
_accepts = 'Boolean'
produces = 'categorical'
def extract(self, server: str,
token: str, descriptor: dict) -> dict:
@staticmethod
def can_handle(handler, descriptor):
return handler == 'ada' and \
descriptor['dictionary']['fieldType'] and \
descriptor['dictionary']['fieldType'] == 'Boolean'
def extract(self, server: str, token: str, descriptor: dict) -> List[dict]:
data_set = descriptor['data_set']
projections = ['_id']
projections += [descriptor['projection']]
projections += [descriptor['dictionary']['projection']]
cookie = common.make_cookie(token=token)
data = common.get_field(server=server, data_set=data_set,
cookie=cookie, projections=projections)
dictionary = common.get_dictionary(server=server, data_set=data_set,
descriptor=descriptor, cookie=cookie)
return {'data': data, 'dictionary': dictionary}
def transform(self, raw_data) -> DataFrame:
data = raw_data['data']
dictionary = raw_data['dictionary']
data = common.prepare_ids(data)
data = common.name_to_label(data, dictionary)
return data
def transform(self, raw_data: List[dict], descriptor: dict) -> DataFrame:
    """Turn the extracted rows into a DataFrame in Fractalis format.
    :param raw_data: The return value of extract().
    :param descriptor: The data descriptor used to relabel the field.
    """
    rows = common.prepare_ids(raw_data)
    rows = common.name_to_label(rows, descriptor)
    return DataFrame(rows)
"""Provides DateETL for Ada."""
from typing import List
from pandas import DataFrame
from fractalis.data.etl import ETL
......@@ -10,26 +12,25 @@ class DateETL(ETL):
"""DateETL implements support for Adas 'Date' type"""
name = 'ada_date_etl'
_handler = 'ada'
_accepts = 'Date'
produces = 'numerical'
def extract(self, server: str,
token: str, descriptor: dict) -> dict:
@staticmethod
def can_handle(handler, descriptor):
return handler == 'ada' and \
descriptor['dictionary']['fieldType'] and \
descriptor['dictionary']['fieldType'] == 'Date'
def extract(self, server: str, token: str, descriptor: dict) -> List[dict]:
data_set = descriptor['data_set']
projections = ['_id']
projections += [descriptor['projection']]
projections += [descriptor['dictionary']['projection']]
cookie = common.make_cookie(token=token)
data = common.get_field(server=server, data_set=data_set,
cookie=cookie, projections=projections)
dictionary = common.get_dictionary(server=server, data_set=data_set,
descriptor=descriptor, cookie=cookie)
return {'data': data, 'dictionary': dictionary}
def transform(self, raw_data) -> DataFrame:
data = raw_data['data']
dictionary = raw_data['dictionary']
data = common.prepare_ids(data)
data = common.name_to_label(data, dictionary)
return data
def transform(self, raw_data: List[dict], descriptor: dict) -> DataFrame:
    """Turn the extracted rows into a DataFrame in Fractalis format.
    :param raw_data: The return value of extract().
    :param descriptor: The data descriptor used to relabel the field.
    """
    rows = common.prepare_ids(raw_data)
    rows = common.name_to_label(rows, descriptor)
    return DataFrame(rows)
"""Provides DoubleETL for Ada."""
from typing import List
from pandas import DataFrame
from fractalis.data.etl import ETL
......@@ -10,26 +12,26 @@ class DoubleETL(ETL):
"""DoubleETL implements support for Adas 'Enum' type"""
name = 'ada_double_etl'
_handler = 'ada'
_accepts = 'Double'
produces = 'numerical'
def extract(self, server: str,
token: str, descriptor: dict) -> dict:
@staticmethod
def can_handle(handler, descriptor):
return handler == 'ada' and \
descriptor['dictionary']['fieldType'] and \
descriptor['dictionary']['fieldType'] == 'Double'
def extract(self, server: str, token: str, descriptor: dict) -> List[dict]:
data_set = descriptor['data_set']
projections = ['_id']
projections += [descriptor['projection']]
projections += [descriptor['dictionary']['projection']]
cookie = common.make_cookie(token=token)
data = common.get_field(server=server, data_set=data_set,
cookie=cookie, projections=projections)
dictionary = common.get_dictionary(server=server, data_set=data_set,
descriptor=descriptor, cookie=cookie)
return {'data': data, 'dictionary': dictionary}
def transform(self, raw_data) -> DataFrame:
data = raw_data['data']
dictionary = raw_data['dictionary']
data = common.prepare_ids(data)
data = common.name_to_label(data, dictionary)
return data
def transform(self, raw_data: List[dict], descriptor: dict) -> DataFrame:
    """Turn the extracted rows into a DataFrame in Fractalis format.
    :param raw_data: The return value of extract().
    :param descriptor: The data descriptor used to relabel the field.
    """
    rows = common.prepare_ids(raw_data)
    rows = common.name_to_label(rows, descriptor)
    return DataFrame(rows)
"""Provides EnumETL for Ada."""
from typing import List
from pandas import DataFrame
from fractalis.data.etl import ETL
......@@ -10,31 +12,29 @@ class EnumETL(ETL):
"""EnumETL implements support for Adas 'Enum' type"""
name = 'ada_enum_etl'
_handler = 'ada'
_accepts = 'Enum'
produces = 'categorical'
def extract(self, server: str,
token: str, descriptor: dict) -> dict:
@staticmethod
def can_handle(handler, descriptor):
return handler == 'ada' and \
descriptor['dictionary']['fieldType'] and \
descriptor['dictionary']['fieldType'] == 'Enum'
def extract(self, server: str, token: str, descriptor: dict) -> List[dict]:
data_set = descriptor['data_set']
projections = ['_id']
projections += [descriptor['projection']]
projections += [descriptor['dictionary']['projection']]
cookie = common.make_cookie(token=token)
data = common.get_field(server=server, data_set=data_set,
cookie=cookie, projections=projections)
dictionary = common.get_dictionary(server=server, data_set=data_set,
descriptor=descriptor, cookie=cookie)
return {'data': data, 'dictionary': dictionary}
def transform(self, raw_data) -> DataFrame:
data = raw_data['data']
dictionary = raw_data['dictionary']
data = common.prepare_ids(data)
if dictionary is not None:
for row in data:
value = row[dictionary['name']]
value = dictionary['numValues'][str(value)]
row[dictionary['name']] = value
data = common.name_to_label(data, dictionary)
return data
def transform(self, raw_data: List[dict], descriptor: dict) -> DataFrame:
    """Turn the extracted rows into a DataFrame in Fractalis format,
    first mapping Ada enum ordinals to their human readable values.
    :param raw_data: The return value of extract().
    :param descriptor: The data descriptor; dictionary.numValues maps the
    stringified ordinal to its display value.
    """
    # hoist the nested descriptor lookups out of the per-row loop
    dictionary = descriptor['dictionary']
    name = dictionary['name']
    num_values = dictionary['numValues']
    data = common.prepare_ids(raw_data)
    for row in data:
        # rows that lack the projected field are left untouched,
        # mirroring the membership guard used by common.name_to_label
        if name in row:
            row[name] = num_values[str(row[name])]
    data = common.name_to_label(data, descriptor)
    return DataFrame(data)
"""Provides IntegerETL for Ada."""
from typing import List
from pandas import DataFrame
from fractalis.data.etl import ETL
......@@ -10,26 +12,25 @@ class IntegerETL(ETL):
"""IntegerETL implements support for Adas 'Integer' type"""
name = 'ada_integer_etl'
_handler = 'ada'
_accepts = 'Integer'
produces = 'numerical'
def extract(self, server: str,
token: str, descriptor: dict) -> dict:
@staticmethod
def can_handle(handler, descriptor):
return handler == 'ada' and \
descriptor['dictionary']['fieldType'] and \
descriptor['dictionary']['fieldType'] == 'Integer'
def extract(self, server: str, token: str, descriptor: dict) -> List[dict]:
data_set = descriptor['data_set']
projections = ['_id']
projections += [descriptor['projection']]
projections += [descriptor['dictionary']['projection']]
cookie = common.make_cookie(token=token)
data = common.get_field(server=server, data_set=data_set,
cookie=cookie, projections=projections)
dictionary = common.get_dictionary(server=server, data_set=data_set,
descriptor=descriptor, cookie=cookie)
return {'data': data, 'dictionary': dictionary}
def transform(self, raw_data) -> DataFrame:
data = raw_data['data']
dictionary = raw_data['dictionary']
data = common.prepare_ids(data)
data = common.name_to_label(data, dictionary)
return data
def transform(self, raw_data: List[dict], descriptor: dict) -> DataFrame:
    """Turn the extracted rows into a DataFrame in Fractalis format.
    :param raw_data: The return value of extract().
    :param descriptor: The data descriptor used to relabel the field.
    """
    rows = common.prepare_ids(raw_data)
    rows = common.name_to_label(rows, descriptor)
    return DataFrame(rows)
"""Provides StringETL for Ada."""
from typing import List
from pandas import DataFrame
from fractalis.data.etl import ETL
......@@ -10,26 +12,25 @@ class StringETL(ETL):
"""StringETL implements support for Adas 'String' type"""
name = 'ada_string_etl'
_handler = 'ada'
_accepts = 'String'
produces = 'categorical'
def extract(self, server: str,
token: str, descriptor: dict) -> dict:
@staticmethod
def can_handle(handler, descriptor):
return handler == 'ada' and \
descriptor['dictionary']['fieldType'] and \
descriptor['dictionary']['fieldType'] == 'String'
def extract(self, server: str, token: str, descriptor: dict) -> List[dict]:
data_set = descriptor['data_set']
projections = ['_id']
projections += [descriptor['projection']]
projections += [descriptor['dictionary']['projection']]
cookie = common.make_cookie(token=token)
data = common.get_field(server=server, data_set=data_set,
cookie=cookie, projections=projections)
dictionary = common.get_dictionary(server=server, data_set=data_set,
descriptor=descriptor, cookie=cookie)
return {'data': data, 'dictionary': dictionary}
def transform(self, raw_data) -> DataFrame:
data = raw_data['data']
dictionary = raw_data['dictionary']
data = common.prepare_ids(data)
data = common.name_to_label(data, dictionary)
return data
def transform(self, raw_data: List[dict], descriptor: dict) -> DataFrame:
    """Turn the extracted rows into a DataFrame in Fractalis format.
    :param raw_data: The return value of extract().
    :param descriptor: The data descriptor used to relabel the field.
    """
    rows = common.prepare_ids(raw_data)
    rows = common.name_to_label(rows, descriptor)
    return DataFrame(rows)
......@@ -17,7 +17,12 @@ class AdaHandler(ETLHandler):
_handler = 'ada'
def _heartbeat(self):
pass
raise NotImplementedError()
@staticmethod
def make_label(descriptor):
return '{} ({})'.format(descriptor['dictionary']['label'],
descriptor['data_set'])
def _get_token_for_credentials(self, server: str,
user: str, passwd: str) -> str:
......
......@@ -9,10 +9,14 @@ from fractalis.data.etl import ETL
class RandomDFETL(ETL):
name = 'test_randomdf_task'
_handler = 'test'
_accepts = 'default'
produces = 'something'
@staticmethod
def can_handle(handler, descriptor):
return handler == 'test' and \
descriptor['data_type'] and \
descriptor['data_type'] == 'default'
def extract(self, server, token, descriptor):
if 'fail' in token:
raise Exception('Throwing because I was told to.')
......@@ -20,6 +24,6 @@ class RandomDFETL(ETL):
time.sleep(0.5)
return fake_raw_data
def transform(self, raw_data, descriptor):
    """Wrap the fake raw data in a DataFrame; descriptor is unused in
    this test implementation."""
    return pd.DataFrame(raw_data)
......@@ -8,6 +8,10 @@ class TestHandler(ETLHandler):
def _heartbeat(self):
pass
@staticmethod
def make_label(descriptor):
pass
def _get_token_for_credentials(self, server: str,
user: str, passwd: str) -> str:
pass
......@@ -16,11 +16,7 @@ create_data_schema = {
"type": "array",
"items": {
"type": "object",
"properties": {
"data_type": {"type": "string"},
},
"minProperties": 2,
"required": ["data_type"]
"minProperties": 1
}
}
},
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment