Commit c5125d70 authored by Sascha Herzinger

Refactored the way state saving works.

parent 6524d30c
@@ -32,6 +32,8 @@ def create_data_task() -> Tuple[Response, int]:
server=payload['server'],
auth=payload['auth'])
task_ids = etl_handler.handle(descriptors=payload['descriptors'],
data_tasks=session['data_tasks'],
use_existing=False,
wait=wait)
session['data_tasks'] += task_ids
session['data_tasks'] = list(set(session['data_tasks']))
......
@@ -5,10 +5,10 @@ import abc
import json
import logging
from uuid import uuid4
from typing import List
from typing import List, Union
from fractalis import app, redis
import manage
from fractalis import app, redis, celery
from fractalis.data.etl import ETL
@@ -87,6 +87,7 @@ class ETLHandler(metaclass=abc.ABCMeta):
'file_path': file_path,
'label': self.make_label(descriptor),
'data_type': data_type,
'hash': self.descriptor_to_hash(descriptor),
'meta': {
'descriptor': descriptor,
}
@@ -95,11 +96,75 @@
value=json.dumps(data_state),
time=app.config['FRACTALIS_CACHE_EXP'])
def handle(self, descriptors: List[dict], wait: bool = False) -> List[str]:
def descriptor_to_hash(self, descriptor):
"""Compute hash for the given descriptor. Used to identify duplicates.
:param descriptor: ETL descriptor. Used to identify duplicates.
:return: Unique hash.
"""
string = '{}-{}-{}'.format(self._server,
self._handler,
str(descriptor))
hash_value = int.from_bytes(string.encode('utf-8'), 'little')
return hash_value
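Worth noting: this "hash" is not a fixed-width digest. int.from_bytes simply reinterprets the UTF-8 bytes as one arbitrarily large integer, so it is deterministic and collision-free at the cost of growing with the input. A minimal standalone sketch of the scheme (server and handler values are invented placeholders):

def descriptor_to_hash(server: str, handler: str, descriptor: dict) -> int:
    # Same scheme as above: join the identifying parts and reinterpret
    # the UTF-8 bytes as one little-endian integer.
    string = '{}-{}-{}'.format(server, handler, str(descriptor))
    return int.from_bytes(string.encode('utf-8'), 'little')

h1 = descriptor_to_hash('https://example.org', 'demo', {'field': 'x'})
h2 = descriptor_to_hash('https://example.org', 'demo', {'field': 'x'})
assert h1 == h2  # deterministic, so duplicates can be detected by equality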
def find_duplicates(self, data_tasks: List[str],
descriptor: dict) -> List[str]:
"""Search for duplicates of the given descriptor and return a list
of associated task ids.
:param data_tasks: Limit duplicate search to these tasks.
:param descriptor: ETL descriptor. Used to identify duplicates.
:return: The list of duplicates.
"""
task_ids = []
hash_value = self.descriptor_to_hash(descriptor)
for task_id in data_tasks:
value = redis.get('data:{}'.format(task_id))
if value is None:
continue
data_state = json.loads(value)
if hash_value == data_state['hash']:
task_ids.append(task_id)
return task_ids
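For illustration, the lookup can be exercised with a plain dict standing in for redis; the record shape mirrors the 'data:<task_id>' entries written by the handler above (all values invented):

import json

fake_redis = {
    'data:task-a': json.dumps({'hash': 42, 'meta': {'descriptor': {}}}),
    'data:task-b': json.dumps({'hash': 99, 'meta': {'descriptor': {}}}),
}

def find_duplicates(data_tasks, hash_value):
    task_ids = []
    for task_id in data_tasks:
        value = fake_redis.get('data:{}'.format(task_id))
        if value is None:
            continue  # entry expired or never existed
        if json.loads(value)['hash'] == hash_value:
            task_ids.append(task_id)
    return task_ids

# 'task-c' has no redis entry and is silently skipped
assert find_duplicates(['task-a', 'task-b', 'task-c'], 42) == ['task-a']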
def remove_duplicates(self, data_tasks: List[str],
descriptor: dict) -> None:
"""Delete the duplicates of the given descriptor from redis and call
the janitor afterwards to cleanup orphaned files.
:param data_tasks: Limit duplicate search to these tasks.
:param descriptor: ETL descriptor. Used to identify duplicates.
"""
task_ids = self.find_duplicates(data_tasks, descriptor)
for task_id in task_ids:
redis.delete('data:{}'.format(task_id))
manage.janitor.delay()
def find_duplicate_task_id(self, data_tasks: List[str],
descriptor: dict) -> Union[str, None]:
"""Search for duplicates of the given descriptor and return their
task id if the state is SUBMITTED or SUCCESS, meaning the data are
reusable.
:param data_tasks: Limit search to this list.
:param descriptor: ETL descriptor. Used to identify duplicates.
:return: Task id if a valid duplicate has been found, None otherwise.
"""
task_ids = self.find_duplicates(data_tasks, descriptor)
for task_id in task_ids:
async_result = celery.AsyncResult(task_id)
if (async_result.state == 'SUBMITTED' or
async_result.state == 'SUCCESS'):
return task_id
return None
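The reusability rule can be sketched without a live broker ('SUCCESS' is a built-in celery state; 'SUBMITTED' appears to be a Fractalis-specific one):

REUSABLE_STATES = {'SUBMITTED', 'SUCCESS'}

def first_reusable_task(candidate_ids, task_states):
    # task_states stands in for celery.AsyncResult(task_id).state lookups
    for task_id in candidate_ids:
        if task_states.get(task_id) in REUSABLE_STATES:
            return task_id
    return None

assert first_reusable_task(['a', 'b'], {'a': 'FAILURE', 'b': 'SUCCESS'}) == 'b'
assert first_reusable_task(['a'], {'a': 'PENDING'}) is None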
def handle(self, descriptors: List[dict], data_tasks: List[str],
use_existing: bool, wait: bool = False) -> List[str]:
"""Create instances of ETL for the given descriptors and submit them
(ETL implements celery.Task) to the broker. The task ids are returned
to keep track of them.
:param descriptors: A list of items describing the data to download.
:param data_tasks: Limit search for duplicates to this list.
:param use_existing: If a duplicate with state 'SUBMITTED' or 'SUCCESS'
already exists, use it instead of starting a new ETL. If this is False,
duplicates are deleted!
:param wait: Makes this method synchronous by waiting for the tasks to
return.
:return: The list of task ids for the submitted tasks.
@@ -107,6 +172,13 @@ class ETLHandler(metaclass=abc.ABCMeta):
data_dir = os.path.join(app.config['FRACTALIS_TMP_DIR'], 'data')
task_ids = []
for descriptor in descriptors:
if use_existing:
task_id = self.find_duplicate_task_id(data_tasks, descriptor)
if task_id:
task_ids.append(task_id)
continue
else:
self.remove_duplicates(data_tasks, descriptor)
task_id = str(uuid4())
file_path = os.path.join(data_dir, task_id)
etl = ETL.factory(handler=self._handler, descriptor=descriptor)
......
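Taken together, the per-descriptor flow in handle() reduces to one branch: reuse a live duplicate, or purge duplicates and resubmit. A self-contained sketch with stub callables, since the real methods need redis and celery:

from uuid import uuid4

def handle_descriptor(descriptor, data_tasks, use_existing,
                      find_duplicate_task_id, remove_duplicates):
    # Mirrors handle() above: with use_existing a live duplicate short-
    # circuits the ETL; without it, duplicates are purged before resubmitting.
    if use_existing:
        task_id = find_duplicate_task_id(data_tasks, descriptor)
        if task_id:
            return task_id
    else:
        remove_duplicates(data_tasks, descriptor)
    return str(uuid4())  # stand-in for submitting a fresh ETL task

reused = handle_descriptor({'f': 'x'}, ['t1'], True,
                           lambda tasks, d: 't1', lambda tasks, d: None)
assert reused == 't1'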
@@ -28,16 +28,18 @@ def save_state() -> Tuple[Response, int]:
"""
logger.debug("Received POST request on /state.")
payload = request.get_json(force=True)
# check if task ids in payload are valid
matches = re.findall('\$.+?\$', str(payload))
state = str(payload['state'])
matches = re.findall('\$.+?\$', state)
task_ids = [AnalyticTask.parse_value(match)[0] for match in matches]
task_ids = list(set(task_ids))
if not matches:
error = "This state cannot be saved because it contains no data " \
"task ids. These are used to verify access to the state and " \
"its potentially sensitive data."
logger.error(error)
return jsonify({'error': error}), 400
for match in matches:
task_id, _ = AnalyticTask.parse_value(match)
descriptors = []
for task_id in task_ids:
value = redis.get('data:{}'.format(task_id))
if value is None:
error = "Data task id is {} could not be found in redis. " \
@@ -45,14 +47,23 @@
logger.error(error)
return jsonify({'error': error}), 400
try:
json.loads(value)['meta']['descriptor']
except (ValueError, KeyError):
data_state = json.loads(value)
descriptors.append(data_state['meta']['descriptor'])
except (ValueError, KeyError, TypeError):
error = "Task with id {} was found in redis but it represents " \
"no valid data state. " \
"State cannot be saved.".format(task_id)
return jsonify({'error': error}), 400
assert len(task_ids) == len(descriptors)
meta_state = {
'state': state,
'server': payload['server'],
'handler': payload['handler'],
'task_ids': task_ids,
'descriptors': descriptors
}
uuid = uuid4()
redis.set(name='state:{}'.format(uuid), value=json.dumps(payload))
redis.set(name='state:{}'.format(uuid), value=json.dumps(meta_state))
logger.debug("Successfully saved data to redis. Sending response.")
return jsonify({'state_id': uuid}), 201
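For reference, the record stored under 'state:<uuid>' now looks roughly like this (all values invented); persisting task_ids and descriptors alongside the raw state is what lets request_state_access re-run the ETLs later without re-parsing the state:

meta_state = {
    'state': "{'chart': '$123e4567-e89b-12d3-a456-426655440000$'}",
    'server': 'https://example.org',
    'handler': 'demo',
    'task_ids': ['123e4567-e89b-12d3-a456-426655440000'],
    'descriptors': [{'field': 'x'}],
}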
@@ -76,28 +87,18 @@ def request_state_access(state_id: UUID) -> Tuple[Response, int]:
error = "Could not find state associated with id {}".format(state_id)
logger.error(error)
return jsonify({'error': error}), 404
descriptors = []
matches = re.findall('\$.+?\$', str(json.loads(value)))
for match in matches:
task_id, _ = AnalyticTask.parse_value(match)
value = redis.get('data:{}'.format(task_id))
if value is None:
error = "The state with id {} exists, but one or more of the " \
"associated data task ids are missing. Hence this saved " \
"state is lost forever because access can no longer be " \
"verified. Deleting state..."
logger.error(error)
redis.delete('state:{}'.format(state_id))
return jsonify({'error': error}), 403
data_state = json.loads(value)
descriptors.append(data_state['meta']['descriptor'])
etl_handler = ETLHandler.factory(handler=payload['handler'],
server=payload['server'],
meta_state = json.loads(value)
etl_handler = ETLHandler.factory(handler=meta_state['handler'],
server=meta_state['server'],
auth=payload['auth'])
task_ids = etl_handler.handle(descriptors=descriptors, wait=wait)
task_ids = etl_handler.handle(descriptors=meta_state['descriptors'],
data_tasks=session['data_tasks'],
use_existing=True,
wait=wait)
session['data_tasks'] += task_ids
session['data_tasks'] = list(set(session['data_tasks']))
# if all task finish successfully we now that session has access to state
# if all tasks finish successfully we know that the session has access to the state
session['state_access'][state_id] = task_ids
logger.debug("Tasks successfully submitted. Sending response.")
return jsonify(''), 202
@@ -114,6 +115,7 @@ def get_state_data(state_id: UUID) -> Tuple[Response, int]:
logger.debug("Received GET request on /state/<uuid:state_id>.")
wait = request.args.get('wait') == '1'
state_id = str(state_id)
meta_state = json.loads(redis.get('state:{}'.format(state_id)))
if state_id not in session['state_access']:
error = "Cannot get state. Make sure to submit a POST request " \
"to this very same URL containing credentials and server " \
@@ -132,5 +134,9 @@
"Assuming no access to saved state."
logger.error(error)
return jsonify({'error': error}), 403
state = json.loads(redis.get('state:{}'.format(state_id)))
return jsonify({'state': state}), 200
# replace task ids in state with the ids of the freshly loaded data
for i, task_id in enumerate(meta_state['task_ids']):
meta_state['state'] = re.sub(pattern=task_id,
                                 repl=session['state_access'][state_id][i],
string=meta_state['state'])
return jsonify({'state': meta_state}), 200
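The substitution step can be seen in isolation below. UUIDs contain no regex metacharacters, so using them directly as patterns is safe; re.escape is the general-purpose guard (ids shortened for readability):

import re

state = 'plot: $old-id-1$ and table: $old-id-2$'
old_ids = ['old-id-1', 'old-id-2']
new_ids = ['new-id-1', 'new-id-2']
for old, new in zip(old_ids, new_ids):
    state = re.sub(pattern=re.escape(old), repl=new, string=state)
assert state == 'plot: $new-id-1$ and table: $new-id-2$'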