Commit 018548e8 authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

Implemented integrity checks for ETLs

parent 6a4d09f2
Pipeline #2275 failed with stage
in 43 seconds
"""Module containing the Celery Task for the Correlation Analysis."""
import logging
from typing import List, TypeVar
import pandas as pd
......@@ -11,6 +12,7 @@ from fractalis.analytics.tasks.shared.utils import \
apply_subsets, apply_categories
logger = logging.getLogger(__name__)
T = TypeVar('T')
......@@ -36,10 +38,12 @@ class CorrelationTask(AnalyticTask):
:param categories: List of DataFrames that categorise the data points.
:return: corr. coef., p-value and other useful values.
"""
if x.shape[0] == 0 or y.shape[0] == 0:
raise ValueError("X or Y contain no data.")
if x.shape[1] < 2 or y.shape[1] < 2:
raise ValueError("X or Y are malformed.")
if len(x['feature'].unique().tolist()) != 1 \
or len(y['feature'].unique().tolist()) != 1:
error = "Input is invalid. Please make sure that the two " \
"variables to compare have exactly one dimension, each."
logger.error(error)
raise ValueError(error)
if method not in ['pearson', 'spearman', 'kendall']:
raise ValueError("Unknown method '{}'".format(method))
......
from fractalis.utils import list_classes_with_base_class
from .etlhandler import ETLHandler
from .etl import ETL
from .check import IntegrityCheck
# Registries built at import time: scan a package for subclasses of the
# given base class (per the helper's name) and collect them for lookup
# by the ETL/handler/check factories.
HANDLER_REGISTRY = list_classes_with_base_class('fractalis.data.etls',
                                                ETLHandler)
ETL_REGISTRY = list_classes_with_base_class('fractalis.data.etls',
                                            ETL)
# NOTE(review): this scans 'fractalis.data' (one level above the etls
# sub-package) -- presumably the IntegrityCheck implementations live
# directly in that package; confirm the path is intentional.
CHECK_REGISTRY = list_classes_with_base_class('fractalis.data',
                                              IntegrityCheck)
"""This module provides an abstract class for testing whether or not the
ETLs produce valid fractalis standard data."""
import abc
import logging
logger = logging.getLogger(__name__)
class IntegrityCheck(metaclass=abc.ABCMeta):
    """Abstract base class for validating data produced by ETLs.

    A concrete subclass declares the fractalis ``data_type`` it is
    responsible for and implements :meth:`check`, which raises when the
    data are not in valid fractalis standard format.
    """

    @property
    @abc.abstractmethod
    def data_type(self) -> str:
        """The fractalis internal data type this checker handles."""
        pass

    @classmethod
    def can_handle(cls, data_type: str) -> bool:
        """Test whether this checker is responsible for the given data type.

        :param data_type: The data type that the ETL attempts to load.
        :return: True if this checker can handle the type.
        """
        return data_type == cls.data_type

    @staticmethod
    def factory(data_type: str) -> 'IntegrityCheck':
        """Return an instance of the checker matching the given data type.

        :param data_type: Data type that one wants to test against.
        :return: An instance of a concrete IntegrityCheck subclass.
        :raise NotImplementedError: If no registered checker can handle it.
        """
        from . import CHECK_REGISTRY
        match = next((candidate for candidate in CHECK_REGISTRY
                      if candidate.can_handle(data_type)), None)
        if match is not None:
            return match()
        error = "No IntegrityCheck implementation found " \
                "for data type '{}'".format(data_type)
        logger.error(error)
        raise NotImplementedError(error)

    @abc.abstractmethod
    def check(self, data: object) -> None:
        """Raise if the data have an invalid format. This is okay because
        there is no reason for the ETL to continue otherwise.

        :param data: The data to check.
        """
        pass
......@@ -10,6 +10,7 @@ from celery import Task
from pandas import DataFrame
from fractalis import app, redis
from fractalis.data.check import IntegrityCheck
from fractalis.utils import get_cache_encrypt_key
logger = logging.getLogger(__name__)
......@@ -51,8 +52,8 @@ class ETL(Task, metaclass=abc.ABCMeta):
"""
pass
@classmethod
def factory(cls, handler: str, descriptor: dict) -> 'ETL':
@staticmethod
def factory(handler: str, descriptor: dict) -> 'ETL':
"""Return an instance of the implementation ETL that can handle the
given parameters.
:param handler: Describes the handler. E.g.: transmart, ada
......@@ -61,15 +62,15 @@ class ETL(Task, metaclass=abc.ABCMeta):
can_handle()
"""
from . import ETL_REGISTRY
for ETL_TASK in ETL_REGISTRY:
for ETLTask in ETL_REGISTRY:
# noinspection PyBroadException
try:
if ETL_TASK.can_handle(handler, descriptor):
return ETL_TASK()
if ETLTask.can_handle(handler, descriptor):
return ETLTask()
except Exception as e:
logger.warning("Caught exception and assumed that ETL '{}' "
"cannot handle handler '{}' and descriptor: '{}'"
" Exception:'{}'".format(type(ETL_TASK).__name__,
" Exception:'{}'".format(type(ETLTask).__name__,
handler,
str(descriptor), e))
continue
......@@ -98,7 +99,7 @@ class ETL(Task, metaclass=abc.ABCMeta):
"""
pass
def update_redis(self, data_frame):
def update_redis(self, data_frame: DataFrame) -> None:
"""Set redis entry to 'loaded' state to indicate that the user has
has read access. At this step we also set several meta information
that can be used for preview functionality that do not require all
......@@ -118,7 +119,8 @@ class ETL(Task, metaclass=abc.ABCMeta):
value=json.dumps(data_state),
time=app.config['FRACTALIS_CACHE_EXP'])
def secure_load(self, data_frame: DataFrame, file_path: str) -> None:
@staticmethod
def secure_load(data_frame: DataFrame, file_path: str) -> None:
"""For the paranoid. Save data encrypted to the file system.
Note from the dev: If someone has direct access to your FS an
unencrypted cache will be one of your least worries.
......@@ -134,7 +136,8 @@ class ETL(Task, metaclass=abc.ABCMeta):
with open(file_path, 'wb') as f:
[f.write(x) for x in (cipher.nonce, tag, ciphertext)]
def load(self, data_frame: DataFrame, file_path: str) -> None:
@staticmethod
def load(data_frame: DataFrame, file_path: str) -> None:
"""Load (save) the data to the file system.
:param data_frame: DataFrame to write.
:param file_path: File to write to.
......@@ -165,6 +168,8 @@ class ETL(Task, metaclass=abc.ABCMeta):
logger.info("(T)ransforming data to Fractalis format.")
try:
data_frame = self.transform(raw_data, descriptor)
checker = IntegrityCheck.factory(self.produces)
checker.check(data_frame)
except Exception as e:
logger.exception(e)
raise RuntimeError("Data transformation failed. {}".format(e))
......
......@@ -73,8 +73,7 @@ class ETLHandler(metaclass=abc.ABCMeta):
:return The label (not necessarily unique) to the data."""
pass
@classmethod
def create_redis_entry(cls, task_id: str, file_path: str,
def create_redis_entry(self, task_id: str, file_path: str,
descriptor: dict, data_type: str) -> None:
"""Creates an entry in Redis that reflects all meta data surrounding the
downloaded data. E.g. data type, file system location, ...
......@@ -86,7 +85,7 @@ class ETLHandler(metaclass=abc.ABCMeta):
data_state = {
'task_id': task_id,
'file_path': file_path,
'label': cls.make_label(descriptor),
'label': self.make_label(descriptor),
'data_type': data_type,
'meta': {
'descriptor': descriptor,
......@@ -125,8 +124,8 @@ class ETLHandler(metaclass=abc.ABCMeta):
async_result.get(propagate=False)
return task_ids
@classmethod
def factory(cls, handler: str, server: str, auth: dict) -> 'ETLHandler':
@staticmethod
def factory(handler: str, server: str, auth: dict) -> 'ETLHandler':
"""Return an instance of the implementation of ETLHandler that can
handle the given parameters.
:param handler: Describes the handler. E.g.: transmart, ada
......@@ -151,6 +150,5 @@ class ETLHandler(metaclass=abc.ABCMeta):
"""
return handler == cls._handler
@abc.abstractmethod
def _heartbeat(self):
pass
raise NotImplementedError()
......@@ -22,9 +22,6 @@ class AdaHandler(ETLHandler):
_handler = 'ada'
def _heartbeat(self):
raise NotImplementedError()
@staticmethod
def make_label(descriptor: dict) -> str:
return '{} ({})'.format(descriptor['dictionary']['label'],
......
......@@ -9,7 +9,7 @@ from fractalis.data.etl import ETL
class RandomDFETL(ETL):
name = 'test_randomdf_task'
produces = 'something'
produces = 'mock'
@staticmethod
def can_handle(handler, descriptor):
......
......@@ -24,9 +24,6 @@ class TransmartHandler(ETLHandler):
_handler = 'transmart'
def _heartbeat(self):
raise NotImplementedError()
@staticmethod
def make_label(descriptor: dict) -> str:
return descriptor['path']
......
"""This module provides the integrity checks
for the 'categorical' fractalis format."""
import logging
import numpy as np
import pandas as pd
from fractalis.data.check import IntegrityCheck
logger = logging.getLogger(__name__)
class CategoricalIntegrityCheck(IntegrityCheck):
    """Implements IntegrityCheck for 'categorical' data type."""

    data_type = 'categorical'

    def check(self, data: object) -> None:
        """Raise a ValueError if the given data are not a valid
        'categorical' fractalis data frame.

        :param data: Must be a pandas.DataFrame with exactly the string
            columns 'id', 'feature', and 'value', a unique 'id' column,
            and exactly one unique 'feature' value.
        :raise ValueError: If any of the above conditions is violated.
        """
        if not isinstance(data, pd.DataFrame):
            error = "Data must be a pandas.DataFrame."
            logger.error(error)
            raise ValueError(error)
        if sorted(['id', 'feature', 'value']) != sorted(data.columns.tolist()):
            error = "Data frame must contain the columns " \
                    "'id', 'feature', and 'value'."
            logger.error(error)
            raise ValueError(error)
        # 'np.object' was a mere alias of the builtin 'object' and was
        # removed in NumPy 1.24; compare against the builtin instead.
        for column in ['id', 'feature', 'value']:
            if data[column].dtype != object:
                error = "'{}' column must be of type " \
                        "'object' ('string').".format(column)
                logger.error(error)
                raise ValueError(error)
        if len(data['id'].unique().tolist()) != data.shape[0]:
            error = "'id' column must be unique for this data type."
            logger.error(error)
            raise ValueError(error)
        if len(data['feature'].unique().tolist()) != 1:
            error = "'feature' column must contain exactly one unique value " \
                    "for this data type."
            logger.error(error)
            raise ValueError(error)
"""This module provides the integrity checks
for the 'numerical' fractalis format."""
import logging
import numpy as np
import pandas as pd
from fractalis.data.check import IntegrityCheck
logger = logging.getLogger(__name__)
class NumericalIntegrityCheck(IntegrityCheck):
    """Implements IntegrityCheck for 'numerical' data type."""

    data_type = 'numerical'

    def check(self, data: object) -> None:
        """Raise a ValueError if the given data are not a valid
        'numerical' fractalis data frame.

        :param data: Must be a pandas.DataFrame with string 'id' and
            'feature' columns, a numeric 'value' column, a unique 'id'
            column, and exactly one unique 'feature' value.
        :raise ValueError: If any of the above conditions is violated.
        """
        if not isinstance(data, pd.DataFrame):
            error = "Data must be a pandas.DataFrame."
            logger.error(error)
            raise ValueError(error)
        if sorted(['id', 'feature', 'value']) != sorted(data.columns.tolist()):
            error = "Data frame must contain the columns " \
                    "'id', 'feature', and 'value'."
            logger.error(error)
            raise ValueError(error)
        # 'np.object' was a mere alias of the builtin 'object' and was
        # removed in NumPy 1.24; compare against the builtin instead.
        for column in ['id', 'feature']:
            if data[column].dtype != object:
                error = "'{}' column must be of type " \
                        "'object' ('string').".format(column)
                logger.error(error)
                raise ValueError(error)
        # Likewise 'np.int' / 'np.float' were aliases of the builtins and
        # were removed in NumPy 1.24; the comparison is unchanged in effect.
        if data['value'].dtype != int and data['value'].dtype != float:
            error = "'value' column must be of type 'np.int' or 'np.float'."
            logger.error(error)
            raise ValueError(error)
        if len(data['id'].unique().tolist()) != data.shape[0]:
            error = "'id' column must be unique for this data type."
            logger.error(error)
            raise ValueError(error)
        if len(data['feature'].unique().tolist()) != 1:
            error = "'feature' column must contain exactly one unique value " \
                    "for this data type."
            logger.error(error)
            raise ValueError(error)
\ No newline at end of file
"""This module provides the integrity checks
for the 'numerical_array' fractalis format."""
import logging
import numpy as np
import pandas as pd
from fractalis.data.check import IntegrityCheck
logger = logging.getLogger(__name__)
class NumericalArrayIntegrityCheck(IntegrityCheck):
    """Implements IntegrityCheck for 'numerical_array' data type."""

    data_type = 'numerical_array'

    def check(self, data: object) -> None:
        """Raise a ValueError if the given data are not a valid
        'numerical_array' fractalis data frame.

        :param data: Must be a pandas.DataFrame with string 'id' and
            'feature' columns, a numeric 'value' column, and no repeated
            ('id', 'feature') combination.
        :raise ValueError: If any of the above conditions is violated.
        """
        if not isinstance(data, pd.DataFrame):
            error = "Data must be a pandas.DataFrame."
            logger.error(error)
            raise ValueError(error)
        if sorted(['id', 'feature', 'value']) != sorted(data.columns.tolist()):
            error = "Data frame must contain the columns " \
                    "'id', 'feature', and 'value'."
            logger.error(error)
            raise ValueError(error)
        # 'np.object' was a mere alias of the builtin 'object' and was
        # removed in NumPy 1.24; compare against the builtin instead.
        for column in ['id', 'feature']:
            if data[column].dtype != object:
                error = "'{}' column must be of type " \
                        "'object' ('string').".format(column)
                logger.error(error)
                raise ValueError(error)
        # 'np.int' / 'np.float' were aliases of the builtins, removed in
        # NumPy 1.24; the comparison is unchanged in effect.
        if data['value'].dtype != int and data['value'].dtype != float:
            error = "'value' column must be of type 'np.int' or 'np.float'."
            logger.error(error)
            raise ValueError(error)
        # The previous 'pd.unique(data[['id', 'feature']].values)' call fed
        # a 2-D array to a 1-D API and did not count row-wise combinations;
        # DataFrame.duplicated tests exactly what the message promises.
        if data.duplicated(subset=['id', 'feature']).any():
            error = "Every combination of 'id' and 'feature' must be unique."
            logger.error(error)
            raise ValueError(error)
\ No newline at end of file
"""This module tests nothing. It's just here for unit test mocking purposes."""
from fractalis.data.check import IntegrityCheck
class MockIntegrityCheck(IntegrityCheck):
    """Checker stub for the 'mock' data type used by the unit tests.

    It accepts any input and never raises.
    """

    data_type = 'mock'

    def check(self, data: object) -> None:
        """Accept *data* unconditionally; nothing is validated."""
        pass
\ No newline at end of file
......@@ -233,7 +233,7 @@ class TestData:
assert 'meta' not in data_state
assert data_state['etl_state'] == 'PENDING'
assert not data_state['etl_message']
assert data_state['data_type'] == 'something'
assert data_state['data_type'] == 'mock'
assert not data_state['loaded']
assert 'task_id' in data_state
......@@ -246,7 +246,7 @@ class TestData:
assert 'file_path' not in data_state
assert data_state['etl_state'] == 'SUCCESS'
assert not data_state['etl_message']
assert data_state['data_type'] == 'something'
assert data_state['data_type'] == 'mock'
assert data_state['loaded']
assert 'task_id' in data_state
......@@ -259,7 +259,7 @@ class TestData:
assert 'file_path' not in data_state
assert data_state['etl_state'] == 'FAILURE'
assert data_state['etl_message']
assert data_state['data_type'] == 'something'
assert data_state['data_type'] == 'mock'
assert not data_state['loaded']
assert 'task_id' in data_state
......
"""This module provides tests for the integrity checker
for the 'categorical' data type."""
import pandas as pd
import pytest
from fractalis.data.check import IntegrityCheck
# noinspection PyMissingOrEmptyDocstring,PyMissingTypeHints
class TestCategoricalIntegrityCheck:
    """Tests for the 'categorical' integrity checker."""

    checker = IntegrityCheck.factory('categorical')

    def test_correct_check_1(self):
        df = pd.DataFrame([['1', '2', '3']], columns=['id', 'feature', 'value'])
        # A valid frame must pass without raising.
        self.checker.check(df)

    def test_correct_check_2(self):
        df = pd.DataFrame([['1', '2', '3']], columns=['id', 'feat', 'value'])
        with pytest.raises(ValueError) as e:
            self.checker.check(df)
        # The message must be read from e.value *after* the with block;
        # the former "assert '...' in e" never inspected the message.
        assert 'must contain the columns' in str(e.value)

    def test_correct_check_3(self):
        df = pd.DataFrame([[1, '2', '3']], columns=['id', 'feature', 'value'])
        with pytest.raises(ValueError) as e:
            self.checker.check(df)
        assert "must be of type 'object'" in str(e.value)

    def test_correct_check_4(self):
        df = pd.DataFrame([['1', 2, '3']], columns=['id', 'feature', 'value'])
        with pytest.raises(ValueError) as e:
            self.checker.check(df)
        assert "must be of type 'object'" in str(e.value)

    def test_correct_check_5(self):
        df = pd.DataFrame([['1', '2', 3]], columns=['id', 'feature', 'value'])
        with pytest.raises(ValueError) as e:
            self.checker.check(df)
        assert "must be of type 'object'" in str(e.value)

    def test_correct_check_6(self):
        df = pd.DataFrame([['1', '2', '3'], ['4', '2', '3']],
                          columns=['id', 'feature', 'value'])
        self.checker.check(df)

    def test_correct_check_7(self):
        df = pd.DataFrame([['1', '2', '3'], ['4', '4', '3']],
                          columns=['id', 'feature', 'value'])
        with pytest.raises(ValueError) as e:
            self.checker.check(df)
        assert 'must contain exactly one' in str(e.value)

    def test_correct_check_8(self):
        df = pd.DataFrame([['1', '2', '3'], ['1', '2', '3']],
                          columns=['id', 'feature', 'value'])
        with pytest.raises(ValueError) as e:
            self.checker.check(df)
        assert 'must be unique' in str(e.value)
"""This module provides tests for the integrity checker
for the 'numerical' data type."""
import pandas as pd
import pytest
from fractalis.data.check import IntegrityCheck
# noinspection PyMissingOrEmptyDocstring,PyMissingTypeHints
class TestNumericalIntegrityCheck:
    """Tests for the 'numerical' integrity checker."""

    checker = IntegrityCheck.factory('numerical')

    def test_correct_check_1(self):
        df = pd.DataFrame([['1', '2', 3]], columns=['id', 'feature', 'value'])
        # A valid frame must pass without raising.
        self.checker.check(df)

    def test_correct_check_2(self):
        df = pd.DataFrame([['1', '2', 3]], columns=['id', 'feat', 'value'])
        with pytest.raises(ValueError) as e:
            self.checker.check(df)
        # The message must be read from e.value *after* the with block;
        # the former "assert '...' in e" never inspected the message.
        assert 'must contain the columns' in str(e.value)

    def test_correct_check_3(self):
        df = pd.DataFrame([[1, '2', 3]], columns=['id', 'feature', 'value'])
        with pytest.raises(ValueError) as e:
            self.checker.check(df)
        assert "must be of type 'object'" in str(e.value)

    def test_correct_check_4(self):
        df = pd.DataFrame([['1', 2, 3]], columns=['id', 'feature', 'value'])
        with pytest.raises(ValueError) as e:
            self.checker.check(df)
        assert "must be of type 'object'" in str(e.value)

    def test_correct_check_5(self):
        df = pd.DataFrame([['1', '2', '3']], columns=['id', 'feature', 'value'])
        with pytest.raises(ValueError) as e:
            self.checker.check(df)
        assert "must be of type 'np.int'" in str(e.value)

    def test_correct_check_6(self):
        df = pd.DataFrame([['1', '2', 3], ['4', '2', 3]],
                          columns=['id', 'feature', 'value'])
        self.checker.check(df)

    def test_correct_check_7(self):
        df = pd.DataFrame([['1', '2', 3], ['4', '4', 3]],
                          columns=['id', 'feature', 'value'])
        with pytest.raises(ValueError) as e:
            self.checker.check(df)
        assert 'must contain exactly one' in str(e.value)

    def test_correct_check_8(self):
        df = pd.DataFrame([['1', '2', 3], ['1', '2', 3]],
                          columns=['id', 'feature', 'value'])
        with pytest.raises(ValueError) as e:
            self.checker.check(df)
        assert 'must be unique' in str(e.value)
"""This module provides tests for the integrity checker
for the 'numerical_array' data type."""
import pandas as pd
import pytest
from fractalis.data.check import IntegrityCheck
# noinspection PyMissingOrEmptyDocstring,PyMissingTypeHints
class TestNumericalArrayIntegrityCheck:
checker = IntegrityCheck.factory('numerical_array')
def test_correct_check_1(self):
df = pd.DataFrame([['1', '2', 3]], columns=['id', 'feature', 'value'])
self.checker.check(df)
def test_correct_check_2(self):
df = pd.DataFrame([['1', '2', 3]], columns=['id', 'feat', 'value'])
with pytest.raises(ValueError) as e:
self.checker.check(df)
assert 'must contain the columns' in e
def test_correct_check_3(self):
df = pd.DataFrame([[1, '2', 3]], columns=['id', 'feature', 'value'])
with pytest.raises(ValueError) as e:
self.checker.check(df)
assert "must be of type 'object'" in e
def test_correct_check_4(self):
df = pd.DataFrame([['1', 2, 3]], columns=['id', 'feature', 'value'])
with pytest.raises(ValueError) as e:
self.checker.check(df)
assert "must be of type 'object'" in e
def test_correct_check_5(self):
df = pd.DataFrame([['1', '2', '3']], columns=['id', 'feature', 'value'])
with pytest.raises(ValueError) as e:
self.checker.check(df)
assert "must be of type 'np.int'" in e
def test_correct_check_6(self):
df = pd.DataFrame([['1', '2', 3], ['4', '2', 3]],
columns=['id', 'feature', 'value'])
self.checker.check(df)