Commit 2f96fcf5 authored by Sascha Herzinger
Browse files

moving etlhandler and etl one dir up

parent 00d8c304
......@@ -7,7 +7,7 @@ from celery import Celery
from fractalis.utils import list_classes_with_base_class
from fractalis.utils import import_module_by_abs_path
from fractalis.data.etls.etl import ETL
from fractalis.data.etl import ETL
from fractalis.analytics.job import AnalyticsJob
......
from fractalis.utils import list_classes_with_base_class
from .etlhandler import ETLHandler
from .etl import ETL
# Registry of handler classes looked up under the 'fractalis.data.etls'
# package. ETLHandler.factory imports this name and iterates over it to pick
# an implementation.
# NOTE(review): presumably contains every ETLHandler subclass found in that
# package -- confirm against fractalis.utils.list_classes_with_base_class.
HANDLER_REGISTRY = list_classes_with_base_class('fractalis.data.etls',
ETLHandler)
# Same lookup for ETL classes.
# NOTE(review): presumably consumed by ETL.factory -- not visible here.
ETL_REGISTRY = list_classes_with_base_class('fractalis.data.etls',
ETL)
......@@ -2,7 +2,7 @@ import json
from flask import Blueprint, session, request, jsonify
from .etls.etlhandler import ETLHandler
from .etlhandler import ETLHandler
from .schema import create_data_schema
from fractalis.validator import validate_json, validate_schema
from fractalis.celery import app as celery
......
import os
import abc
import json
from hashlib import sha256
from uuid import uuid4
from fractalis.data.etl import ETL
from fractalis import app
from fractalis import redis
class ETLHandler(metaclass=abc.ABCMeta):
    """Abstract base for handlers that submit ETL jobs for one server.

    A concrete handler sets ``_HANDLER`` to the identifier it answers to and
    implements ``_heartbeat``. ``factory`` picks the matching implementation
    from ``HANDLER_REGISTRY``.
    """

    @property
    @abc.abstractmethod
    def _HANDLER(self):
        """Identifier of this handler; compared against in ``can_handle``."""
        pass

    def __init__(self, server, token):
        """Remember the server and token that are forwarded to every ETL.

        :param server: Passed as ``server`` to each submitted ETL and used as
            part of the data id.
        :param token: Passed as ``token`` to each submitted ETL.
        """
        self._server = server
        self._token = token

    @staticmethod
    def compute_data_id(server, descriptor):
        """Return the SHA-256 hex digest identifying ``descriptor`` on ``server``.

        The descriptor is serialized with sorted keys, so two dicts with the
        same content yield the same id regardless of key order.

        :param server: Value formatted into the hashed string.
        :param descriptor: JSON-serializable description of the data.
        :return: Hex digest string.
        """
        descriptor_str = json.dumps(descriptor, sort_keys=True)
        to_hash = '{}|{}'.format(server, descriptor_str).encode('utf-8')
        return sha256(to_hash).hexdigest()

    def handle(self, descriptors):
        """Submit one ETL job per descriptor and record it in redis.

        For every descriptor the data id is computed, a target file path is
        chosen (the previously recorded one if the redis hash ``'data'``
        already has an entry for that id, otherwise a fresh uuid-named file
        below ``<FRACTALIS_TMP_DIR>/data``), the ETL is submitted via
        ``delay`` and ``{'file_path', 'job_id'}`` is stored as JSON under the
        data id.

        :param descriptors: Iterable of dicts; each must have a
            ``'data_type'`` key.
        :return: List of data ids, in the order of ``descriptors``.
        """
        data_ids = []
        for descriptor in descriptors:
            data_id = self.compute_data_id(self._server, descriptor)
            tmp_dir = app.config['FRACTALIS_TMP_DIR']
            data_dir = os.path.join(tmp_dir, 'data')
            os.makedirs(data_dir, exist_ok=True)
            value = redis.hget('data', key=data_id)
            if value:
                stored = json.loads(value.decode('utf-8'))
                # The stored record is the JSON of
                # {'file_path': ..., 'job_id': ...} written below, so only
                # its 'file_path' entry is the path. Using the whole record
                # as the path would hand a dict to the ETL and nest it into
                # the next stored record.
                if isinstance(stored, dict):
                    file_path = stored['file_path']
                else:
                    # Tolerate a record that holds just the path string.
                    file_path = stored
            else:
                file_name = str(uuid4())
                file_path = os.path.join(data_dir, file_name)
            etl = ETL.factory(handler=self._HANDLER,
                              data_type=descriptor['data_type'])
            async_result = etl.delay(server=self._server,
                                     token=self._token,
                                     descriptor=descriptor,
                                     file_path=file_path)
            data_obj = {'file_path': file_path, 'job_id': async_result.id}
            redis.hset(name='data', key=data_id, value=json.dumps(data_obj))
            data_ids.append(data_id)
        return data_ids

    @classmethod
    def factory(cls, handler, server, token):
        """Instantiate the registered handler that answers to ``handler``.

        :param handler: Identifier matched via each class's ``can_handle``.
        :param server: Forwarded to the handler constructor.
        :param token: Forwarded to the handler constructor.
        :return: Instance of the first matching class in the registry.
        :raises NotImplementedError: If no registered class matches.
        """
        # Imported at call time rather than at module import.
        # NOTE(review): presumably to avoid a circular import with the
        # package __init__, which imports this module -- confirm.
        from . import HANDLER_REGISTRY
        for Handler in HANDLER_REGISTRY:
            if Handler.can_handle(handler):
                return Handler(server, token)
        raise NotImplementedError(
            "No ETLHandler implementation found for: '{}'".format(handler))

    @classmethod
    def can_handle(cls, handler):
        """Return whether ``handler`` equals this class's ``_HANDLER``."""
        return handler == cls._HANDLER

    @abc.abstractmethod
    def _heartbeat(self):
        """Hook to be implemented by subclasses.

        NOTE(review): semantics are not visible here; presumably a liveness
        check against the server -- confirm with the implementations.
        """
        pass
from fractalis.utils import list_classes_with_base_class
from .etlhandler import ETLHandler
from .etl import ETL
# Registry of handler classes looked up under the 'fractalis.data.etls'
# package. ETLHandler.factory imports this name and iterates over it to pick
# an implementation.
# NOTE(review): presumably contains every ETLHandler subclass found in that
# package -- confirm against fractalis.utils.list_classes_with_base_class.
HANDLER_REGISTRY = list_classes_with_base_class('fractalis.data.etls',
ETLHandler)
# Same lookup for ETL classes.
# NOTE(review): presumably consumed by ETL.factory -- not visible here.
ETL_REGISTRY = list_classes_with_base_class('fractalis.data.etls',
ETL)
......@@ -4,7 +4,7 @@ import json
from hashlib import sha256
from uuid import uuid4
from fractalis.data.etls.etl import ETL
from fractalis.data.etl import ETL
from fractalis import app
from fractalis import redis
......
import pandas as pd
import numpy as np
from fractalis.data.etls.etl import ETL
from fractalis.data.etl import ETL
class BarETL(ETL):
......
import pandas as pd
import numpy as np
from fractalis.data.etls.etl import ETL
from fractalis.data.etl import ETL
class FooETL(ETL):
......
from fractalis.data.etls.etlhandler import ETLHandler
from fractalis.data.etlhandler import ETLHandler
class TestHandler(ETLHandler):
......
from fractalis.data.etls.etlhandler import ETLHandler
from fractalis.data.etlhandler import ETLHandler
class TransmartHandler(ETLHandler):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment