etlhandler.py 9.62 KB
Newer Older
Sascha Herzinger's avatar
Sascha Herzinger committed
1
2
"""This module provides the ETLHandler class."""

3
import os
4
import abc
5
6
7
import json
import logging
from uuid import uuid4
8
from typing import List, Union
9

10
from fractalis.cleanup import janitor
11
from fractalis import app, redis, celery
12
13
14
from fractalis.data.etl import ETL


15
16
17
logger = logging.getLogger(__name__)


class ETLHandler(metaclass=abc.ABCMeta):
    """This is an abstract class that provides a factory method to create
    instances of implementations of itself. The main purpose of this class
    is the supervision of all ETL processes belonging to this handler. Besides
    that it takes care of authentication business.
    """

    @property
    @abc.abstractmethod
    def _handler(self) -> str:
        """Used by self.can_handle to check whether the current implementation
        is able to handle the incoming request.
        """
        pass

    def _get_token_for_credentials(self, server: str, auth: dict) -> str:
        """Authenticate with the server and return a token.

        The base implementation always raises; implementations that support
        credential-based authentication must override it.
        :param server: The server to authenticate with.
        :param auth: dict containing credentials to auth with API
        :return: The token returned by the API.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def __init__(self, server: str, auth: dict) -> None:
        """Validate the connection parameters and obtain an API token.

        :param server: Non-empty base URL of the server. A single trailing
        '/' is removed before it is stored.
        :param auth: Credentials. If it holds a non-empty string under
        'token', that token is used as is. Otherwise
        self._get_token_for_credentials(server, auth) is asked for one.
        :raises ValueError: If server or auth are invalid, or if
        authentication fails (the underlying error is chained as __cause__).
        """
        if not isinstance(server, str) or not server:
            error = "{} is not a valid server url.".format(server)
            logger.error(error)
            raise ValueError(error)
        if not isinstance(auth, dict):
            error = "'auth' must be a valid dictionary."
            logger.error(error)
            raise ValueError(error)
        if server.endswith('/'):
            server = server[:-1]
        self._server = server

        # Use a provided token directly; only authenticate if there is none.
        token = auth.get('token')
        if isinstance(token, str) and token:
            self._token = token
            return
        logger.info('No token has been provided. '
                    'Attempting to authenticate with the API.')
        try:
            self._token = self._get_token_for_credentials(server, auth)
        except Exception as e:
            logger.exception(e)
            raise ValueError("Could not authenticate with API.") from e

    @staticmethod
    @abc.abstractmethod
    def make_label(descriptor: dict) -> str:
        """Create a label for the given data descriptor. This label will be
        used to display the loaded data in the front end.
        :param descriptor: Describes the data and is used to download them.
        :return: The label (not necessarily unique) to the data.
        """
        pass

    def create_redis_entry(self, task_id: str, file_path: str,
                           descriptor: dict, data_type: str) -> None:
        """Creates an entry in Redis that contains meta information for the
        data that are to be downloaded.

        The entry is stored as JSON under the key 'data:<task_id>' and expires
        after app.config['FRACTALIS_DATA_LIFETIME'] seconds.
        :param task_id: Id associated with the loaded data.
        :param file_path: Location of the data on the file system.
        :param descriptor: Describes the data and is used to download them.
        Must be JSON serializable because it is stored in the entry.
        :param data_type: The fractalis internal data type of the loaded data.
        """
        data_state = {
            'task_id': task_id,
            'file_path': file_path,
            'label': self.make_label(descriptor),
            'data_type': data_type,
            # Compared by find_duplicates to detect identical requests.
            'hash': self.descriptor_to_hash(descriptor),
            'meta': {
                'descriptor': descriptor,
            }
        }
        redis.setex(name='data:{}'.format(task_id),
                    value=json.dumps(data_state),
                    time=app.config['FRACTALIS_DATA_LIFETIME'])

    def descriptor_to_hash(self, descriptor: dict) -> int:
        """Compute hash for the given descriptor. Used to identify duplicates.

        The hash covers the server, the handler and the descriptor. The
        descriptor is serialized as JSON with sorted keys, so equal
        descriptors hash equally regardless of key insertion order. The
        result is a SHA-256 digest interpreted as an integer. Its size is
        fixed, so it stays well below Python's int/str conversion digit
        limit when it is written to and read back from JSON.
        :param descriptor: ETL descriptor. Used to identify duplicates.
        :return: Hash value (SHA-256 as int).
        """
        import hashlib
        try:
            canonical = json.dumps(descriptor, sort_keys=True)
        except (TypeError, ValueError):
            # Not JSON serializable (or non-sortable keys): fall back to repr.
            canonical = str(descriptor)
        string = '{}-{}-{}'.format(self._server, self._handler, canonical)
        digest = hashlib.sha256(string.encode('utf-8')).digest()
        return int.from_bytes(digest, 'big')

    def find_duplicates(self, data_tasks: List[str],
                        descriptor: dict) -> List[str]:
        """Search for duplicates of the given descriptor and return a list
        of associated task ids.

        Task ids whose Redis entry is missing, or has no 'hash' field, are
        skipped.
        :param data_tasks: Limit duplicate search to.
        :param descriptor: ETL descriptor. Used to identify duplicates.
        :return: The list of duplicates.
        """
        hash_value = self.descriptor_to_hash(descriptor)
        task_ids = []
        for task_id in data_tasks:
            value = redis.get('data:{}'.format(task_id))
            if value is None:
                continue
            data_state = json.loads(value)
            if data_state.get('hash') == hash_value:
                task_ids.append(task_id)
        return task_ids

    def remove_duplicates(self, data_tasks: List[str],
                          descriptor: dict) -> None:
        """Delete the duplicates of the given descriptor from redis and call
        the janitor afterwards to cleanup orphaned files.

        The janitor task is only queued if at least one duplicate was deleted.
        :param data_tasks: Limit duplicate search to.
        :param descriptor: ETL descriptor. Used to identify duplicates.
        """
        task_ids = self.find_duplicates(data_tasks, descriptor)
        for task_id in task_ids:
            redis.delete('data:{}'.format(task_id))
        if task_ids:
            janitor.delay()

    def find_duplicate_task_id(self, data_tasks: List[str],
                               descriptor: dict) -> Union[str, None]:
        """Search for duplicates of the given descriptor and return their
        task id if the state is SUBMITTED or SUCCESS, meaning the data are
        reusable.
        :param data_tasks: Limit search to this list.
        :param descriptor: ETL descriptor. Used to identify duplicates.
        :return: The first matching task id if a valid duplicate has been
        found, None otherwise.
        """
        for task_id in self.find_duplicates(data_tasks, descriptor):
            if celery.AsyncResult(task_id).state in ('SUBMITTED', 'SUCCESS'):
                return task_id
        return None

    def handle(self, descriptors: List[dict], data_tasks: List[str],
               use_existing: bool, wait: bool = False) -> List[str]:
        """Create instances of ETL for the given descriptors and submit them
        (ETL implements celery.Task) to the broker. The task ids are returned
        to keep track of them.

        Note: every reused or newly created task id is appended to
        ``data_tasks`` in place.
        :param descriptors: A list of items describing the data to download.
        :param data_tasks: Limit search for duplicates to this list.
        :param use_existing: If a duplicate with state 'SUBMITTED' or 'SUCCESS'
        already exists use it instead of starting a new ETL. If this is False
        duplicates are deleted!
        :param wait: Makes this method synchronous by waiting for the tasks to
        return.
        :return: The list of task ids for the submitted tasks, without
        repetitions, in the order they were first produced.
        :raises RuntimeError: If the broker assigns a different id than the
        one requested.
        """
        data_dir = os.path.join(app.config['FRACTALIS_TMP_DIR'], 'data')
        task_ids = []
        for descriptor in descriptors:
            if use_existing:
                task_id = self.find_duplicate_task_id(data_tasks, descriptor)
                if task_id:
                    task_ids.append(task_id)
                    data_tasks.append(task_id)
                    continue
            else:
                self.remove_duplicates(data_tasks, descriptor)
            task_id = str(uuid4())
            file_path = os.path.join(data_dir, task_id)
            etl = ETL.factory(handler=self._handler, descriptor=descriptor)
            # The Redis entry is created before submission so the task's meta
            # data exist as soon as the task id is known to anyone.
            self.create_redis_entry(task_id, file_path,
                                    descriptor, etl.produces)
            kwargs = dict(server=self._server, token=self._token,
                          descriptor=descriptor, file_path=file_path,
                          encrypt=app.config['FRACTALIS_ENCRYPT_CACHE'])
            async_result = etl.apply_async(kwargs=kwargs, task_id=task_id)
            # Explicit check instead of `assert`, which is stripped under -O.
            if async_result.id != task_id:
                raise RuntimeError(
                    "Submitted task id '{}' does not match requested id "
                    "'{}'.".format(async_result.id, task_id))
            task_ids.append(task_id)
            data_tasks.append(task_id)
            if wait and async_result.state == 'SUBMITTED':
                logger.debug("'wait' was set. Waiting for tasks to finish ...")
                async_result.get(propagate=False)
        # Deduplicate while keeping a deterministic, input-aligned order.
        return list(dict.fromkeys(task_ids))

    @staticmethod
    def factory(handler: str, server: str, auth: dict) -> 'ETLHandler':
        """Return an instance of the implementation of ETLHandler that can
        handle the given parameters.

        The first registered class whose can_handle(handler) is True is
        instantiated. The registry is imported at call time (presumably to
        avoid a circular import with the package; TODO confirm).
        :param handler: Describes the handler. E.g.: transmart, ada
        :param server: The server to download data from.
        :param auth: Contains credentials to authenticate with the API.
        :return: An instance of an implementation of ETLHandler that returns
        True for self.can_handle
        :raises NotImplementedError: If no registered implementation matches.
        """
        from . import HANDLER_REGISTRY
        for Handler in HANDLER_REGISTRY:
            if Handler.can_handle(handler):
                return Handler(server, auth)
        raise NotImplementedError(
            "No ETLHandler implementation found for: '{}'".format(handler))

    @classmethod
    def can_handle(cls, handler: str) -> bool:
        """Check whether this implementation of ETLHandler can handle the given
        parameters.

        On the abstract base class ``cls._handler`` is the property object
        itself, so this returns False there.
        :param handler: Describes the handler. E.g.: transmart, ada
        :return: True if this implementation can handle the given parameters.
        """
        return handler == cls._handler

    def _heartbeat(self):
        """Not implemented in the base class; always raises
        NotImplementedError.
        """
        raise NotImplementedError()