etl.py 7.5 KB
Newer Older
Sascha Herzinger's avatar
Sascha Herzinger committed
1
2
"""This module provides the ETL class"""

3
import abc
4
import json
5
import logging
6
import os
7

8
from Crypto.Cipher import AES
9
from celery import Task
Sascha Herzinger's avatar
Sascha Herzinger committed
10
11
from pandas import DataFrame

Sascha Herzinger's avatar
Sascha Herzinger committed
12
from fractalis import app, redis
13
from fractalis.utils import get_cache_encrypt_key
14

15
16
17
# Module-level logger named after this module; used by the ETL class below.
logger = logging.getLogger(__name__)


18
class ETL(Task, metaclass=abc.ABCMeta):
    """This is an abstract class that implements a celery Task and provides a
    factory method to create instances of implementations of itself. Its main
    purpose is to manage extraction of the data from the target server. ETL
    stands for (E)xtract (T)ransform (L)oad and, not by coincidence, similarly
    named methods can be found in this class.
    """

    @property
    @abc.abstractmethod
    def name(self) -> str:
        """Used by celery to identify this task by name."""
        pass

    @property
    @abc.abstractmethod
    def produces(self) -> str:
        """This specifies the fractalis internal format that this ETL
        produces. Can be one of: ['categorical', 'numerical']
        """
        pass

    @staticmethod
    @abc.abstractmethod
    def can_handle(handler: str, descriptor: dict) -> bool:
        """Check if the current implementation of ETL can handle given handler
        and data type.

        WARNING: You should never raise an Exception here and expect it to be
        propagated further up. It will be caught and assumed that the
        current ETL cannot handle the given arguments.

        :param handler: Describes the handler. E.g.: transmart, ada
        :param descriptor: Describes the data that we want to download.
        :return: True if implementation can handle given parameters.
        """
        pass

    @classmethod
    def factory(cls, handler: str, descriptor: dict) -> 'ETL':
        """Return an instance of the implementation ETL that can handle the
        given parameters.

        Any exception raised by can_handle() (or while instantiating the
        candidate) is logged as a warning and the candidate is skipped.

        :param handler: Describes the handler. E.g.: transmart, ada
        :param descriptor: Describes the data that we want to download.
        :return: An instance of an implementation of ETL that returns True for
        can_handle()
        :raises NotImplementedError: If no registered ETL accepts the given
        handler and descriptor.
        """
        # Imported here rather than at module level; presumably to avoid a
        # circular import with the package that builds the registry.
        from . import ETL_REGISTRY
        for etl_task in ETL_REGISTRY:
            # noinspection PyBroadException
            try:
                if etl_task.can_handle(handler, descriptor):
                    return etl_task()
            except Exception as e:
                # Registry entries are classes, so report the class' own name;
                # type(entry).__name__ would only ever yield the metaclass.
                task_name = getattr(etl_task, '__name__',
                                    type(etl_task).__name__)
                logger.warning("Caught exception and assumed that ETL '{}' "
                               "cannot handle handler '{}' and descriptor: '{}'"
                               " Exception:'{}'".format(task_name,
                                                        handler,
                                                        str(descriptor), e))
                continue

        raise NotImplementedError(
            "No ETL implementation found for handler '{}' "
            "and descriptor '{}'".format(handler, descriptor))

    @abc.abstractmethod
    def extract(self, server: str, token: str, descriptor: dict) -> object:
        """Extract the data via HTTP requests.

        :param server: The server from which to extract from.
        :param token: The token used for authentication.
        :param descriptor: The descriptor containing all necessary information
        to download the data.
        :return: The raw data, which is passed on to transform().
        """
        pass

    @abc.abstractmethod
    def transform(self, raw_data: object, descriptor: dict) -> DataFrame:
        """Transform the data into a pandas.DataFrame with a naming according to
        the Fractalis standard format.

        :param raw_data: The return value of extract().
        :param descriptor: The data descriptor, sometimes needed
        for transformation
        :return: The transformed data; run() rejects anything that is not a
        pandas.DataFrame.
        """
        pass

    def update_redis(self, data_frame: DataFrame) -> None:
        """Set redis entry to 'loaded' state to indicate that the user has
        read access. At this step we also set several meta information
        that can be used for preview functionality that do not require all
        data to be loaded.

        :param data_frame: The extracted and transformed data.
        """
        redis_key = 'data:{}'.format(self.request.id)
        value = redis.get(name=redis_key)
        # The entry is expected to exist before the task runs.
        assert value is not None
        data_state = json.loads(value)
        # DataFrame.get returns None when there is no 'feature' column. A
        # Series must be compared against None explicitly, because its truth
        # value is ambiguous and evaluating it raises ValueError.
        features = data_frame.get('feature')
        if features is not None:
            features = features.unique().tolist()
        data_state['loaded'] = True
        data_state['meta']['features'] = features
        redis.setex(name=redis_key,
                    value=json.dumps(data_state),
                    time=app.config['FRACTALIS_CACHE_EXP'])

    @staticmethod
    def _ensure_parent_dir(file_path: str) -> None:
        """Create the directory that will contain file_path, if necessary.

        :param file_path: Path of the file that is about to be written.
        """
        parent_dir = os.path.dirname(file_path)
        # dirname is '' for a bare file name and os.makedirs('') would raise.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    def secure_load(self, data_frame: DataFrame, file_path: str) -> None:
        """For the paranoid. Save data encrypted to the file system.
        Note from the dev: If someone has direct access to your FS an
        unencrypted cache will be one of your least worries.

        The data frame is serialized to JSON, encrypted with AES in EAX mode
        and written as nonce, tag and ciphertext, in that order.

        :param data_frame: DataFrame to write.
        :param file_path: File to write to.
        """
        self._ensure_parent_dir(file_path)
        data = data_frame.to_json().encode('utf-8')
        key = get_cache_encrypt_key(app.config['SECRET_KEY'])
        cipher = AES.new(key, AES.MODE_EAX)
        ciphertext, tag = cipher.encrypt_and_digest(data)
        with open(file_path, 'wb') as f:
            for chunk in (cipher.nonce, tag, ciphertext):
                f.write(chunk)

    def load(self, data_frame: DataFrame, file_path: str) -> None:
        """Load (save) the data to the file system.

        The data frame is written as CSV without the index column.

        :param data_frame: DataFrame to write.
        :param file_path: File to write to.
        """
        self._ensure_parent_dir(file_path)
        data_frame.to_csv(file_path, index=False)

    def run(self, server: str, token: str,
            descriptor: dict, file_path: str,
            encrypt: bool) -> None:
        """Run extract, transform and load. This is called by the celery worker.

        :param server: The server on which the data are located.
        :param token: The token used for authentication.
        :param descriptor: Contains all necessary information to download data
        :param file_path: The location where the data will be stored
        :param encrypt: Whether or not the data should be encrypted.
        :raises RuntimeError: If extraction, transformation or loading fails.
        :raises TypeError: If transform() does not return a pandas.DataFrame.
        """
        logger.info("Starting ETL process ...")
        logger.info("(E)xtracting data from server '{}'.".format(server))
        try:
            raw_data = self.extract(server, token, descriptor)
        except Exception as e:
            logger.exception(e)
            raise RuntimeError("Data extraction failed. {}".format(e)) from e

        logger.info("(T)ransforming data to Fractalis format.")
        try:
            data_frame = self.transform(raw_data, descriptor)
        except Exception as e:
            logger.exception(e)
            raise RuntimeError(
                "Data transformation failed. {}".format(e)) from e

        if not isinstance(data_frame, DataFrame):
            error = "transform() must return 'pandas.DataFrame', " \
                    "but returned '{}' instead.".format(type(data_frame))
            # No exception is being handled here, so there is no traceback
            # to attach; log through the module logger like everywhere else.
            logger.error(error)
            raise TypeError(error)

        try:
            if encrypt:
                self.secure_load(data_frame, file_path)
            else:
                self.load(data_frame, file_path)
            # Only flag the data as loaded once it has been written.
            self.update_redis(data_frame)
        except Exception as e:
            logger.exception(e)
            raise RuntimeError("Data loading failed. {}".format(e)) from e