Commit f91bf3a3 authored by Sascha Herzinger
Browse files

All unit tests passing

parent 4c48f417
Pipeline #3832 passed with stage
in 10 minutes and 6 seconds
......@@ -143,7 +143,7 @@ class AnalyticTask(Task, metaclass=abc.ABCMeta):
"but nothing else.")
data_task_id = value
filters = None
return data_task_id, filters
return str(data_task_id), filters
def prepare_args(self, session_data_tasks: List[str],
args: dict, decrypt: bool) -> dict:
......
......@@ -126,7 +126,8 @@ class ETLHandler(metaclass=abc.ABCMeta):
task_ids.append(task_id)
return task_ids
def remove_duplicates(self, data_tasks: List[str], descriptor: dict) -> None:
def remove_duplicates(self, data_tasks: List[str],
descriptor: dict) -> None:
"""Delete the duplicates of the given descriptor from redis and call
the janitor afterwards to cleanup orphaned files.
:param data_tasks: Limit duplicate search to.
......@@ -194,6 +195,7 @@ class ETLHandler(metaclass=abc.ABCMeta):
if wait and async_result.state == 'SUBMITTED':
logger.debug("'wait' was set. Waiting for tasks to finish ...")
async_result.get(propagate=False)
task_ids = list(set(task_ids))
return task_ids
@staticmethod
......
......@@ -8,11 +8,10 @@ from typing import Tuple
from flask import Blueprint, jsonify, Response, request, session
from fractalis import redis
from fractalis import redis, celery
from fractalis.validator import validate_json, validate_schema
from fractalis.analytics.task import AnalyticTask
from fractalis.data.etlhandler import ETLHandler
from fractalis.data.controller import get_data_state_for_task_id
from fractalis.state.schema import request_state_access_schema, \
save_state_schema
......@@ -30,7 +29,7 @@ def save_state() -> Tuple[Response, int]:
"""
logger.debug("Received POST request on /state.")
payload = request.get_json(force=True)
state = str(payload['state'])
state = json.dumps(payload['state'])
matches = re.findall('\$.+?\$', state)
task_ids = [AnalyticTask.parse_value(match)[0] for match in matches]
task_ids = list(set(task_ids))
......@@ -48,15 +47,9 @@ def save_state() -> Tuple[Response, int]:
"State cannot be saved".format(task_id)
logger.error(error)
return jsonify({'error': error}), 400
try:
data_state = json.loads(value)
descriptors.append(data_state['meta']['descriptor'])
except (ValueError, KeyError, TypeError):
error = "Task with id {} was found in redis but it represents " \
"no valid data state. " \
"State cannot be saved.".format(task_id)
return jsonify({'error': error}), 400
assert len(matches) == len(descriptors)
data_state = json.loads(value)
descriptors.append(data_state['meta']['descriptor'])
assert len(task_ids) == len(descriptors)
meta_state = {
'state': state,
'server': payload['server'],
......@@ -115,21 +108,22 @@ def get_state_data(state_id: UUID) -> Tuple[Response, int]:
:return: Previously saved state.
"""
logger.debug("Received GET request on /state/<uuid:state_id>.")
wait = request.args.get('wait') == '1'
state_id = str(state_id)
meta_state = json.loads(redis.get('state:{}'.format(state_id)))
if state_id not in session['state_access']:
value = redis.get('state:{}'.format(state_id))
if not value or state_id not in session['state_access']:
error = "Cannot get state. Make sure to submit a POST request " \
"to this very same URL containing credentials and server " \
"data to launch access verification. Only after that a GET " \
"request might or might not return you the saved state."
logger.error(error)
return jsonify({'error': error}), 404
meta_state = json.loads(value)
state = json.dumps(meta_state['state'])
for task_id in session['state_access'][state_id]:
data_state = get_data_state_for_task_id(task_id=task_id, wait=wait)
if data_state is not None and data_state['etl_state'] == 'SUBMITTED':
async_result = celery.AsyncResult(task_id)
if async_result.state == 'SUBMITTED':
return jsonify({'message': 'ETLs are still running.'}), 202
elif data_state is not None and data_state['etl_state'] == 'SUCCESS':
elif async_result.state == 'SUCCESS':
continue
else:
error = "One or more ETLs failed or has unknown status. " \
......@@ -138,7 +132,7 @@ def get_state_data(state_id: UUID) -> Tuple[Response, int]:
return jsonify({'error': error}), 403
# replace task ids in state with the ids of the freshly loaded data
for i, task_id in enumerate(meta_state['task_ids']):
meta_state['state'] = re.sub(pattern=task_id,
repl=session['state_access'][i],
string=meta_state['state'])
return jsonify({'state': meta_state}), 200
state = re.sub(pattern=task_id,
repl=session['state_access'][state_id][i],
string=state)
return jsonify({'state': json.loads(state)}), 200
......@@ -6,7 +6,8 @@ from uuid import UUID, uuid4
import flask
import pytest
from fractalis import redis, sync
from fractalis import redis, sync, celery
from fractalis.data.etlhandler import ETLHandler
# noinspection PyMissingOrEmptyDocstring,PyMissingTypeHints
......@@ -45,175 +46,263 @@ class TestState:
assert 'error' in body
assert 'could not be found in redis' in body['error']
def test_400_if_task_id_in_redis_but_no_data_state(self, test_client):
def test_save_state_saves_and_returns(self, test_client):
payload = {
'state': {'abc': '$123$'},
'state': {'test': ['$123$']},
'handler': 'test',
'server': 'localfoo'
}
redis.set('data:123', '')
redis.set(name='data:123',
value=json.dumps({'meta': {'descriptor': 'foo'}}))
rv = test_client.post('/state', data=flask.json.dumps(payload))
body = flask.json.loads(rv.get_data())
assert 400 == rv.status_code, body
assert 'error' in body
assert 'no valid data state' in body['error']
assert 201 == rv.status_code, body
assert UUID(body['state_id'])
meta_state = json.loads(redis.get('state:{}'.format(body['state_id'])))
state = json.loads(meta_state['state'])
assert ['$123$'] == state.get('test')
def test_save_state_saves_and_returns(self, test_client):
def test_save_state_discards_duplicates(self, test_client):
payload = {
'state': {'test': ['$123$']},
'state': {'test': ['$123$', '$123$', '$456$']},
'handler': 'test',
'server': 'localfoo'
}
redis.set('data:123', json.dumps({'meta': {'descriptor': 'foo'}}))
rv = test_client.post('/state',
data=flask.json.dumps(payload))
redis.set(name='data:123',
value=json.dumps({'meta': {'descriptor': 'foo'}}))
redis.set(name='data:456',
value=json.dumps({'meta': {'descriptor': 'bar'}}))
rv = test_client.post('/state', data=flask.json.dumps(payload))
body = flask.json.loads(rv.get_data())
assert 201 == rv.status_code, body
assert UUID(body['state_id'])
meta_state = json.loads(redis.get('state:{}'.format(body['state_id'])))
assert 'test' in meta_state['state']
assert ['$123$'] == json.loads(meta_state['state'])['test']
assert len(meta_state['task_ids']) == 2
assert len(meta_state['descriptors']) == 2
assert '123' in meta_state['task_ids']
assert '456' in meta_state['task_ids']
assert 'foo' in meta_state['descriptors']
assert 'bar' in meta_state['descriptors']
def test_400_if_payload_schema_incorrect_1(self, test_client):
payload = {
'state': {'test': ['$123$']},
'server': 'localfoo'
}
rv = test_client.post('/state', data=flask.json.dumps(payload))
assert 400 == rv.status_code
def test_404_if_request_invalid_state_id(self, test_client):
rv = test_client.post(
'/state/{}'.format(str(uuid4())), data=flask.json.dumps(
{'auth': {'token': ''}}))
rv = test_client.post('/state/{}'.format(str(uuid4())),
data=flask.json.dumps({'auth': {'token': ''}}))
assert 404 == rv.status_code
body = flask.json.loads(rv.get_data())
assert 'error' in body
assert 'not find state associated with id' in body['error']
def test_404_if_state_id_is_no_uuid(self, test_client):
rv = test_client.post('/state/123')
assert 404 == rv.status_code
def test_400_if_payload_schema_incorrect(self, test_client):
def test_400_if_payload_schema_incorrect_2(self, test_client):
payload = {
'auth': {}
}
rv = test_client.post('/state/{}'.format(str(uuid4())),
data=flask.json.dumps({'foo': 123}))
data=flask.json.dumps(payload))
assert 400 == rv.status_code
def test_error_if_task_id_is_no_etl_id(self, test_client):
payload = {
'state': {'foo': '$123$'},
def test_request_state_acces_works(self, test_client):
meta_state = {
'state': {'foo': ['$123$', '$456$']},
'handler': 'test',
'server': 'localfoo',
'descriptors': ''
'task_ids': ['123', '456'],
'descriptors': [
{'data_type': 'default'},
{'data_type': 'default', 'foo': 'bar'}
],
}
uuid = str(uuid4())
redis.set(name='state:{}'.format(uuid),
value=json.dumps(payload))
value=json.dumps(meta_state))
with test_client.session_transaction() as sess:
assert not sess['data_tasks']
assert not sess['state_access']
rv = test_client.post('/state/{}'.format(uuid),
data=flask.json.dumps({'auth': {'token': ''}}))
body = flask.json.loads(rv.get_data())
assert 403 == rv.status_code, body
assert 'error' in body
assert 'data task ids are missing' in body['error']
assert not redis.exists('state:{}'.format(uuid))
assert 202 == rv.status_code, body
assert not body
with test_client.session_transaction() as sess:
assert len(sess['data_tasks']) == 2
assert len(sess['state_access']) == 1
key = list(sess['state_access'].keys())[0]
assert len(sess['state_access'][key]) == 2
assert sess['data_tasks'][0] in sess['state_access'][key]
assert sess['data_tasks'][1] in sess['state_access'][key]
assert meta_state['task_ids'][0] not in sess['state_access'][key]
assert meta_state['task_ids'][1] not in sess['state_access'][key]
def test_202_create_valid_state_if_valid_conditions(self, test_client):
payload = {
'state': {'foo': '$123$'},
def test_request_state_access_reuses_duplicate(
self, test_client):
meta_state = {
'state': {'foo': ['$123$', '$456$']},
'handler': 'test',
'server': 'localfoo',
'descriptors': ''
'task_ids': ['123', '456'],
'descriptors': [
{'data_type': 'default'},
{'data_type': 'default'}
],
}
uuid = str(uuid4())
redis.set(name='data:123',
value=json.dumps(
{'meta': {'descriptor': {'data_type': 'default'}}}))
redis.set(name='state:{}'.format(uuid),
value=json.dumps(payload))
rv = test_client.post(
'/state/{}'.format(uuid), data=flask.json.dumps(
{'auth': {'token': ''}}))
value=json.dumps(meta_state))
with test_client.session_transaction() as sess:
assert not sess['data_tasks']
assert not sess['state_access']
rv = test_client.post('/state/{}'.format(uuid),
data=flask.json.dumps({'auth': {'token': ''}}))
body = flask.json.loads(rv.get_data())
assert 202 == rv.status_code, body
assert not body
with test_client.session_transaction() as sess:
assert sess['data_tasks']
assert sess['state_access']
assert sess['data_tasks'] == sess['state_access'][uuid]
assert [UUID(uuid) for uuid in sess['data_tasks']]
assert len(sess['data_tasks']) == 1
assert len(sess['state_access']) == 1
key = list(sess['state_access'].keys())[0]
assert len(sess['state_access'][key]) == 1
assert sess['data_tasks'] == sess['state_access'][key]
assert meta_state['task_ids'][0] != sess['state_access'][key][0]
def test_404_if_get_non_existing_state(self, test_client):
def test_request_state_reuses_previous_etls_but_only_in_own_scope(
self, test_client, monkeypatch):
descriptor_1 = {'data_type': 'default', 'id': 1}
descriptor_2 = {'data_type': 'default', 'id': 2}
handler = 'test'
server = 'localfoo'
meta_state = {
'state': {'foo': ['$123$', '$456$']},
'handler': handler,
'server': server,
'task_ids': ['123', '456'],
'descriptors': [
descriptor_1,
descriptor_2
],
}
uuid = str(uuid4())
rv = test_client.get('/state/{}'.format(uuid))
body = flask.json.loads(rv.get_data())
assert 404 == rv.status_code
assert 'error' in body
assert 'Cannot get state.' in body['error']
redis.set(name='state:{}'.format(uuid),
value=json.dumps(meta_state))
etlhandler = ETLHandler.factory(handler=handler,
server=server,
auth={})
etlhandler.create_redis_entry(task_id='123',
file_path='',
descriptor=descriptor_1,
data_type='')
etlhandler.create_redis_entry(task_id='456',
file_path='',
descriptor=descriptor_2,
data_type='')
with test_client.session_transaction() as sess:
sess['data_tasks'] = ['456']
def test_404_if_get_non_uuid_state(self, test_client):
rv = test_client.get('/state/123')
assert 404 == rv.status_code
class FakeAsyncResult:
def __init__(self, *args, **kwargs):
self.state = 'SUCCESS'
self.id = args[0]
def test_404_if_get_not_previously_self_requested_state(self, test_client):
uuid = str(uuid4())
payload = {
'state': '${}$'.format(uuid),
'handler': 'test',
'server': 'localfoo'
}
redis.set(name='data:{}'.format(uuid),
value=json.dumps(
{'meta': {'descriptor': {'data_type': 'default'}}}))
rv = test_client.post('/state',
data=flask.json.dumps(payload))
body = flask.json.loads(rv.get_data())
assert 201 == rv.status_code, body
state_id = body['state_id']
rv = test_client.post('/state/{}'.format(state_id),
def get(self, *args, **kwargs):
pass
monkeypatch.setattr(celery, 'AsyncResult', FakeAsyncResult)
rv = test_client.post('/state/{}'.format(uuid),
data=flask.json.dumps({'auth': {'token': ''}}))
body = flask.json.loads(rv.get_data())
assert 202 == rv.status_code, body
assert not body
with test_client.session_transaction() as sess:
del sess['state_access'][state_id]
rv = test_client.get('/state/{}'.format(state_id))
assert len(sess['data_tasks']) == 2
key = list(sess['state_access'].keys())[0]
assert len(sess['state_access'][key]) == 2
assert meta_state['task_ids'][0] not in sess['state_access'][key]
assert meta_state['task_ids'][1] in sess['state_access'][key]
def test_get_state_data_404_if_not_requested_before(self, test_client):
rv = test_client.get('/state/{}'.format(str(uuid4())))
assert 404 == rv.status_code
body = flask.json.loads(rv.get_data())
assert 404 == rv.status_code, body
assert 'error' in body
assert 'Cannot get state.' in body['error']
assert 'Cannot get state' in body.get('error')
def test_403_if_etl_fails(self, test_client):
payload = {
'state': '$123$',
'handler': 'test',
'server': 'localfoo'
def test_get_state_with_replaced_ids_if_all_tasks_succeed(
self, test_client, monkeypatch):
meta_state = {
'state': {'foo': ['$123$', '$456$']},
'task_ids': ['123', '456'],
}
redis.set(name='data:123',
value=json.dumps(
{'meta': {'descriptor': {'data_type': 'default'}}}))
rv = test_client.post('/state', data=flask.json.dumps(payload))
body = flask.json.loads(rv.get_data())
state_id = body['state_id']
test_client.post('/state/{}'.format(state_id),
data=flask.json.dumps({'auth': {'token': 'fail'}}))
rv = test_client.get('/state/{}?wait=1'.format(state_id))
uuid = str(uuid4())
redis.set(name='state:{}'.format(uuid),
value=json.dumps(meta_state))
with test_client.session_transaction() as sess:
sess['state_access'][uuid] = ['abc', 'efg']
class FakeAsyncResult:
def __init__(self, *args, **kwargs):
self.state = 'SUCCESS'
self.id = args[0]
def get(self, *args, **kwargs):
pass
monkeypatch.setattr(celery, 'AsyncResult', FakeAsyncResult)
rv = test_client.get('/state/{}'.format(uuid))
body = flask.json.loads(rv.get_data())
assert 403 == rv.status_code, body
assert 'error' in body
assert 'ETLs failed' in body['error']
assert 200 == rv.status_code, body
assert body['state']['foo'][0] == '$abc$'
assert body['state']['foo'][1] == '$efg$'
def test_202_then_200_for_running_etl(self, test_client):
payload = {
'state': '$123$',
'handler': 'test',
'server': 'localfoo'
def test_get_state_get_message_if_not_all_tasks_finished(
self, test_client, monkeypatch):
meta_state = {
'state': {'foo': ['$123$', '$456$']},
'task_ids': ['123', '456'],
}
redis.set(name='data:123', value=json.dumps(
{'meta': {'descriptor': {'data_type': 'default'}}}))
rv = test_client.post('/state', data=flask.json.dumps(payload))
body = flask.json.loads(rv.get_data())
state_id = body['state_id']
test_client.post('/state/{}'.format(state_id),
data=flask.json.dumps({'auth': {'token': ''}}))
rv = test_client.get('/state/{}'.format(state_id))
uuid = str(uuid4())
redis.set(name='state:{}'.format(uuid),
value=json.dumps(meta_state))
with test_client.session_transaction() as sess:
sess['state_access'][uuid] = ['abc', 'efg']
class FakeAsyncResult:
def __init__(self, *args, **kwargs):
self.state = 'SUBMITTED'
self.id = args[0]
def get(self, *args, **kwargs):
pass
monkeypatch.setattr(celery, 'AsyncResult', FakeAsyncResult)
rv = test_client.get('/state/{}'.format(uuid))
body = flask.json.loads(rv.get_data())
assert 202 == rv.status_code, body
assert 'message' in body
assert 'ETLs are still running.' in body['message']
rv = test_client.get('/state/{}?wait=1'.format(state_id))
assert 'still running' in body.get('message')
def test_get_state_refuse_if_one_task_fails(
self, test_client, monkeypatch):
meta_state = {
'state': {'foo': ['$123$', '$456$']},
'task_ids': ['123', '456'],
}
uuid = str(uuid4())
redis.set(name='state:{}'.format(uuid),
value=json.dumps(meta_state))
with test_client.session_transaction() as sess:
sess['state_access'][uuid] = ['abc', 'efg']
class FakeAsyncResult:
def __init__(self, *args, **kwargs):
self.state = 'FAILURE'
self.id = args[0]
def get(self, *args, **kwargs):
pass
monkeypatch.setattr(celery, 'AsyncResult', FakeAsyncResult)
rv = test_client.get('/state/{}'.format(uuid))
body = flask.json.loads(rv.get_data())
assert 200 == rv.status_code, body
assert 'state' in body
assert '$123$' in body['state']
assert 403 == rv.status_code, body
assert 'no access' in body.get('error')
......@@ -35,9 +35,9 @@ class TestAnalyticsTask:
arg1 = '${"id": 123, "filters": {"foo": [1,2]}}$'
arg2 = '$123$'
data_task_id, filters = self.task.parse_value(arg1)
assert data_task_id == 123
assert data_task_id == '123'
assert 'foo' in filters
assert filters['foo'] == [1, 2]
data_task_id, filters = self.task.parse_value(arg2)
assert data_task_id == 123
assert data_task_id == '123'
assert not filters
......@@ -193,7 +193,8 @@ class TestETLHandler:
data_tasks=['123'], descriptor=descriptor)
assert task_id is None
def test_find_duplicate_limits_search_to_data_tasks(self, monkeypatch, redis):
def test_find_duplicate_limits_search_to_data_tasks(
self, monkeypatch, redis):
descriptor = {'a': {'b': 3}, 'c': 4}
self.etlhandler.create_redis_entry(task_id='123',
file_path='',
......@@ -278,6 +279,7 @@ class TestETLHandler:
task_ids = self.etlhandler.handle(descriptors=[descriptor, descriptor],
data_tasks=[],
use_existing=False)
assert len(task_ids) == 2
assert task_ids[0] != task_ids[1]
assert len(redis.keys('data:*')) == 1
......@@ -287,5 +289,5 @@ class TestETLHandler:
task_ids = self.etlhandler.handle(descriptors=[descriptor, descriptor],
data_tasks=[],
use_existing=True)
assert task_ids[0] == task_ids[1]
assert len(task_ids) == 1
assert len(redis.keys('data:*')) == 1
Supports Markdown
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment