Commit de7dec40 authored by Sascha Herzinger

Simplified the data controller even further.

parent 61cd4347
Pipeline #2088 failed with stage in 14 minutes and 21 seconds
@@ -40,13 +40,9 @@ log.info("Creating Redis connection.")
 redis = StrictRedis(host=app.config['REDIS_HOST'],
                     port=app.config['REDIS_PORT'])
-# Configure app with composed configurations to save admin some work
-app.config['SESSION_REDIS'] = redis
-app.config['CELERY_RESULT_BACKEND'] = 'redis://{}:{}'.format(
-    app.config['REDIS_HOST'], app.config['REDIS_PORT'])
-# Set new session interface for app
 log.info("Replacing default session interface.")
+app.config['SESSION_REDIS'] = redis
 Session(app)
 # allow everyone to submit requests
......
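
Reviewer note: the hunk above wires Flask-Session to the shared Redis instance, while the composed Celery backend URL moves into fractalis/config.py (see the config hunk below). A minimal standalone sketch of the session pattern; the host/port literals and the SESSION_TYPE line are assumptions, not Fractalis code:

    from flask import Flask
    from flask_session import Session
    from redis import StrictRedis

    app = Flask(__name__)
    app.config['SESSION_TYPE'] = 'redis'  # assumed; set somewhere in the real config
    app.config['SESSION_REDIS'] = StrictRedis(host='127.0.0.1', port=6379)
    Session(app)  # replaces the default cookie-based session interface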
 from fractalis.utils import list_classes_with_base_class
-from .job import AnalyticsJob
+from fractalis.analytics.task import AnalyticTask
-JOB_REGISTRY = list_classes_with_base_class('fractalis.analytics.jobs',
-                                            AnalyticsJob)
+TASK_REGISTRY = list_classes_with_base_class('fractalis.analytics.tasks',
+                                             AnalyticTask)
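
TASK_REGISTRY is built by scanning the tasks package for AnalyticTask subclasses. The real list_classes_with_base_class lives in fractalis/utils.py and is not part of this diff; a plausible sketch of such a helper:

    import importlib
    import inspect
    import pkgutil

    def list_classes_with_base_class(package_name, base_class):
        """Collect all classes in the sub-modules of `package_name`
        that subclass `base_class` (excluding the base class itself)."""
        package = importlib.import_module(package_name)
        classes = []
        for _, module_name, _ in pkgutil.iter_modules(package.__path__):
            module = importlib.import_module(
                '{}.{}'.format(package_name, module_name))
            for _, obj in inspect.getmembers(module, inspect.isclass):
                if issubclass(obj, base_class) and obj is not base_class:
                    classes.append(obj)
        return classes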
@@ -9,8 +9,8 @@ from flask.wrappers import Response
 from fractalis import celery
 from fractalis.validator import validate_json, validate_schema
-from fractalis.analytics.schema import create_job_schema
-from fractalis.analytics.job import AnalyticsJob
+from fractalis.analytics.schema import create_task_schema
+from fractalis.analytics.task import AnalyticTask
 analytics_blueprint = Blueprint('analytics_blueprint', __name__)
@@ -21,76 +21,76 @@ logger = logging.getLogger(__name__)
 def prepare_session() -> None:
     """Make sure the session is properly initialized before each request."""
     session.permanent = True
-    if 'jobs' not in session:
-        logger.debug("Initializing jobs field in session dict.")
-        session['jobs'] = []
-    if 'data_ids' not in session:
-        logger.debug("Initializing data_ids field in session dict.")
-        session['data_ids'] = []
+    if 'analytic_tasks' not in session:
+        logger.debug("Initializing analytic_tasks field in session dict.")
+        session['analytic_tasks'] = []
+    if 'data_tasks' not in session:
+        logger.debug("Initializing data_tasks field in session dict.")
+        session['data_tasks'] = []
 @analytics_blueprint.route('', methods=['POST'])
 @validate_json
-@validate_schema(create_job_schema)
-def create_job() -> Tuple[Response, int]:
-    """Create a new analytics job based on the parameters in the POST body.
+@validate_schema(create_task_schema)
+def create_task() -> Tuple[Response, int]:
+    """Create a new analytics task based on the parameters in the POST body.
     See doc/api/ for more information.
     :return: Flask Response
     """
     logger.debug("Received POST request on /analytics.")
     json = request.get_json(force=True) # pattern enforced by decorators
-    analytics_job = AnalyticsJob.factory(json['job_name'])
-    if analytics_job is None:
-        logger.error("Could not submit job for unknown job name: "
-                     "'{}'".format(json['job_name']))
-        return jsonify({'error_msg': "Job with name '{}' not found.".format(
-            json['job_name'])}), 400
-    async_result = analytics_job.delay(accessible_data_ids=session['data_ids'],
-                                       args=json['args'])
-    session['jobs'].append(async_result.id)
-    logger.debug("Job successfully submitted. Sending response.")
-    return jsonify({'job_id': async_result.id}), 201
+    analytic_task = AnalyticTask.factory(json['task_name'])
+    if analytic_task is None:
+        logger.error("Could not submit task for unknown task name: "
+                     "'{}'".format(json['task_name']))
+        return jsonify({'error_msg': "Task with name '{}' not found."
+                       .format(json['task_name'])}), 400
+    async_result = analytic_task.delay(data_tasks=session['data_tasks'],
+                                       args=json['args'])
+    session['analytic_tasks'].append(async_result.id)
+    logger.debug("Task successfully submitted. Sending response.")
+    return jsonify({'task_id': async_result.id}), 201
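
For review purposes, a hypothetical client interaction with the renamed endpoint. Host, port and the referenced data task ids are invented; 'compute-correlation' is the task name visible further down in this commit:

    import requests

    s = requests.Session()  # the cookie carries the session's task whitelist
    r = s.post('http://localhost:5000/analytics', json={
        'task_name': 'compute-correlation',
        # '$...$' values are resolved to DataFrames by AnalyticTask.prepare_args()
        'args': {'x': '$<a-data-task-id>$', 'y': '$<another-data-task-id>$'},
    })
    assert r.status_code == 201
    task_id = r.json()['task_id']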
-@analytics_blueprint.route('/<uuid:job_id>', methods=['GET'])
-def get_job_details(job_id: UUID) -> Tuple[Response, int]:
-    """Get job details for the given job_id.
+@analytics_blueprint.route('/<uuid:task_id>', methods=['GET'])
+def get_task_details(task_id: UUID) -> Tuple[Response, int]:
+    """Get task details for the given task_id.
     See doc/api/ for more information.
-    :param job_id: ID returned on job creation.
+    :param task_id: ID returned on task creation.
     :return: Flask Response
     """
-    logger.debug("Received GET request on /analytics/job_id.")
+    logger.debug("Received GET request on /analytics/task_id.")
     wait = request.args.get('wait') == '1'
-    job_id = str(job_id)
-    if job_id not in session['jobs']:
-        error = "Job ID '{}' not found in session. " \
-                "Refusing access.".format(job_id)
+    task_id = str(task_id)
+    if task_id not in session['analytic_tasks']:
+        error = "Task ID '{}' not found in session. " \
+                "Refusing access.".format(task_id)
         logger.warning(error)
         return jsonify({'error': error}), 403
-    async_result = celery.AsyncResult(job_id)
+    async_result = celery.AsyncResult(task_id)
     if wait:
-        async_result.get(propagate=False) # make job synchronous
-    logger.debug("Job found and has access. Sending response.")
+        async_result.get(propagate=False) # make task synchronous
+    logger.debug("Task found and has access. Sending response.")
     return jsonify({'state': async_result.state,
                     'result': async_result.result}), 200
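
Continuing the sketch above: a client can poll the state, or block with wait=1 (which triggers async_result.get server-side):

    r = s.get('http://localhost:5000/analytics/{}?wait=1'.format(task_id))
    body = r.json()
    print(body['state'])   # e.g. 'SUCCESS' or 'FAILURE'
    print(body['result'])  # the dict returned by main(), or an error message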
-@analytics_blueprint.route('/<uuid:job_id>', methods=['DELETE'])
-def cancel_job(job_id: UUID) -> Tuple[Response, int]:
-    """Cancel a job for a given job_id.
+@analytics_blueprint.route('/<uuid:task_id>', methods=['DELETE'])
+def cancel_task(task_id: UUID) -> Tuple[Response, int]:
+    """Cancel a task for a given task_id.
     See doc/api/ for more information.
-    :param job_id: ID returned on job creation.
+    :param task_id: ID returned on task creation.
     :return: Flask Response
     """
-    logger.debug("Received DELETE request on /analytics/job_id.")
-    job_id = str(job_id)
-    if job_id not in session['jobs']:
-        error = "Job ID '{}' not found in session. " \
-                "Refusing access.".format(job_id)
+    logger.debug("Received DELETE request on /analytics/task_id.")
+    task_id = str(task_id)
+    if task_id not in session['analytic_tasks']:
+        error = "Task ID '{}' not found in session. " \
+                "Refusing access.".format(task_id)
         logger.warning(error)
         return jsonify({'error': error}), 403
     wait = request.args.get('wait') == '1'
     # possibly dangerous: http://stackoverflow.com/a/29627549
-    celery.control.revoke(job_id, terminate=True, signal='SIGUSR1', wait=wait)
+    celery.control.revoke(task_id, terminate=True, signal='SIGUSR1', wait=wait)
     logger.debug("Successfully send term signal to task. Sending response.")
     return jsonify(''), 200
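
The "possibly dangerous" comment refers to the trick from the linked Stack Overflow answer: revoking with signal='SIGUSR1' makes the worker raise SoftTimeLimitExceeded inside the running task rather than killing the whole process, so the task can be interrupted at almost any point in its code. Client-side, cancellation is just:

    r = s.delete('http://localhost:5000/analytics/{}?wait=1'.format(task_id))
    assert r.status_code == 200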
-create_job_schema = {
+create_task_schema = {
     "type": "object",
     "properties": {
-        "job_name": {"type": "string", "minLength": 5},
+        "task_name": {"type": "string", "minLength": 5},
         "args": {"type": "object", "minProperties": 1},
     },
-    "required": ["job_name", "args"]
+    "required": ["task_name", "args"]
 }
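
The schema change mirrors the rename. A quick check of what now passes, using the jsonschema package that the validate_schema decorator presumably wraps:

    from jsonschema import validate, ValidationError

    ok = {'task_name': 'compute-correlation', 'args': {'x': '$id$'}}
    validate(ok, create_task_schema)  # passes

    try:
        validate({'job_name': 'compute-correlation', 'args': {'x': 1}},
                 create_task_schema)
    except ValidationError:
        print('old-style payloads are rejected now')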
@@ -13,7 +13,7 @@ from fractalis import redis
 logger = logging.getLogger(__name__)
-class AnalyticsJob(Task, metaclass=abc.ABCMeta):
+class AnalyticTask(Task, metaclass=abc.ABCMeta):
     @property
     @abc.abstractmethod
@@ -21,51 +21,54 @@ class AnalyticsJob(Task, metaclass=abc.ABCMeta):
         pass
     @staticmethod
-    def factory(job_name):
-        from . import JOB_REGISTRY
-        for job in JOB_REGISTRY:
-            if job.name == job_name:
-                return job()
+    def factory(task_name):
+        from . import TASK_REGISTRY
+        for task in TASK_REGISTRY:
+            if task.name == task_name:
+                return task()
     @abc.abstractmethod
     def main(self):
         pass
     @staticmethod
-    def prepare_args(accessible_data_ids, args):
+    def prepare_args(data_tasks, args):
         arguments = {}
         for arg in args:
             value = args[arg]
             if (isinstance(value, str) and
                     value.startswith('$') and value.endswith('$')):
-                data_id = value[1:-1]
-                if data_id not in accessible_data_ids:
-                    error = "No permission to use data_id '{}'" \
-                            "for analysis".format(data_id)
+                data_task_id = value[1:-1]
+                if data_task_id not in data_tasks:
+                    error = "No permission to use data_task_id '{}'" \
+                            "for analysis".format(data_task_id)
                     logger.error(error)
-                    raise KeyError(error)
-                entry = redis.get('data:{}'.format(data_id))
+                    raise PermissionError(error)
+                entry = redis.get('data:{}'.format(data_task_id))
                 if not entry:
                     error = "The key '{}' does not match any entry in Redis. " \
-                            "Value probably expired.".format(data_id)
+                            "Value probably expired.".format(data_task_id)
                     logger.error(error)
                     raise LookupError(error)
                 data_obj = json.loads(entry.decode('utf-8'))
-                # update 'last_access' internal
-                data_obj['last_access'] = time.time()
-                redis.set(name='data:{}'.format(data_id), value=data_obj)
+                if not data_obj['loaded']:
+                    error = "The data task '{}' has not been loaded, yet." \
+                            "Wait for it to complete before using it in an " \
+                            "analysis task.".format(data_task_id)
+                    logger.error(error)
+                    raise ValueError(error)
                 file_path = data_obj['file_path']
                 value = pd.read_csv(file_path)
             arguments[arg] = value
         return arguments
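
To make the prepare_args contract explicit: a value wrapped in '$' must name a data task the session is allowed to use, its Redis entry must exist and be flagged loaded, and only then is the CSV behind it read into a DataFrame. Ids below are invented:

    data_tasks = ['abc123']                       # ids this session may use
    args = {
        'x': '$abc123$',     # resolved to pd.read_csv(<file_path from Redis>)
        'method': 'pearson'  # non-'$' values pass through unchanged
    }
    # arguments = AnalyticTask.prepare_args(data_tasks, args)
    # -> {'x': <pandas.DataFrame>, 'method': 'pearson'}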
-    def run(self, accessible_data_ids, args):
-        arguments = self.prepare_args(accessible_data_ids, args)
+    def run(self, data_tasks, args):
+        arguments = self.prepare_args(data_tasks, args)
         result = self.main(**arguments)
         try:
             if type(result) != dict:
-                error = "The job '{}' returned an object with type '{}', " \
+                error = "The task '{}' returned an object with type '{}', " \
                         "instead of expected type 'dict'."
                 logger.error(error)
                 raise ValueError(error)
......
@@ -2,10 +2,10 @@ import pandas as pd
 import numpy as np
 from scipy import stats
-from fractalis.analytics.job import AnalyticsJob
+from fractalis.analytics.task import AnalyticTask
-class CorrelationJob(AnalyticsJob):
+class CorrelationTask(AnalyticTask):
     name = 'compute-correlation'
......
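
A minimal sketch of what a concrete task looks like under the new naming scheme; the body and column handling are invented for illustration, not the real CorrelationTask code:

    import pandas as pd
    from scipy import stats
    from fractalis.analytics.task import AnalyticTask

    class ToyCorrelationTask(AnalyticTask):

        name = 'toy-correlation'

        def main(self, x: pd.DataFrame, y: pd.DataFrame) -> dict:
            r, p = stats.pearsonr(x.iloc[:, 0], y.iloc[:, 0])
            # run() enforces that main() returns a dict
            return {'coef': r, 'p_value': p}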
@@ -7,7 +7,7 @@ from datetime import timedelta
 # Flask
-SECRET_KEY = str(uuid4()) # set me manually in production
+SECRET_KEY = 'OVERWRITE ME IN PRODUCTION!!!'
 DEBUG = False
 TESTING = False
 REDIS_HOST = '127.0.0.1'
@@ -24,7 +24,9 @@ SESSION_USE_SIGNER = False
 # Celery
 BROKER_URL = 'amqp://'
+CELERY_RESULT_BACKEND = 'redis://{}:{}'.format(REDIS_HOST, REDIS_PORT)
+CELERYD_TASK_SOFT_TIME_LIMIT = 60 * 10
 CELERY_TASK_RESULT_EXPIRES = timedelta(hours=1)
 CELERYD_HIJACK_ROOT_LOGGER = False
# Fractalis
......
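
CELERYD_TASK_SOFT_TIME_LIMIT gives tasks ten minutes before the worker raises SoftTimeLimitExceeded inside them, the same mechanism the SIGUSR1 revoke above piggybacks on. Task code that needs to clean up can catch it; a generic sketch:

    from celery.exceptions import SoftTimeLimitExceeded

    def long_running_main(data):
        try:
            ...  # the expensive part of the analysis
        except SoftTimeLimitExceeded:
            # last chance to release resources before the task is marked failed
            raise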
"""The /data controller. Please refer to doc/api for more information."""
import json
import time
import logging
from uuid import UUID
from typing import Tuple
from flask import Blueprint, session, request, jsonify
@@ -13,6 +11,7 @@ from fractalis.data.etlhandler import ETLHandler
 from fractalis.data.schema import create_data_schema
 from fractalis.validator import validate_json, validate_schema
 from fractalis import celery, redis
+from fractalis.sync import remove_data
 data_blueprint = Blueprint('data_blueprint', __name__)
@@ -23,21 +22,18 @@ logger = logging.getLogger(__name__)
 def prepare_session() -> None:
     """Make sure the session is properly initialized before each request."""
     session.permanent = True
-    if 'jobs' not in session:
-        logger.debug("Initializing jobs field in session dict.")
-        session['jobs'] = []
-    if 'data_ids' not in session:
-        logger.debug("Initializing data_ids field in session dict.")
-        session['data_ids'] = []
+    if 'data_tasks' not in session:
+        logger.debug("Initializing data_tasks field in session dict.")
+        session['data_tasks'] = []
 @data_blueprint.route('', methods=['POST'])
 @validate_json
 @validate_schema(create_data_schema)
-def create_data_job() -> Tuple[Response, int]:
-    """Submit a new ETL task based on the payload of the request body.
+def create_data_task() -> Tuple[Response, int]:
+    """Submit new ETL tasks based on the payload of the request body.
     See doc/api/ for more information.
-    :return: Flask Response
+    :return: Empty response. Everything important is stored in the session.
     """
     logger.debug("Received POST request on /data.")
     wait = request.args.get('wait') == '1'
@@ -45,126 +41,70 @@ def create_data_task() -> Tuple[Response, int]:
     etl_handler = ETLHandler.factory(handler=payload['handler'],
                                      server=payload['server'],
                                      auth=payload['auth'])
-    job_ids = etl_handler.handle(descriptors=payload['descriptors'], wait=wait)
-    session['jobs'] += job_ids
-    session['jobs'] = list(set(session['data_jobs'])) # make unique
-    logger.debug("Jobs successfully submitted. Sending response.")
-    return jsonify({'job_ids': job_ids}), 201
-@data_blueprint.route('/<uuid:job_id>', methods=['GET'])
-def get_data_job_state(job_id: UUID) -> Tuple[Response, int]:
-    """Get information for data that matches given job_id. If the job was
-    successful add the data_id associated with the successful job to the session
-    for access control and return it.
-    :param job_id: The id associated with the previously submitted job.
-    See doc/api/ for more information.
-    :return: Flask Response
-    """
-    logger.debug("Received GET request on /data/job_id.")
-    job_id = str(job_id)
-    wait = request.args.get('wait') == '1'
-    if job_id not in session['jobs']:
-        error = "Job ID '{}' not found in session. " \
-                "Refusing access.".format(job_id)
-        logger.warning(error)
-        return jsonify({'error': error}), 403
-    async_result = celery.AsyncResult(job_id)
-    if wait:
-        async_result.get(propagate=False)
-    if async_result.state == 'SUCCESS':
-        logger.debug("Job '{}' successful. Adding data_id '{}' "
-                     "to session.".format(job_id, async_result.result))
-        session['data_ids'] = async_result.result
-    logger.debug("Job found and has access. Sending response.")
-    return jsonify({'state': async_result.state,
-                    'result': async_result.result}), 200
-@data_blueprint.route('/<string:data_id>', methods=['GET'])
-def get_data_by_id(data_id: str) -> Tuple[Response, int]:
-    """Given a data id return the related Redis DB entry.
-    :param data_id: The id returned by the data job submitted by create_data_job
-    :return: Parsed and modified data entry from Redis.
-    """
-    logger.debug("Received GET request on /data/data_id.")
-    if data_id not in session['data_ids']:
-        error = "Data ID '{}' not found in session. " \
-                "Refusing access.".format(data_id)
-        logger.warning(error)
-        return jsonify({'error': error}), 403
-    value = redis.get('data:{}'.format(data_id))
-    if not value:
-        error = "Could not find data entry in Redis for data_id: " \
-                "'{}'. The entry probably expired.".format(data_id)
-        logger.warning(error)
-        return jsonify({'error': error}), 404
-    data_obj = json.loads(value.decode('utf-8'))
-    # update 'last_access' internal
-    data_obj['last_access'] = time.time()
-    redis.set(name='data:{}'.format(data_id), value=data_obj)
-    # remove internal information from response
-    del data_obj['file_path']
-    del data_obj['last_access']
-    logger.debug("Data found and has access. Sending response.")
-    return jsonify({'data_state': data_obj}), 200
+    task_ids = etl_handler.handle(descriptors=payload['descriptors'], wait=wait)
+    session['data_tasks'] += task_ids
+    session['data_tasks'] = list(set(session['data_tasks']))
+    logger.debug("Tasks successfully submitted. Sending response.")
+    return jsonify(''), 201
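
A hypothetical submission against a local instance. The top-level keys match the ETLHandler.factory call above; the handler name, server URL, auth payload and descriptor contents are handler-specific and assumed here:

    import requests

    s = requests.Session()
    r = s.post('http://localhost:5000/data?wait=1', json={
        'handler': 'test',  # assumption: must match a registered ETLHandler
        'server': 'https://some.server',
        'auth': {'token': '<token>'},
        'descriptors': [
            {'data_type': 'numerical'}  # assumption: handler-specific contents
        ],
    })
    assert r.status_code == 201  # the task ids now live only in the session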
 @data_blueprint.route('', methods=['GET'])
-def get_all_data_state() -> Tuple[Response, int]:
-    """Get information for all data that the current session can access.
+def get_all_data() -> Tuple[Response, int]:
+    """Get information for all tasks that have been submitted in the lifetime
+    of the current session.
     See doc/api/ for more information.
-    :return: Flask Response
+    :return: Information associated with each submitted task
     """
     logger.debug("Received GET request on /data.")
+    wait = request.args.get('wait') == '1'
     data_states = []
-    for data_id in session['data_ids']:
-        value = redis.get('data:{}'.format(data_id))
+    for task_id in session['data_tasks']:
+        async_result = celery.AsyncResult(task_id)
+        if wait:
+            logger.debug("'wait' was set. Waiting for tasks to finish ...")
+            async_result.get(propagate=False)
+        value = redis.get('data:{}'.format(task_id))
         if not value:
-            error = "Could not find data entry in Redis for data_id: " \
-                    "'{}'. The entry probably expired.".format(data_id)
+            error = "Could not find data entry in Redis for task_id: " \
+                    "'{}'. The entry probably expired.".format(task_id)
             logger.warning(error)
             continue
-        data_obj = json.loads(value.decode('utf-8'))
-        # update 'last_access' internal
-        data_obj['last_access'] = time.time()
-        redis.set(name='data:{}'.format(data_id), value=data_obj)
+        data_state = json.loads(value.decode('utf-8'))
         # remove internal information from response
-        del data_obj['file_path']
-        del data_obj['last_access']
-        data_states.append(data_obj)
+        del data_state['file_path']
+        del data_state['last_access']
+        # add additional information to response
+        data_state['etl_state'] = async_result.state
+        data_state['etl_message'] = async_result.result
+        data_states.append(data_state)
     logger.debug("Data states collected. Sending response.")
     return jsonify({'data_states': data_states}), 200
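
With the per-id GET endpoints gone, GET /data is now the single status endpoint. A sketch of the response shape, assuming the Redis entry keeps the fields the removed create_redis_entry() used to write plus the new 'loaded' flag; values are invented:

    example = {
        'data_states': [{
            'label': 'some label',      # from the Redis entry
            'descriptor': {},           # whatever was submitted
            'data_type': 'numerical',
            'loaded': True,             # flag set by ETL.load(), see below
            'etl_state': 'SUCCESS',     # celery state of the ETL task
            'etl_message': None,        # celery result resp. error message
        }]
    }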
-@data_blueprint.route('/<string:data_id>', methods=['DELETE'])
-def delete_data(data_id: str) -> Tuple[Response, int]:
-    """This only deletes data from the session, not Redis or the file system.
-    This is enough to disable data visibility for the current user, but does not
-    influence other users of the same data. Fractalis automatically removes
-    entries that are no longer accessed after a certain period of time.
-    :param data_id: The id returned by the data job submitted by create_data_job
+@data_blueprint.route('/<string:task_id>', methods=['DELETE'])
+def delete_data(task_id: str) -> Tuple[Response, int]:
+    """Remove all traces of the data associated with the given task id.
+    :param task_id: The id associated with the data
     See doc/api/ for more information.
-    :return: Flask Response
+    :return: Empty response.
     """
-    logger.debug("Received DELETE request on /data/data_id.")
-    if data_id in session['data_ids']:
-        session['data_ids'].remove(data_id)
+    logger.debug("Received DELETE request on /data/task_id.")
+    if task_id in session['data_tasks']:
+        session['data_tasks'].remove(task_id)
+        remove_data.delay(task_id)
     logger.debug("Successfully removed data from session. Sending response.")
     return jsonify(''), 200
 @data_blueprint.route('', methods=['DELETE'])
 def delete_all_data() -> Tuple[Response, int]:
-    """This only deletes data from the session, not Redis or the file system.
-    This is enough to disable data visibility for the current user, but does not
-    influence other users of the same data. Fractalis automatically removes
-    entries that are no longer accessed after a certain period of time.
-    See doc/api/ for more information.
-    :return: Flask Response
+    """Remove all traces of all data associated with this session.
+    :return: Empty response.
     """
     logger.debug("Received DELETE request on /data.")
-    session['data_ids'] = []
+    for task_id in session['data_tasks']:
+        remove_data.delay(task_id)
+    session['data_tasks'] = []
     logger.debug("Successfully removed all data from session. "
                  "Sending response.")
     return jsonify(''), 200
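
Both DELETE routes now delegate the actual cleanup to fractalis.sync.remove_data, a celery task imported above but not included in this diff. Presumably it complements the session cleanup roughly like this sketch, assuming it mirrors the entry layout used elsewhere in this commit:

    import json
    import os
    from fractalis import celery, redis

    @celery.task
    def remove_data(task_id: str) -> None:
        key = 'data:{}'.format(task_id)
        value = redis.get(key)
        if value:
            data_state = json.loads(value.decode('utf-8'))
            os.remove(data_state['file_path'])  # drop the CSV from disk
            redis.delete(key)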
@@ -3,10 +3,8 @@
 import os
 import abc
 import json
-import time
 import logging
 from typing import List
-from hashlib import sha256
 from celery import Task
 from pandas import DataFrame
@@ -102,65 +100,30 @@ class ETL(Task, metaclass=abc.ABCMeta):
     """
     pass
-    @staticmethod
-    def load(data_frame: DataFrame, file_path: str) -> None:
+    def load(self, data_frame: DataFrame, file_path: str) -> None:
         """Load (save) the data to the file system.
         :param data_frame: DataFrame to write.
         :param file_path: File to write to.
         """
         data_frame.to_csv(file_path, index=False)
-    @staticmethod
-    def compute_data_id(server: str, descriptor: dict) -> str:
-        """Return a hash key based on the given parameters.
-        Parameters are automatically sorted before the hash is computed.
-        :param server: The server which is being handled.
-        :param descriptor: A dict describing the data.
-        :return: The computed hash key.
-        """
-        descriptor_str = json.dumps(descriptor, sort_keys=True)
-        to_hash = '{}|{}'.format(server, descriptor_str).encode('utf-8')
-        hash_key = sha256(to_hash).hexdigest()
-        return hash_key
-    @classmethod
-    def create_redis_entry(cls, data_id: str,
-                           file_path: str, descriptor: dict) -> None:
-        """Creates an entry in Redis that reflects all meta data surrounding the
-        downloaded data. E.g. last access, data type, file system location, ...
-        :param data_id: Id associated with the loaded data.
-        :param file_path: Location of the data on the file system
-        :param descriptor: Describes the data and is used to download them
-        """
-        try:
-            label = descriptor['label']
-        except KeyError:
-            label = str(descriptor)
-        data_obj = {
-            'file_path': file_path,
-            'last_access': time.time(),
-            'label': label,
-            'descriptor': descriptor,
-            'data_type': cls.produces
-        }
-        redis.set(name='data:{}'.format(data_id),
-                  value=json.dumps(data_obj))
+        value = redis.get(name='data:{}'.format(self.request.id))
+        data_state = json.loads(value.decode('utf-8'))
+        data_state['loaded'] = True
+        redis.set(name='data:{}'.format(self.request.id), value=data_state)
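
Two reviewer observations on the new load(). First, redis.set() here receives data_state, a plain dict, where the removed code serialized with json.dumps(); depending on the redis-py version this either raises or stores the dict's repr, which the json.loads() calls elsewhere cannot parse, so it may be implicated in the failing pipeline. Second, load() now assumes a 'data:<task id>' entry already exists and merely flips its 'loaded' flag; the writer side (presumably the ETLHandler, replacing the removed create_redis_entry()) is not in this diff. A sketch of the assumed entry shape:

    import json
    import time

    def create_data_entry(redis, task_id, file_path, descriptor):
        # hypothetical writer side; field names mirror the removed
        # create_redis_entry() plus the new 'loaded' flag
        redis.set(name='data:{}'.format(task_id), value=json.dumps({
            'file_path': file_path,
            'last_access': time.time(),
            'label': descriptor.get('label', str(descriptor)),
            'descriptor': descriptor,
            'loaded': False,  # ETL.load() sets this to True once the CSV exists
        }))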
     def run(self, server: str, token: str,
-            descriptor: dict, tmp_dir: str) -> str:
-        """Run the current task, that is running extract, transform and load.
-        This is called by the celery worker.
-        :param
+            descriptor: dict, file_path: str) -> None:
+        """Run extract, transform and load. This is called by the celery worker.
         :param server: The server on which the data are located.
         :param token: The token used for authentication.
         :param descriptor: Contains all necessary information to download data
-        :param tmp_dir: The directory where the files are stored
-        :return: The data id. Used to access the associated redis entry later on
+        :param file_path: The location where the data will be stored
         """
         logger.info("Starting ETL process ...")
-        data_id = self.compute_data_id(server, descriptor)
-        data_dir = os.path.join(tmp_dir, 'data')
-        os.makedirs(data_dir, exist_ok=True)
-        file_path = os.path.join(data_dir, data_id)
+        os.makedirs(os.path.dirname(file_path), exist_ok=True)
         logger.info("(E)xtracting data from server '{}'.".format(server))
         raw_data = self.extract(server, token, descriptor)
         logger.info("(T)ransforming data to Fractalis format.")
@@ -171,5 +134,3 @@ class ETL(Task, metaclass=abc.ABCMeta):
             logging.error(error, exc_info=1)
             raise TypeError(error)
         self.load(data_frame, file_path)
-        self.create_redis_entry(data_id, file_path, descriptor)
-        return data_id
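
A minimal sketch of a concrete ETL against the new run() signature. The extract/transform hooks and the 'produces' attribute are visible in this diff; every other attribute name and the fake data source are assumptions:

    import pandas as pd
    from fractalis.data.etl import ETL

    class ToyETL(ETL):

        name = 'toy-etl'            # assumption
        produces = 'numerical'

        def extract(self, server: str, token: str, descriptor: dict):
            # pretend the server handed us raw records
            return [{'value': 1.0}, {'value': 2.0}]

        def transform(self, raw_data) -> pd.DataFrame:
            return pd.DataFrame(raw_data)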
"""This module provides the ETLHandler class."""