Commit 6524d30c authored by Sascha Herzinger

Reverted cache duplication feature, because it didn't behave as expected

parent 6f992244
Pipeline #3816 failed with stage in 2 minutes and 49 seconds
@@ -56,7 +56,7 @@ CORS(app, supports_credentials=True)
 # create celery app
 log.info("Creating celery app.")
-from fractalis.celeryapp import make_celery, register_tasks  # noqa
+from fractalis.celeryapp import make_celery, register_tasks  # noqa: E402
 celery = make_celery(app)
 # register blueprints
......
@@ -87,7 +87,8 @@ class AnalyticTask(Task, metaclass=abc.ABCMeta):
             logger.error(error)
             raise LookupError(error)
         data_state = json.loads(entry)
-        if not data_state['loaded']:
+        async_result = self.AsyncResult(data_task_id)
+        if async_result.state != 'SUCCESS':
             error = "The data task '{}' has not been loaded, yet. " \
                     "Wait for it to complete before using it in an " \
                     "analysis task.".format(data_task_id)
......
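With the 'loaded' flag gone from the Redis entry, the task now asks Celery itself whether the referenced ETL has finished. A minimal sketch of the restored pattern, assuming a bound Celery task class (class and method names here are illustrative, not from the diff):

    from celery import Task

    class ExampleAnalyticTask(Task):
        """Illustrative task that depends on a previously submitted ETL."""

        def assert_data_loaded(self, data_task_id: str) -> None:
            # Task.AsyncResult() queries the configured result backend
            # for the current state of the referenced task.
            async_result = self.AsyncResult(data_task_id)
            if async_result.state != 'SUCCESS':
                raise LookupError(
                    "The data task '{}' has not been loaded, yet. "
                    "Wait for it to complete before using it in an "
                    "analysis task.".format(data_task_id))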
@@ -7,8 +7,6 @@ from celery import Celery, current_app
 from celery.signals import after_task_publish
 from flask import Flask
-from fractalis.analytics.task import AnalyticTask
-from fractalis.data.etl import ETL
 from fractalis.utils import list_classes_with_base_class
@@ -54,6 +52,8 @@ def make_celery(app: Flask) -> Celery:
 def register_tasks() -> None:
     """Register all of our Task classes with celery."""
     from fractalis import celery
+    from fractalis.analytics.task import AnalyticTask
+    from fractalis.data.etl import ETL
     logger.info("Registering ETLs ...")
     etl_classes = list_classes_with_base_class('fractalis.data.etls', ETL)
......
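Moving the AnalyticTask and ETL imports from module scope into register_tasks() is the usual deferred-import cure for circular imports: the first hunk shows that the package's init code imports fractalis.celeryapp while the package is still initialising, so celeryapp cannot in turn import modules that themselves import from fractalis at load time. A generic sketch of the pattern, with hypothetical module names:

    # pkg/__init__.py imports pkg.registry during package initialisation.

    # pkg/registry.py
    def register_everything() -> None:
        # Deferred import: pkg.tasks imports from pkg, so importing it
        # at module scope would bite its own tail during package init.
        # Inside the function it runs only after init has completed.
        from pkg.tasks import TaskA, TaskB
        for cls in (TaskA, TaskB):
            print('registering', cls.__name__)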
@@ -2,7 +2,7 @@
 import json
 import logging
-from typing import Tuple
+from typing import Tuple, Union
 from flask import Blueprint, session, request, jsonify, Response
@@ -39,7 +39,7 @@ def create_data_task() -> Tuple[Response, int]:
     return jsonify(''), 201
-def get_data_state_for_task_id(task_id: str, wait: bool) -> dict:
+def get_data_state_for_task_id(task_id: str, wait: bool) -> Union[dict, None]:
     """Return data state associated with task id.
     :param task_id: The id associated with the ETL task.
     :param wait: If true and ETL is still running wait for it.
@@ -51,10 +51,7 @@ def get_data_state_for_task_id(task_id: str, wait: bool) -> dict:
         async_result.get(propagate=False)
     value = redis.get('data:{}'.format(task_id))
     if not value:
-        error = "Could not find data entry in " \
-                "Redis for task_id '{}'.".format(task_id)
-        logger.error(error)
-        return {}
+        return None
     data_state = json.loads(value)
     # add additional information to data_state
     result = async_result.result
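Returning None instead of a logged error plus {} makes "entry expired" a first-class outcome that each caller can judge for itself; the widened Union[dict, None] annotation documents this. A sketch of the convention, assuming a redis-py client (names illustrative):

    import json
    from typing import Union

    from redis import StrictRedis

    redis = StrictRedis()  # assumption: the app wires up its own client


    def load_data_state(task_id: str) -> Union[dict, None]:
        """Return the stored state dict, or None if the key expired."""
        value = redis.get('data:{}'.format(task_id))
        if not value:
            # No logging here; whether a missing entry is a warning or
            # an error depends on the caller (listing vs. direct lookup).
            return None
        return json.loads(value)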
@@ -77,10 +74,10 @@ def get_all_data() -> Tuple[Response, int]:
     data_states = []
     for task_id in session['data_tasks']:
         data_state = get_data_state_for_task_id(task_id, wait)
-        if not data_state:
-            # remove expired data task id
-            session['data_tasks'].remove(task_id)
-            continue
+        if data_state is None:
+            warning = "Data state with task_id '{}' expired. " \
+                      "Discarding...".format(task_id)
+            logger.warning(warning)
         # remove internal information from response
         del data_state['file_path']
         del data_state['meta']
@@ -143,5 +140,10 @@ def get_meta_information(task_id: str) -> Tuple[Response, int]:
         logger.warning(error)
         return jsonify({'error': error}), 403
     data_state = get_data_state_for_task_id(task_id, wait)
+    if data_state is None:
+        error = "Could not find redis entry for this task id '{}'. " \
+                "The entry probably expired.".format(task_id)
+        logger.error(error)
+        return jsonify({'error': error}), 404
     logger.debug("Successfully gather meta information. Sending response.")
     return jsonify({'meta': data_state['meta']}), 200
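get_meta_information is the strict caller of the pair: an expired entry is surfaced to the client as a 404, whereas get_all_data above only logs a warning. A condensed Flask sketch of that guard (route and store are stand-ins for the real Redis-backed lookup):

    from flask import Flask, jsonify

    app = Flask(__name__)
    store = {'abc': {'meta': {'descriptor': {}}}}  # stand-in for Redis


    @app.route('/data/meta/<task_id>')
    def meta(task_id):
        data_state = store.get(task_id)
        if data_state is None:
            # Strict lookup: an expired entry is a client-visible error.
            return jsonify({'error': "Entry for task id '{}' probably "
                                     "expired.".format(task_id)}), 404
        return jsonify({'meta': data_state['meta']}), 200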
@@ -101,10 +101,8 @@ class ETL(Task, metaclass=abc.ABCMeta):
         pass
     def update_redis(self, data_frame: DataFrame) -> None:
-        """Set redis entry to 'loaded' state to indicate that the user has
-        has read access. At this step we also set several meta information
-        that can be used for preview functionality that do not require all
-        data to be loaded.
+        """Set several meta information that can be used to filter the data
+        before the analysis.
         :param data_frame: The extracted and transformed data.
         """
         value = redis.get(name='data:{}'.format(self.request.id))
@@ -114,7 +112,6 @@ class ETL(Task, metaclass=abc.ABCMeta):
             features = data_frame['feature'].unique().tolist()
         else:
             features = []
-        data_state['loaded'] = True
         data_state['meta']['features'] = features
         redis.setex(name='data:{}'.format(self.request.id),
                     value=json.dumps(data_state),
......
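After the revert, update_redis only enriches the existing entry with meta information (the unique features) and rewrites it via setex, which also restarts the expiry clock. A sketch of that read-modify-write cycle, assuming a redis-py client and an illustrative TTL:

    import json

    from pandas import DataFrame
    from redis import StrictRedis

    redis = StrictRedis()     # assumption: client configured elsewhere
    ENTRY_TTL = 3600          # assumption: entry lifetime in seconds


    def refresh_meta(task_id: str, data_frame: DataFrame) -> None:
        key = 'data:{}'.format(task_id)
        value = redis.get(key)
        if value is None:
            return  # entry expired in the meantime; nothing to refresh
        data_state = json.loads(value)
        if 'feature' in data_frame.columns:
            features = data_frame['feature'].unique().tolist()
        else:
            features = []
        # Meta information only; no 'loaded' flag is written anymore.
        data_state['meta']['features'] = features
        redis.setex(name=key, time=ENTRY_TTL, value=json.dumps(data_state))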
@@ -75,8 +75,8 @@ class ETLHandler(metaclass=abc.ABCMeta):
     def create_redis_entry(self, task_id: str, file_path: str,
                            descriptor: dict, data_type: str) -> None:
-        """Creates an entry in Redis that reflects all meta data surrounding the
-        downloaded data. E.g. data type, file system location, ...
+        """Creates an entry in Redis that contains meta information for the
+        data that are to be downloaded.
         :param task_id: Id associated with the loaded data.
         :param file_path: Location of the data on the file system.
         :param descriptor: Describes the data and is used to download them.
@@ -89,8 +89,7 @@ class ETLHandler(metaclass=abc.ABCMeta):
             'data_type': data_type,
             'meta': {
                 'descriptor': descriptor,
-            },
-            'loaded': False,
+            }
         }
         redis.setex(name='data:{}'.format(task_id),
                     value=json.dumps(data_state),
@@ -119,7 +118,7 @@ class ETLHandler(metaclass=abc.ABCMeta):
             async_result = etl.apply_async(kwargs=kwargs, task_id=task_id)
             assert async_result.id == task_id
             task_ids.append(task_id)
-            if wait:
+            if wait and async_result.state == 'SUBMITTED':
                 logger.debug("'wait' was set. Waiting for tasks to finish ...")
                 async_result.get(propagate=False)
         return task_ids
......
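The extra `async_result.state == 'SUBMITTED'` guard only blocks on tasks that are actually in flight; 'SUBMITTED' appears to be the application's own intermediate state (note the after_task_publish import in celeryapp above), so a task that already reports SUCCESS or FAILURE is not awaited. A sketch of the submit-then-optionally-wait step (function shape illustrative):

    def submit_etl(etl_task, kwargs: dict, task_id: str, wait: bool) -> str:
        async_result = etl_task.apply_async(kwargs=kwargs, task_id=task_id)
        assert async_result.id == task_id
        # Only block while the task is still in flight.
        if wait and async_result.state == 'SUBMITTED':
            # propagate=False: a failed ETL must not raise here; its
            # failure is read later from the stored data state.
            async_result.get(propagate=False)
        return async_result.id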
@@ -123,9 +123,9 @@ def get_state_data(state_id: UUID) -> Tuple[Response, int]:
         return jsonify({'error': error}), 404
     for task_id in session['state_access'][state_id]:
         data_state = get_data_state_for_task_id(task_id=task_id, wait=wait)
-        if data_state['etl_state'] == 'SUBMITTED':
+        if data_state is not None and data_state['etl_state'] == 'SUBMITTED':
             return jsonify({'message': 'ETLs are still running.'}), 202
-        elif data_state['etl_state'] == 'SUCCESS':
+        elif data_state is not None and data_state['etl_state'] == 'SUCCESS':
             continue
         else:
             error = "One or more ETLs failed or has unknown status. " \
......
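Since get_data_state_for_task_id may now return None, both positive branches guard against it, and an expired entry falls through to the same error branch as a failed ETL. A condensed sketch of the resulting tri-state control flow (function name illustrative):

    from typing import List, Optional


    def summarize_etl_states(states: List[Optional[dict]]) -> str:
        for data_state in states:
            if data_state is not None and data_state['etl_state'] == 'SUBMITTED':
                return 'ETLs are still running.'
            elif data_state is not None and data_state['etl_state'] == 'SUCCESS':
                continue
            else:
                # None (expired entry) and FAILURE both end up here.
                return 'One or more ETLs failed or has unknown status.'
        return 'All ETLs succeeded.'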
@@ -2,12 +2,13 @@ import os
 from flask_script import Manager
-from fractalis import app, redis, sync
+from fractalis import app, redis, sync, celery
 manager = Manager(app)
+@celery.task
 @manager.command
 def janitor():
     """Ideally this is maintained by a systemd service to cleanup redis and the
......
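Stacking @celery.task on top of @manager.command registers the same janitor once as a Celery task (so it can be called asynchronously or scheduled) and once as a flask_script CLI command. A minimal sketch of the double registration, with a throwaway app:

    from celery import Celery
    from flask import Flask
    from flask_script import Manager

    app = Flask(__name__)
    celery = Celery(app.import_name)
    manager = Manager(app)


    @celery.task            # e.g. janitor.delay() or a beat schedule
    @manager.command        # e.g. `python manage.py janitor`
    def janitor():
        """Clean up expired redis entries and orphaned files (stub)."""
        print('cleaning up ...')


    if __name__ == '__main__':
        manager.run()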
@@ -159,9 +159,7 @@ class TestData:
         assert 'file_path' in data_state
         assert 'label' in data_state
         assert 'data_type' in data_state
-        assert 'loaded' in data_state
         assert 'meta' in data_state
-        assert not data_state['loaded']
     def test_valid_redis_after_loaded_on_post(self, test_client, payload):
         test_client.post('/data?wait=1', data=payload['serialized'])
@@ -173,9 +171,7 @@ class TestData:
         assert 'file_path' in data_state
         assert 'label' in data_state
         assert 'data_type' in data_state
-        assert 'loaded' in data_state
         assert 'meta' in data_state
-        assert data_state['loaded']
     def test_valid_filesystem_before_loaded_on_post(
             self, test_client, payload):
@@ -234,7 +230,6 @@ class TestData:
         assert data_state['etl_state'] == 'SUBMITTED'
         assert not data_state['etl_message']
         assert data_state['data_type'] == 'mock'
-        assert not data_state['loaded']
         assert 'task_id' in data_state
     def test_valid_response_after_loaded_on_get(self, test_client, payload):
@@ -247,7 +242,6 @@ class TestData:
         assert data_state['etl_state'] == 'SUCCESS'
         assert not data_state['etl_message']
         assert data_state['data_type'] == 'mock'
-        assert data_state['loaded']
         assert 'task_id' in data_state
     def test_valid_response_if_failing_on_get(self, test_client, faiload):
@@ -260,7 +254,6 @@ class TestData:
         assert data_state['etl_state'] == 'FAILURE'
         assert data_state['etl_message']
         assert data_state['data_type'] == 'mock'
-        assert not data_state['loaded']
         assert 'task_id' in data_state
     def test_valid_state_for_finished_etl_on_delete(
......
@@ -51,7 +51,7 @@ class TestETL:
         df1 = pd.DataFrame([[1, 2, 3]], columns=['id', 'feature', 'value'])
         df2 = pd.DataFrame([[1, 3]], columns=['id', 'value'])
         df3 = pd.DataFrame([], columns=['id', 'feature', 'value'])
-        redis.set('data:123', json.dumps({'loaded': False, 'meta': {}}))
+        redis.set('data:123', json.dumps({'meta': {}}))
         self.etl.update_redis(data_frame=df1)
         data_state = json.loads(redis.get('data:123'))
......
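The adjusted fixture seeds the entry without the removed flag, so the natural regression check is that update_redis adds the features meta and does not resurrect 'loaded'. A pytest-style sketch (the redis client and etl instance are assumed fixtures, as in the test class above):

    import json

    import pandas as pd


    def test_update_redis_writes_meta_only(redis, etl):
        df1 = pd.DataFrame([[1, 2, 3]], columns=['id', 'feature', 'value'])
        redis.set('data:123', json.dumps({'meta': {}}))
        etl.update_redis(data_frame=df1)
        data_state = json.loads(redis.get('data:123'))
        # Meta is enriched with the unique features of df1 ...
        assert data_state['meta']['features'] == [2]
        # ... and the reverted flag stays gone.
        assert 'loaded' not in data_state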