Commit 61cd4347 authored by Sascha Herzinger

Rewrite of data controller and removal of the shadow key concept

parent a6bc5490
Pipeline #2087 failed in 2 minutes and 37 seconds
@@ -21,9 +21,9 @@ logger = logging.getLogger(__name__)
def prepare_session() -> None:
"""Make sure the session is properly initialized before each request."""
session.permanent = True
if 'analytics_jobs' not in session:
logger.debug("Initializing analytics_jobs field in session dict.")
session['analytics_jobs'] = []
if 'jobs' not in session:
logger.debug("Initializing jobs field in session dict.")
session['jobs'] = []
if 'data_ids' not in session:
logger.debug("Initializing data_ids field in session dict.")
session['data_ids'] = []
@@ -41,14 +41,13 @@ def create_job() -> Tuple[Response, int]:
json = request.get_json(force=True) # pattern enforced by decorators
analytics_job = AnalyticsJob.factory(json['job_name'])
if analytics_job is None:
logger.error("Could not create job for unknown job: "
logger.error("Could not submit job for unknown job name: "
"'{}'".format(json['job_name']))
return jsonify({'error_msg': "Job with name '{}' not found.".format(
json['job_name'])}), 400
async_result = analytics_job.delay(session_id=session.sid,
session_data_ids=session['data_ids'],
async_result = analytics_job.delay(accessible_data_ids=session['data_ids'],
args=json['args'])
session['analytics_jobs'].append(async_result.id)
session['jobs'].append(async_result.id)
logger.debug("Job successfully submitted. Sending response.")
return jsonify({'job_id': async_result.id}), 201
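For illustration, submitting a job through this endpoint could look as follows. This is a sketch: the base URL, the job name 'correlation', and the '$...$' argument value are invented placeholders (the '$...$' convention is resolved by prepare_args further below).

    import requests

    # access control is cookie-based, so keep one session for the whole flow
    base = 'http://localhost:5000/analytics'  # placeholder URL
    s = requests.Session()
    r = s.post(base, json={
        'job_name': 'correlation',   # invented job name
        'args': {'x': '$abc123$'}    # '$<data_id>$' references loaded data
    })
    assert r.status_code == 201
    job_id = r.json()['job_id']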
@@ -63,20 +62,17 @@ def get_job_details(job_id: UUID) -> Tuple[Response, int]:
logger.debug("Received GET request on /analytics/job_id.")
wait = request.args.get('wait') == '1'
job_id = str(job_id)
if job_id not in session['analytics_jobs']: # access control
logger.error("Job ID '{}' not found in session. "
"Refusing access.".format(job_id))
return jsonify({'error_msg': "No matching job found."}), 404
if job_id not in session['jobs']:
error = "Job ID '{}' not found in session. " \
"Refusing access.".format(job_id)
logger.warning(error)
return jsonify({'error': error}), 403
async_result = celery.AsyncResult(job_id)
if wait:
async_result.get(propagate=False) # make job synchronous
state = async_result.state
result = async_result.result
if isinstance(result, Exception): # Exception -> str
result = "{}: {}".format(type(result).__name__, str(result))
logger.debug("Job found and has access. Sending response.")
return jsonify({'state': state,
'result': result}), 200
return jsonify({'state': async_result.state,
'result': async_result.result}), 200
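Continuing the sketch above with the same requests.Session and job_id (a fresh session would be refused with 403, because the job id is stored only in the submitting session):

    # poll once; '?wait=1' blocks until the celery task finishes
    r = s.get('{}/{}'.format(base, job_id), params={'wait': '1'})
    print(r.json())  # {'state': 'SUCCESS', 'result': '<JSON string>'} on success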
@analytics_blueprint.route('/<uuid:job_id>', methods=['DELETE'])
@@ -88,13 +84,13 @@ def cancel_job(job_id: UUID) -> Tuple[Response, int]:
"""
logger.debug("Received DELETE request on /analytics/job_id.")
job_id = str(job_id)
if job_id not in session['analytics_jobs']: # Access control
logger.error("Job ID '{}' not found in session. "
"Refusing access.".format(job_id))
return jsonify({'error_msg': "No matching job found."}), 404
if job_id not in session['jobs']:
error = "Job ID '{}' not found in session. " \
"Refusing access.".format(job_id)
logger.warning(error)
return jsonify({'error': error}), 403
wait = request.args.get('wait') == '1'
# possibly dangerous: http://stackoverflow.com/a/29627549
celery.control.revoke(job_id, terminate=True, signal='SIGUSR1', wait=wait)
session['analytics_jobs'].remove(job_id)
logger.debug("Successfully send term signal to task. Sending response.")
return jsonify({'job_id': job_id}), 200
return jsonify(''), 200
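And, still within the same sketch, a job is cancelled with DELETE on the same resource:

    r = s.delete('{}/{}'.format(base, job_id), params={'wait': '1'})
    assert r.status_code == 200  # 403 if job_id does not belong to this session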
import abc
import json
import re
import logging
import time
import pandas as pd
from celery import Task
@@ -8,6 +10,9 @@ from celery import Task
from fractalis import redis
logger = logging.getLogger(__name__)
class AnalyticsJob(Task, metaclass=abc.ABCMeta):
@property
@@ -27,38 +32,46 @@ class AnalyticsJob(Task, metaclass=abc.ABCMeta):
pass
@staticmethod
def prepare_args(session_id, session_data_ids, args):
def prepare_args(accessible_data_ids, args):
arguments = {}
for arg in args:
value = args[arg]
if (isinstance(value, str) and
value.startswith('$') and value.endswith('$')):
data_id = value[1:-1]
value = redis.get('data:{}'.format(data_id))
if value is None:
raise KeyError("The key '{}' does not match any entries"
" in the database.".format(data_id))
data_obj = json.loads(value.decode('utf-8'))
if session_id not in data_obj['access'] \
or data_id not in session_data_ids: # access check
raise KeyError("No permission to use data_id '{}'"
"for analysis".format(data_id))
if data_id not in accessible_data_ids:
error = "No permission to use data_id '{}'" \
"for analysis".format(data_id)
logger.error(error)
raise KeyError(error)
entry = redis.get('data:{}'.format(data_id))
if not entry:
error = "The key '{}' does not match any entry in Redis. " \
"Value probably expired.".format(data_id)
logger.error(error)
raise LookupError(error)
data_obj = json.loads(entry.decode('utf-8'))
# update the internal 'last_access' timestamp
data_obj['last_access'] = time.time()
redis.set(name='data:{}'.format(data_id), value=json.dumps(data_obj))
file_path = data_obj['file_path']
value = pd.read_csv(file_path)
arguments[arg] = value
return arguments
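In short: any string argument wrapped in '$' is treated as a data_id and materialized as a pandas DataFrame, while every other value passes through untouched. A hypothetical call (ids invented; Redis is assumed to hold a matching 'data:abc123' entry pointing at a CSV file):

    args = {
        'x': '$abc123$',     # resolved: Redis lookup -> file_path -> pd.read_csv
        'method': 'pearson'  # plain value: passed through unchanged
    }
    arguments = AnalyticsJob.prepare_args(
        accessible_data_ids=['abc123'],  # typically session['data_ids']
        args=args)
    # arguments == {'x': <DataFrame>, 'method': 'pearson'}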
def run(self, session_id, session_data_ids, args):
arguments = self.prepare_args(session_id, session_data_ids, args)
def run(self, accessible_data_ids, args):
arguments = self.prepare_args(accessible_data_ids, args)
result = self.main(**arguments)
try:
if type(result) != dict:
raise ValueError("The job '{}' "
"returned an object with type '{}', "
"instead of expected type 'dict'.")
error = "The job '{}' returned an object with type '{}', " \
"instead of expected type 'dict'."
logger.error(error)
raise ValueError(error)
result = json.dumps(result)
except Exception:
raise TypeError("The job '{}' result could not be JSON serialized."
.format(self.name))
except TypeError as e:
logger.exception(e)
raise
result = re.sub(r': NaN', ': null', result)
return result
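As a sketch of how a concrete job would plug into this base class, assuming the abstract members are a name property and a main() method as the base class suggests (class name and module path here are invented):

    import pandas as pd
    from fractalis.analytics.job import AnalyticsJob  # module path assumed

    class MeanJob(AnalyticsJob):
        """Toy job: column means of a single DataFrame argument."""

        @property
        def name(self):
            return 'mean_job'  # the job_name clients pass to POST /analytics

        def main(self, x: pd.DataFrame) -> dict:
            # run() insists on a plain dict so the result can be JSON serialized
            return {'means': x.mean().to_dict()}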
"""The /data controller. Please refer to doc/api for more information."""
import json
import time
import logging
from uuid import UUID
from typing import Tuple
from flask import Blueprint, session, request, jsonify
@@ -11,66 +13,28 @@ from fractalis.data.etlhandler import ETLHandler
from fractalis.data.schema import create_data_schema
from fractalis.validator import validate_json, validate_schema
from fractalis import celery, redis
from fractalis.sync import remove_file
data_blueprint = Blueprint('data_blueprint', __name__)
logger = logging.getLogger(__name__)
def get_data_by_id(data_id: str, wait: bool) -> dict:
""" Given a data id return the related Redis DB entry.
:param data_id: The id computed based on the payload that was sent to /data
:param wait: Waits for celery to complete. Useful for testing or short jobs.
:return:
"""
value = redis.get('data:{}'.format(data_id))
if value is None:
error = "Could not find data entry in Redis for data_id: " \
"'{}'".format(data_id)
logger.error(error, exc_info=1)
raise LookupError(error)
data_obj = json.loads(value.decode('utf-8'))
job_id = data_obj['job_id']
async_result = celery.AsyncResult(job_id)
if wait:
async_result.get(propagate=False) # wait for results
state = async_result.state
result = async_result.result
if isinstance(result, Exception): # Exception -> str
result = "{}: {}".format(type(result).__name__, str(result))
data_obj['state'] = state
data_obj['message'] = result
data_obj['data_id'] = data_id
# remove internal information from response
del data_obj['file_path']
del data_obj['access']
return data_obj
@data_blueprint.before_request
def prepare_session() -> None:
"""Make sure the session is properly initialized before each request."""
session.permanent = True
if 'jobs' not in session:
logger.debug("Initializing jobs field in session dict.")
session['jobs'] = []
if 'data_ids' not in session:
logger.debug("Initializing data_ids field in session dict.")
session['data_ids'] = []
@data_blueprint.before_request
def cleanup_session() -> None:
"""Remove data_ids from session that have expired."""
for data_id in session['data_ids']:
logger.debug("Testing if data id '{}' has expired.".format(data_id))
if not redis.exists('shadow:data:{}'.format(data_id)):
logger.debug("Could not find shadow entry with id: '{}' in Redis. "
"Removing id from session.".format(data_id))
session['data_ids'].remove(data_id)
@data_blueprint.route('', methods=['POST'])
@validate_json
@validate_schema(create_data_schema)
def create_data() -> Tuple[Response, int]:
def create_data_job() -> Tuple[Response, int]:
"""Submit a new ETL task based on the payload of the request body.
See doc/api/ for more information.
:return: Flask Response
@@ -81,102 +45,126 @@ def create_data() -> Tuple[Response, int]:
etl_handler = ETLHandler.factory(handler=payload['handler'],
server=payload['server'],
auth=payload['auth'])
data_ids = etl_handler.handle(descriptors=payload['descriptors'],
session_id=session.sid,
wait=wait)
session['data_ids'] += data_ids
session['data_ids'] = list(set(session['data_ids'])) # make unique
logger.debug("Job successfully submitted. Sending response.")
return jsonify({'data_ids': data_ids}), 201
job_ids = etl_handler.handle(descriptors=payload['descriptors'], wait=wait)
session['jobs'] += job_ids
session['jobs'] = list(set(session['jobs'])) # make unique
logger.debug("Jobs successfully submitted. Sending response.")
return jsonify({'job_ids': job_ids}), 201
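A submission against this endpoint might look as follows; every field value here is an invented placeholder, and the authoritative shape is create_data_schema:

    import requests

    payload = {
        'handler': 'transmart',                # e.g. transmart, ada
        'server': 'https://example.com',       # placeholder
        'auth': {'token': 'abc'},              # shape assumed
        'descriptors': [{'data_type': 'ldd'}]  # shape assumed
    }
    r = requests.post('http://localhost:5000/data', json=payload)
    job_ids = r.json()['job_ids']  # HTTP 201; presumably one id per descriptor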
@data_blueprint.route('/<uuid:job_id>', methods=['GET'])
def get_data_job_state(job_id: UUID) -> Tuple[Response, int]:
"""Get information for data that matches given job_id. If the job was
successful add the data_id associated with the successful job to the session
for access control and return it.
:param job_id: The id associated with the previously submitted job.
See doc/api/ for more information.
:return: Flask Response
"""
logger.debug("Received GET request on /data/job_id.")
job_id = str(job_id)
wait = request.args.get('wait') == '1'
if job_id not in session['jobs']:
error = "Job ID '{}' not found in session. " \
"Refusing access.".format(job_id)
logger.warning(error)
return jsonify({'error': error}), 403
async_result = celery.AsyncResult(job_id)
if wait:
async_result.get(propagate=False)
if async_result.state == 'SUCCESS':
logger.debug("Job '{}' successful. Adding data_id '{}' "
"to session.".format(job_id, async_result.result))
session['data_ids'].append(async_result.result)
logger.debug("Job found and access granted. Sending response.")
return jsonify({'state': async_result.state,
'result': async_result.result}), 200
@data_blueprint.route('/<string:data_id>', methods=['GET'])
def get_data_by_id(data_id: str) -> Tuple[Response, int]:
"""Given a data id return the related Redis DB entry.
:param data_id: The id returned by the data job submitted by create_data_job
:return: Parsed and modified data entry from Redis.
"""
logger.debug("Received GET request on /data/data_id.")
if data_id not in session['data_ids']:
error = "Data ID '{}' not found in session. " \
"Refusing access.".format(data_id)
logger.warning(error)
return jsonify({'error': error}), 403
value = redis.get('data:{}'.format(data_id))
if not value:
error = "Could not find data entry in Redis for data_id: " \
"'{}'. The entry probably expired.".format(data_id)
logger.warning(error)
return jsonify({'error': error}), 404
data_obj = json.loads(value.decode('utf-8'))
# update the internal 'last_access' timestamp
data_obj['last_access'] = time.time()
redis.set(name='data:{}'.format(data_id), value=json.dumps(data_obj))
# remove internal information from response
del data_obj['file_path']
del data_obj['last_access']
logger.debug("Data found and has access. Sending response.")
return jsonify({'data_state': data_obj}), 200
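Given create_redis_entry further below, the cleaned object a client receives keeps 'label', 'descriptor', and 'data_type'. A fetch sketch (placeholder id; the cookie must belong to a session that owns the data_id):

    import requests

    s = requests.Session()  # reuse the session that loaded the data
    r = s.get('http://localhost:5000/data/abc123')  # invented data_id
    if r.status_code == 200:
        # e.g. {'label': 'age', 'descriptor': {...}, 'data_type': 'ldd'}
        print(r.json()['data_state'])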
@data_blueprint.route('', methods=['GET'])
def get_all_data_state() -> Tuple[Response, int]:
"""Get information for all data within the current session.
"""Get information for all data that the current session can access.
See doc/api/ for more information.
:return: Flask Response
"""
logger.debug("Received GET request on /data.")
wait = request.args.get('wait') == '1'
data_states = [get_data_by_id(data_id, wait)
for data_id in session['data_ids']]
logger.debug("Job successfully submitted. Sending response.")
data_states = []
for data_id in session['data_ids']:
value = redis.get('data:{}'.format(data_id))
if not value:
error = "Could not find data entry in Redis for data_id: " \
"'{}'. The entry probably expired.".format(data_id)
logger.warning(error)
continue
data_obj = json.loads(value.decode('utf-8'))
# update the internal 'last_access' timestamp
data_obj['last_access'] = time.time()
redis.set(name='data:{}'.format(data_id), value=json.dumps(data_obj))
# remove internal information from response
del data_obj['file_path']
del data_obj['last_access']
data_states.append(data_obj)
logger.debug("Data states collected. Sending response.")
return jsonify({'data_states': data_states}), 200
@data_blueprint.route('/<string:params>', methods=['GET'])
def get_data_state(params) -> Tuple[Response, int]:
"""Get information for data that matches given arguments.
:param params: Can be data ID or data descriptor
@data_blueprint.route('/<string:data_id>', methods=['DELETE'])
def delete_data(data_id: str) -> Tuple[Response, int]:
"""This only deletes data from the session, not Redis or the file system.
This is enough to disable data visibility for the current user, but does not
influence other users of the same data. Fractalis automatically removes
entries that are no longer accessed after a certain period of time.
:param data_id: The id returned by the data job submitted by create_data_job
See doc/api/ for more information.
:return: Flask Response
"""
logger.debug("Received GET request on /data/params.")
wait = request.args.get('wait') == '1'
# params can be data_id or dict
try:
params = json.loads(params)
data_id = ETLHandler.compute_data_id(server=params['server'],
descriptor=params['descriptor'])
except ValueError:
logger.debug("Couldn't parse params: '{}'. "
"Assuming it is a data id.".format(params))
data_id = params
if data_id not in session['data_ids']: # access control
logger.error("Data ID '{}' not found in session. "
"Refusing access.".format(data_id))
return jsonify(
{'error_msg': "No matching data found. Maybe expired?"}), 404
data_obj = get_data_by_id(data_id, wait)
logger.debug("Successfully retrieved data state. Sending response.")
return jsonify({'data_state': data_obj}), 200
@data_blueprint.route('/<string:params>', methods=['DELETE'])
def delete_data(params) -> Tuple[Response, int]:
"""Delete data from Redis, session, and FS for given params.
:param params: Can be data ID or data descriptor
See doc/api/ for more information.
:return: Flask Response
"""
logger.debug("Received DELETE request on /data/params.")
wait = request.args.get('wait') == '1'
# params can be data_id or dict
try:
params = json.loads(params)
data_id = ETLHandler.compute_data_id(server=params['server'],
descriptor=params['descriptor'])
except ValueError:
logger.debug("Couldn't parse params: '{}'. "
"Assuming it is a data id.".format(params))
data_id = params
if data_id not in session['data_ids']: # access control
logger.error("Data ID '{}' not found in session. "
"Refusing access.".format(data_id))
return jsonify(
{'error_msg': "No matching data found. Maybe expired?"}), 404
value = redis.get('data:{}'.format(data_id))
data_obj = json.loads(value.decode('utf-8'))
file_path = data_obj['file_path']
async_result = remove_file.delay(file_path)
if wait:
async_result.get(propagate=False)
redis.delete('data:{}'.format(data_id))
redis.delete('shadow:data:{}'.format(data_id))
session['data_ids'].remove(data_id)
logger.debug("Successfully deleted data. Sending response.")
logger.debug("Received DELETE request on /data/data_id.")
if data_id in session['data_ids']:
session['data_ids'].remove(data_id)
logger.debug("Successfully removed data from session. Sending response.")
return jsonify(''), 200
@data_blueprint.route('', methods=['DELETE'])
def delete_all_data() -> Tuple[Response, int]:
"""Call delete_data() for every data id in the current session.
"""This only deletes data from the session, not Redis or the file system.
This is enough to disable data visibility for the current user, but does not
influence other users of the same data. Fractalis automatically removes
entries that are no longer accessed after a certain period of time.
See doc/api/ for more information.
:return: Flask Response
"""
logger.debug("Received DELETE request on /data.")
for data_id in session['data_ids']:
logging.debug("Using delete_data() for data id '{}'".format(data_id))
delete_data(data_id)
logger.debug("Successfully deleted all data. Sending response.")
session['data_ids'] = []
logger.debug("Successfully removed all data from session. "
"Sending response.")
return jsonify(''), 200
"""This module provides the ETL class"""
import os
import abc
import json
import time
import logging
from typing import List
from hashlib import sha256
from celery import Task
from pandas import DataFrame
@@ -58,7 +61,6 @@ class ETL(Task, metaclass=abc.ABCMeta):
def can_handle(cls, handler: str, data_type: str) -> bool:
"""Check if the current implementation of ETL can handle given handler
and data type.
:param handler: Describes the handler. E.g.: transmart, ada
:param data_type: Describes the data type. E.g.: ldd, hdd
:return: True if implementation can handle given parameters.
@@ -69,7 +71,6 @@ class ETL(Task, metaclass=abc.ABCMeta):
def factory(cls, handler: str, data_type: str) -> 'ETL':
"""Return an instance of the implementation ETL that can handle the
given parameters.
:param handler: Describes the handler. E.g.: transmart, ada
:param data_type: Describes the data type. E.g.: ldd, hdd
:return: An instance of an implementation of ETL that returns True for
@@ -81,12 +82,11 @@ class ETL(Task, metaclass=abc.ABCMeta):
return etl()
raise NotImplementedError(
"No ETL implementation found for handler '{}' and data type '{}'"
.format(handler, data_type))
.format(handler, data_type))
@abc.abstractmethod
def extract(self, server: str, token: str, descriptor: dict) -> object:
"""Extract the data via HTTP requests.
:param server: The server from which to extract from.
:param token: The token used for authentication.
:param descriptor: The descriptor containing all necessary information
@@ -98,7 +98,6 @@ class ETL(Task, metaclass=abc.ABCMeta):
def transform(self, raw_data: object) -> DataFrame:
"""Transform the data into a pandas.DataFrame with a naming according to
the Fractalis standard format.
:param raw_data: The data to transform.
"""
pass
@@ -106,41 +105,62 @@ class ETL(Task, metaclass=abc.ABCMeta):
@staticmethod
def load(data_frame: DataFrame, file_path: str) -> None:
"""Load (save) the data to the file system.
:param data_frame: DataFrame to write.
:param file_path: Path to write to.
:param file_path: File to write to.
"""
data_frame.to_csv(file_path, index=False)
@staticmethod
def grant_access(data_id, session_id):
"""Makes an entry in redis that reflects the permission of the session
to access/read the data. This entry is removed when the data expire via
sync.py. Individual session ids are never removed as long as the data
object lives.
:param data_id: The id of the data
:param session_id: The id of the session
def compute_data_id(server: str, descriptor: dict) -> str:
"""Return a hash key based on the given parameters.
The descriptor is serialized with sorted keys before hashing, so key order
does not affect the resulting id.
:param server: The server which is being handled.
:param descriptor: A dict describing the data.
:return: The computed hash key.
"""
descriptor_str = json.dumps(descriptor, sort_keys=True)
to_hash = '{}|{}'.format(server, descriptor_str).encode('utf-8')
hash_key = sha256(to_hash).hexdigest()
return hash_key
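The scheme is reproducible outside Fractalis; a minimal standalone illustration of the same logic (server and descriptor values are invented):

    import json
    from hashlib import sha256

    server = 'https://example.com'
    descriptor = {'field': 'age', 'data_type': 'ldd'}  # key order is irrelevant
    descriptor_str = json.dumps(descriptor, sort_keys=True)
    to_hash = '{}|{}'.format(server, descriptor_str).encode('utf-8')
    print(sha256(to_hash).hexdigest())  # same inputs always yield the same data_id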
@classmethod
def create_redis_entry(cls, data_id: str,
file_path: str, descriptor: dict) -> None:
"""Creates an entry in Redis that reflects all meta data surrounding the
downloaded data. E.g. last access, data type, file system location, ...
:param data_id: Id associated with the loaded data.
:param file_path: Location of the data on the file system
:param descriptor: Describes the data and is used to download them
"""
key = 'data:{}'.format(data_id)
value = redis.get(key)
data_obj = json.loads(value.decode('utf-8'))
data_obj['access'].append(session_id)
data_obj['access'] = list(set(data_obj['access'])) # make ids unique
redis.set(name=key, value=json.dumps(data_obj))
def run(self, server: str, token: str, descriptor: dict,
file_path: str, session_id: str, data_id: str) -> None:
"""Run the current task.
try:
label = descriptor['label']
except KeyError:
label = str(descriptor)
data_obj = {
'file_path': file_path,
'last_access': time.time(),
'label': label,
'descriptor': descriptor,
'data_type': cls.produces
}
redis.set(name='data:{}'.format(data_id),
value=json.dumps(data_obj))
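The stored value is a flat JSON document; an illustrative entry (all values invented):

    # key: 'data:<sha256 hex digest>'
    example_entry = {
        'file_path': '/tmp/fractalis/data/abc123',  # CSV written by load()
        'last_access': 1490000000.0,                # unix timestamp, refreshed on read
        'label': 'age',                             # descriptor['label'] if present
        'descriptor': {'field': 'age', 'data_type': 'ldd'},
        'data_type': 'ldd'                          # cls.produces
    }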
def run(self, server: str, token: str,
descriptor: dict, tmp_dir: str) -> str:
"""Run the current task, that is running extract, transform and load.
This is called by the celery worker.
:param server: The server on which the data are located.
:param token: The token used for authentication.
:param descriptor: Contains all necessary information to download data.
:param file_path: The path to where the file is written.
:param session_id: The id of the session that requested this job
:param data_id: The id of the data object that is related to this ETL
:param descriptor: Contains all necessary information to download data
:param tmp_dir: The directory where the files are stored
:return: The data id. Used to access the associated Redis entry later on.
"""
logger.info("Starting ETL process ...")
data_id = self.compute_data_id(server, descriptor)
data_dir = os.path.join(tmp_dir, 'data')
os.makedirs(data_dir, exist_ok=True)
file_path = os.path.join(data_dir, data_id)
logger.info("(E)xtracting data from server '{}'.".format(server))
raw_data = self.extract(server, token, descriptor)
logger.info("(T)ransforming data to Fractalis format.")
@@ -151,6 +171,5 @@ class ETL(Task, metaclass=abc.ABCMeta):
logger.error(error, exc_info=1)
raise TypeError(error)
self.load(data_frame, file_path)
# at this point we know that the session has permission to read the data
# otherwise authentication with target API would have failed
self.grant_access(data_id, session_id)
self.create_redis_entry(data_id, file_path, descriptor)
return data_id
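A concrete ETL then only fills in the abstract steps. A toy sketch, under the assumption that subclasses declare a celery task name and the 'produces' attribute read by create_redis_entry (class name, handler, and endpoint are invented):

    import requests
    import pandas as pd
    from fractalis.data.etl import ETL  # module path assumed

    class ToyETL(ETL):
        """Toy ETL: fetches a JSON list of records and stores it as CSV."""

        name = 'toy_etl'  # celery task name (assumed required)
        produces = 'ldd'  # written as 'data_type' into the Redis entry

        @classmethod
        def can_handle(cls, handler: str, data_type: str) -> bool:
            return handler == 'toy' and data_type == 'ldd'

        def extract(self, server: str, token: str, descriptor: dict) -> object:
            r = requests.get('{}/records'.format(server),  # endpoint invented
                             headers={'Authorization': token})
            return r.json()

        def transform(self, raw_data: object) -> pd.DataFrame:
            # real implementations rename columns to the Fractalis standard here
            return pd.DataFrame(raw_data)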
"""This module provides the ETLHandler class."""
import os
import abc
import json
from uuid import uuid4
from hashlib import sha256
from typing import List
from celery import uuid
from fractalis import app, redis
from fractalis import app
from fractalis.data.etl import ETL
@@ -31,7 +25,6 @@ class ETLHandler(metaclass=abc.ABCMeta):