Commit 6b6cd9e7 authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

Fixing most tests and some code, but not everything

parent 90e7671c
Pipeline #2091 failed with stage
in 3 minutes and 35 seconds
......@@ -70,9 +70,11 @@ def get_task_details(task_id: UUID) -> Tuple[Response, int]:
async_result = celery.AsyncResult(task_id)
if wait:
async_result.get(propagate=False)
result = async_result.result
if isinstance(result, Exception): # Exception -> str
result = "{}: {}".format(type(result).__name__, str(result))
logger.debug("Task found and has access. Sending response.")
return jsonify({'state': async_result.state,
'result': async_result.result}), 200
return jsonify({'state': async_result.state, 'result': result}), 200
@analytics_blueprint.route('/<uuid:task_id>', methods=['DELETE'])
......
......@@ -73,8 +73,11 @@ def get_all_data() -> Tuple[Response, int]:
# remove internal information from response
del data_state['file_path']
# add additional information to response
result = async_result.result
if isinstance(result, Exception): # Exception -> str
result = "{}: {}".format(type(result).__name__, str(result))
data_state['etl_message'] = result
data_state['etl_state'] = async_result.state
data_state['etl_message'] = async_result.result
data_states.append(data_state)
logger.debug("Data states collected. Sending response.")
return jsonify({'data_states': data_states}), 200
......@@ -89,8 +92,14 @@ def delete_data(task_id: str) -> Tuple[Response, int]:
"""
logger.debug("Received DELETE request on /data/task_id.")
wait = request.args.get('wait') == '1'
if task_id in session['data_tasks']:
session['data_tasks'].remove(task_id)
if task_id not in session['data_tasks']:
error = "Task ID '{}' not found in session. " \
"Refusing access.".format(task_id)
logger.warning(error)
return jsonify({'error': error}), 403
session['data_tasks'].remove(task_id)
# possibly dangerous: http://stackoverflow.com/a/29627549
celery.control.revoke(task_id, terminate=True, signal='SIGUSR1', wait=wait)
async_result = remove_data.delay(task_id)
if wait:
async_result.get(propagate=False)
......@@ -107,6 +116,9 @@ def delete_all_data() -> Tuple[Response, int]:
wait = request.args.get('wait') == '1'
for task_id in session['data_tasks']:
async_result = remove_data.delay(task_id)
# possibly dangerous: http://stackoverflow.com/a/29627549
celery.control.revoke(task_id, terminate=True,
signal='SIGUSR1', wait=wait)
if wait:
async_result.get(propagate=False)
session['data_tasks'] = []
......
......@@ -109,7 +109,8 @@ class ETL(Task, metaclass=abc.ABCMeta):
value = redis.get(name='data:{}'.format(self.request.id))
data_state = json.loads(value.decode('utf-8'))
data_state['loaded'] = True
redis.set(name='data:{}'.format(self.request.id), value=data_state)
redis.set(name='data:{}'.format(self.request.id),
value=json.dumps(data_state))
def run(self, server: str, token: str,
descriptor: dict, file_path: str) -> None:
......
import time
import pandas as pd
import numpy as np
......@@ -15,6 +17,7 @@ class RandomDFETL(ETL):
if 'fail' in token:
raise Exception('Throwing because I was told to.')
fake_raw_data = np.random.randn(10, 5)
time.sleep(0.5)
return fake_raw_data
def transform(self, raw_data):
......
......@@ -4,22 +4,44 @@ db and the file system.
"""
import os
import json
import logging
from shutil import rmtree
from fractalis import redis, app, celery
logger = logging.getLogger(__name__)
@celery.task
def remove_data(task_id: str) -> None:
pass
"""Remove all traces of any data associated with the given id. That includes
redis and the file system.
:param task_id: The id associated with a data state
"""
key = 'data:{}'.format(task_id)
value = redis.get(key)
celery.control.revoke(task_id, terminate=True, signal='SIGUSR1')
redis.delete(key)
if value:
data_state = json.loads(value.decode('utf-8'))
remove_file.apply(args=[data_state['file_path']])
else:
logger.warning("Can't delete file for task id '{}',because there is "
"no associated entry in Redis.".format(task_id))
@celery.task
def remove_file(file_path: str) -> None:
"""Remove the file for the given file path.
:param file_path: Path of file to remove.
"""
try:
os.remove(file_path)
except FileNotFoundError:
pass
logger.warning("Attempted to remove file '{}', "
"but it does not exist.".format(file_path))
@celery.task
......@@ -27,6 +49,7 @@ def cleanup_all() -> None:
"""Reset redis and the filesystem. This is only useful for testing and
should !!!NEVER!!! be used otherwise.
"""
celery.control.purge()
redis.flushall()
tmp_dir = app.config['FRACTALIS_TMP_DIR']
if os.path.exists(tmp_dir):
......
......@@ -2,16 +2,15 @@ import json
import pytest
import numpy as np
import pandas as pd
from fractalis.analytics.tasks.correlation.main import CorrelationJob
from fractalis.analytics.tasks.correlation.main import CorrelationTask
class TestCorrelation:
@pytest.mark.skip(reason="Not implemented yet.")
def test_returns_valid_response(self):
job = CorrelationJob()
job = CorrelationTask()
x = np.random.rand(10).tolist()
y = np.random.rand(10).tolist()
result = job.main(x=x, y=y, ids=[])
......
......@@ -11,6 +11,7 @@ import pytest
from fractalis import sync, redis
# noinspection PyMissingOrEmptyDocstring,PyMissingTypeHints
class TestAnalytics:
@pytest.fixture(scope='function')
......@@ -20,9 +21,6 @@ class TestAnalytics:
app.testing = True
with app.test_client() as test_client:
yield test_client
# cleanup running jobs after each test
for job_id in flask.session['analytics_jobs']:
test_client.delete('/analytics/{}?wait=1'.format(job_id))
sync.cleanup_all()
@pytest.fixture(scope='function')
......@@ -42,94 +40,76 @@ class TestAnalytics:
def test_new_resource_created(self, test_client):
rv = test_client.post('/analytics', data=flask.json.dumps(dict(
job_name='add_test_job',
task_name='add_test_task',
args={'a': 1, 'b': 2}
)))
body = flask.json.loads(rv.get_data())
assert rv.status_code == 201, body
new_url = '/analytics/{}'.format(body['job_id'])
assert uuid.UUID(body['job_id'])
new_url = '/analytics/{}'.format(body['task_id'])
assert uuid.UUID(body['task_id'])
assert test_client.head(new_url).status_code == 200
@pytest.fixture(scope='function',
params=[{'job_name': 'i_dont_exist_job',
params=[{'task_name': 'i_dont_exist_task',
'args': {'a': 1, 'b': 2}},
{'job_name': '',
{'task_name': '',
'args': {'a': 1, 'b': 2}}])
def bad_request(self, test_client, request):
return test_client.post('/analytics',
data=flask.json.dumps(request.param))
def test_400_if_POST_body_invalid(self, bad_request):
def test_400_if_post_body_invalid(self, bad_request):
assert bad_request.status_code == 400
def test_no_conflict_when_running_job_twice(self, test_client):
def test_no_conflict_when_running_task_twice(self, test_client):
rv1 = test_client.post('/analytics', data=flask.json.dumps(dict(
job_name='nothing_test_job',
task_name='nothing_test_task',
args={'seconds': 4}
)))
assert rv1.status_code == 201
rv2 = test_client.post('/analytics', data=flask.json.dumps(dict(
job_name='nothing_test_job',
task_name='nothing_test_task',
args={'seconds': 4}
)))
assert rv2.status_code == 201
body1 = flask.json.loads(rv1.get_data())
new_url1 = '/analytics/{}?wait=0'.format(body1['job_id'])
new_url1 = '/analytics/{}?wait=0'.format(body1['task_id'])
new_response1 = test_client.get(new_url1)
assert new_response1.status_code == 200
new_body1 = flask.json.loads(new_response1.get_data())
assert new_body1['state'] != 'FAILURE'
body2 = flask.json.loads(rv2.get_data())
new_url2 = '/analytics/{}?wait=0'.format(body2['job_id'])
new_url2 = '/analytics/{}?wait=0'.format(body2['task_id'])
new_response2 = test_client.get(new_url2)
assert new_response2.status_code == 200
new_body2 = flask.json.loads(new_response2.get_data())
assert new_body2['state'] != 'FAILURE'
def test_404_if_no_session_auth(self, test_client, small_data_post):
rv = small_data_post(random=False)
body = flask.json.loads(rv.get_data())
assert rv.status_code == 201, body
def test_403_if_no_session_auth(self, test_client, small_data_post):
small_data_post(random=False)
with test_client.session_transaction() as sess:
assert len(sess['data_tasks']) == 1
task_id = sess['data_tasks'][0]
with test_client.session_transaction() as sess:
sess['data_ids'] = []
sess['data_tasks'] = []
rv = test_client.post('/analytics', data=flask.json.dumps(dict(
job_name='sum_df_test_job',
args={'a': '${}$'.format(body['data_ids'][0])}
task_name='sum_df_test_task',
args={'a': '${}$'.format(task_id)}
)))
body = flask.json.loads(rv.get_data())
assert rv.status_code == 201, body
url = '/analytics/{}?wait=1'.format(body['job_id'])
url = '/analytics/{}?wait=1'.format(body['task_id'])
rv = test_client.get(url)
body = flask.json.loads(rv.get_data())
assert rv.status_code == 200, body
assert body['state'] == 'FAILURE', body
assert 'KeyError' in body['result'], body
assert 'PermissionError' in body['result'], body
def test_403_if_no_data_access_auth(self, test_client):
rv = test_client.post(
'/data?wait=1', data=flask.json.dumps(dict(
handler='test',
server='localhost:1234',
auth={'token': '7746391376142672192764'},
descriptors=[
{
'data_type': 'default',
'concept': 'concept'
}
]
)))
body = flask.json.loads(rv.get_data())
assert rv.status_code == 201, body
assert len(body['data_ids']) == 1
data_id = body['data_ids'][0]
assert redis.get('data:{}'.format(data_id))
with test_client.session_transaction() as sess:
sess.sid = str(uuid4()) # we are someone else now
rv = test_client.post(
test_client.post(
'/data?wait=1', data=flask.json.dumps(dict(
handler='test',
server='localhost:1234',
......@@ -141,17 +121,16 @@ class TestAnalytics:
}
]
)))
body = flask.json.loads(rv.get_data())
assert rv.status_code == 201, body
assert len(body['data_ids']) == 1
assert body['data_ids'][0] == data_id
with test_client.session_transaction() as sess:
assert len(sess['data_tasks']) == 1
task_id = sess['data_tasks'][0]
rv = test_client.post('/analytics?wait=1', data=flask.json.dumps(dict(
job_name='sum_df_test_job',
args={'a': '${}$'.format(data_id)}
task_name='sum_df_test_task',
args={'a': '${}$'.format(task_id)}
)))
assert rv.status_code == 201
body = flask.json.loads(rv.get_data())
url = '/analytics/{}?wait=1'.format(body['job_id'])
url = '/analytics/{}?wait=1'.format(body['task_id'])
rv = test_client.get(url)
body = flask.json.loads(rv.get_data())
assert rv.status_code == 200
......@@ -160,53 +139,53 @@ class TestAnalytics:
def test_resource_deleted(self, test_client):
rv = test_client.post('/analytics', data=flask.json.dumps(dict(
job_name='add_test_job',
task_name='add_test_task',
args={'a': 1, 'b': 1}
)))
assert rv.status_code == 201
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=1'.format(body['job_id'])
new_url = '/analytics/{}?wait=1'.format(body['task_id'])
assert test_client.head(new_url).status_code == 200
assert test_client.delete(new_url).status_code == 200
assert test_client.head(new_url).status_code == 404
assert test_client.head(new_url).status_code == 403
def test_404_if_deleting_non_existing_resource(self, test_client):
def test_403_if_deleting_non_existing_resource(self, test_client):
rv = test_client.delete('/analytics/{}?wait=1'.format(str(uuid4())))
assert rv.status_code == 404
assert rv.status_code == 403
def test_running_resource_deleted(self, test_client):
rv = test_client.post('/analytics', data=flask.json.dumps(dict(
job_name='nothing_test_job',
task_name='nothing_test_task',
args={'seconds': 4}
)))
assert rv.status_code == 201
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=1'.format(body['job_id'])
new_url = '/analytics/{}?wait=1'.format(body['task_id'])
assert test_client.head(new_url).status_code == 200
assert test_client.delete(new_url).status_code == 200
assert test_client.head(new_url).status_code == 404
assert test_client.head(new_url).status_code == 403
def test_404_if_deleting_without_auth(self, test_client):
def test_403_if_deleting_without_auth(self, test_client):
rv = test_client.post('/analytics', data=flask.json.dumps(dict(
job_name='nothing_test_job',
task_name='nothing_test_task',
args={'seconds': 4}
)))
assert rv.status_code == 201
time.sleep(1)
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=1'.format(body['job_id'])
new_url = '/analytics/{}?wait=1'.format(body['task_id'])
with test_client.session_transaction() as sess:
sess['analytics_jobs'] = []
assert test_client.delete(new_url).status_code == 404
sess['analytic_tasks'] = []
assert test_client.delete(new_url).status_code == 403
def test_status_contains_result_if_finished(self, test_client):
rv = test_client.post('/analytics', data=flask.json.dumps(dict(
job_name='add_test_job',
task_name='add_test_task',
args={'a': 1, 'b': 2}
)))
assert rv.status_code == 201
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=1'.format(body['job_id'])
new_url = '/analytics/{}?wait=1'.format(body['task_id'])
new_response = test_client.get(new_url)
assert new_response.status_code == 200
new_body = flask.json.loads(new_response.get_data())
......@@ -215,65 +194,65 @@ class TestAnalytics:
def test_status_result_empty_if_not_finished(self, test_client):
rv = test_client.post('/analytics', data=flask.json.dumps(dict(
job_name='nothing_test_job',
task_name='nothing_test_task',
args={'seconds': 4}
)))
time.sleep(1)
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=0'.format(body['job_id'])
new_url = '/analytics/{}?wait=0'.format(body['task_id'])
new_response = test_client.get(new_url)
assert new_response.status_code == 200
new_body = flask.json.loads(new_response.get_data())
assert not new_body['result']
assert new_body['state'] == 'PENDING'
def test_correct_response_if_job_fails(self, test_client):
def test_correct_response_if_task_fails(self, test_client):
rv = test_client.post('/analytics', data=flask.json.dumps(dict(
job_name='div_test_job',
task_name='div_test_task',
args={'a': 2, 'b': 0}
)))
assert rv.status_code == 201
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=1'.format(body['job_id'])
new_url = '/analytics/{}?wait=1'.format(body['task_id'])
new_response = test_client.get(new_url)
assert new_response.status_code == 200
new_body = flask.json.loads(new_response.get_data())
assert new_body['state'] == 'FAILURE'
assert 'ZeroDivisionError' in new_body['result']
def test_404_if_status_non_existing_resource(self, test_client):
def test_403_if_status_non_existing_resource(self, test_client):
assert test_client.get('/analytics/{}?wait=1'
.format(str(uuid4()))).status_code == 404
.format(str(uuid4()))).status_code == 403
def test_404_if_status_without_auth(self, test_client):
def test_403_if_status_without_auth(self, test_client):
rv = test_client.post('/analytics', data=flask.json.dumps(dict(
job_name='nothing_test_job',
task_name='nothing_test_task',
args={'seconds': 4}
)))
assert rv.status_code == 201
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=0'.format(body['job_id'])
new_url = '/analytics/{}?wait=0'.format(body['task_id'])
with test_client.session_transaction() as sess:
sess['analytics_jobs'] = []
assert test_client.get(new_url).status_code == 404
sess['analytic_tasks'] = []
assert test_client.get(new_url).status_code == 403
def test_float_when_summing_up_df(self, test_client, small_data_post):
data_ids = []
data_tasks = []
data_rv1 = small_data_post(random=True, wait=1)
data_body1 = flask.json.loads(data_rv1.get_data())
assert data_rv1.status_code == 201, data_body1
data_ids += data_body1['data_ids']
data_tasks += data_body1['data_tasks']
assert len(data_ids) == 1
assert len(data_tasks) == 1
rv = test_client.post('/analytics', data=flask.json.dumps(dict(
job_name='sum_df_test_job',
args={'a': '${}$'.format(data_ids[0])}
task_name='sum_df_test_task',
args={'a': '${}$'.format(data_tasks[0])}
)))
assert rv.status_code == 201
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=1'.format(body['job_id'])
new_url = '/analytics/{}?wait=1'.format(body['task_id'])
new_response = test_client.get(new_url)
new_body = flask.json.loads(new_response.get_data())
assert new_response.status_code == 200, new_body
......@@ -282,12 +261,12 @@ class TestAnalytics:
def test_exception_if_result_not_json(self, test_client):
rv = test_client.post('/analytics', data=flask.json.dumps(dict(
job_name='invalid_json_job',
task_name='invalid_json_task',
args={'a': 1}
)))
assert rv.status_code == 201
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=1'.format(body['job_id'])
new_url = '/analytics/{}?wait=1'.format(body['task_id'])
new_response = test_client.get(new_url)
assert new_response.status_code == 200
new_body = flask.json.loads(new_response.get_data())
......@@ -296,12 +275,12 @@ class TestAnalytics:
def test_exception_if_result_not_dict(self, test_client):
rv = test_client.post('/analytics', data=flask.json.dumps(dict(
job_name='no_dict_job',
task_name='no_dict_task',
args={'a': 1}
)))
assert rv.status_code == 201
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=1'.format(body['job_id'])
new_url = '/analytics/{}?wait=1'.format(body['task_id'])
new_response = test_client.get(new_url)
assert new_response.status_code == 200
new_body = flask.json.loads(new_response.get_data())
......
......@@ -15,11 +15,12 @@ class TestData:
@pytest.fixture(scope='function')
def test_client(self):
sync.cleanup_all.apply()
from fractalis import app
app.testing = True
with app.test_client() as test_client:
yield test_client
sync.cleanup_all()
sync.cleanup_all.apply()
@staticmethod
def small_load(fail=False):
......@@ -57,14 +58,15 @@ class TestData:
@pytest.fixture(scope='function', params=['small', 'big'])
def payload(self, request):
load = self.small_load() if request == 'small' else self.big_load()
load = self.small_load() if request.param == 'small' \
else self.big_load()
return {'size': len(load['descriptors']),
'serialized': flask.json.dumps(load)}
@pytest.fixture(scope='function', params=['small', 'big'])
def faiload(self, request):
load = self.small_load(True) \
if request == 'small' else self.big_load(True)
load = self.small_load(True) if request.param == 'small' \
else self.big_load(True)
return {'size': len(load['descriptors']),
'serialized': flask.json.dumps(load)}
......@@ -179,7 +181,8 @@ class TestData:
self, test_client, payload):
data_dir = os.path.join(app.config['FRACTALIS_TMP_DIR'], 'data')
test_client.post('/data', data=payload['serialized'])
assert len(os.listdir(data_dir)) == 0
if os.path.exists(data_dir):
assert len(os.listdir(data_dir)) == 0
keys = redis.keys('data:*')
for key in keys:
value = redis.get(key)
......@@ -298,6 +301,20 @@ class TestData:
with test_client.session_transaction() as sess:
assert data_state['task_id'] not in sess['data_tasks']
def test_403_if_no_auth_on_delete(self, test_client, payload):
test_client.post('/data?wait=1', data=payload['serialized'])
with test_client.session_transaction() as sess:
sess['data_tasks'] = []
for key in redis.keys('data:*'):
value = redis.get(key)
data_state = json.loads(value.decode('utf-8'))
os.path.exists(data_state['file_path'])
rv = test_client.delete('/data/{}?wait=1'
.format(data_state['task_id']))
assert rv.status_code == 403
assert redis.exists(key)
assert os.path.exists(data_state['file_path'])
def test_valid_state_for_finished_etl_on_delete_all(
self, test_client, payload):
data_dir = os.path.join(app.config['FRACTALIS_TMP_DIR'], 'data')
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment