Commit 19788d26 authored by Sascha Herzinger
Browse files

Expanded data_states and API to handle meta data

parent 72793e60
Pipeline #2270 failed with stage
in 8 minutes and 16 seconds
......@@ -3,6 +3,8 @@ image: python:latest
services:
- redis
- rabbitmq
- r-base
- bioconductor/release_base2
before_script:
- pip install -e . --default-timeout=180
......
......@@ -39,6 +39,32 @@ def create_data_task() -> Tuple[Response, int]:
return jsonify(''), 201
def get_data_state_for_task_id(task_id: str, wait: bool) -> dict:
    """Look up and enrich the data state stored in Redis for an ETL task.
    :param task_id: The id associated with the ETL task.
    :param wait: If true and ETL is still running wait for it.
    :return: Data state that has been stored in Redis.
    """
    task = celery.AsyncResult(task_id)
    if wait:
        logger.debug("'wait' was set. Waiting for tasks to finish ...")
        task.get(propagate=False)
    redis_value = redis.get('data:{}'.format(task_id))
    if not redis_value:
        error = "Could not find data entry in " \
                "Redis for task_id '{}'.".format(task_id)
        logger.error(error)
        return {}
    data_state = json.loads(redis_value)
    # Enrich the persisted state with the live Celery task status.
    etl_message = task.result
    if isinstance(etl_message, Exception):  # Exception -> str
        etl_message = "{}: {}".format(type(etl_message).__name__,
                                      str(etl_message))
    data_state['etl_message'] = etl_message
    data_state['etl_state'] = task.state
    return data_state
@data_blueprint.route('', methods=['GET'])
def get_all_data() -> Tuple[Response, int]:
"""Get information for all tasks that have been submitted in the lifetime
......@@ -49,31 +75,17 @@ def get_all_data() -> Tuple[Response, int]:
logger.debug("Received GET request on /data.")
wait = request.args.get('wait') == '1'
data_states = []
expired_entries = []
for task_id in session['data_tasks']:
async_result = celery.AsyncResult(task_id)
if wait:
logger.debug("'wait' was set. Waiting for tasks to finish ...")
async_result.get(propagate=False)
value = redis.get('data:{}'.format(task_id))
if not value:
error = "Could not find data entry in Redis for task_id: " \
"'{}'. The entry probably expired.".format(task_id)
logger.warning(error)
expired_entries.append(task_id)
data_state = get_data_state_for_task_id(task_id, wait)
if not data_state:
# remove expired data task id
session['data_tasks'].remove(task_id)
continue
data_state = json.loads(value)
# remove internal information from response
del data_state['file_path']
del data_state['meta']
# add additional information to response
result = async_result.result
if isinstance(result, Exception): # Exception -> str
result = "{}: {}".format(type(result).__name__, str(result))
data_state['etl_message'] = result
data_state['etl_state'] = async_result.state
data_states.append(data_state)
session['data_tasks'] = [x for x in session['data_tasks']
if x not in expired_entries]
logger.debug("Data states collected. Sending response.")
return jsonify({'data_states': data_states}), 200
......@@ -116,3 +128,21 @@ def delete_all_data() -> Tuple[Response, int]:
logger.debug("Successfully removed all data from session. "
"Sending response.")
return jsonify(''), 200
@data_blueprint.route('/meta/<string:task_id>', methods=['GET'])
def get_meta_information(task_id: str) -> Tuple[Response, int]:
    """Get meta information for given task id.
    :param task_id: The id associated with the ETL task.
    :return: meta information object stored in redis.
    """
    logger.debug("Received GET request on /data/meta/task_id.")
    wait = request.args.get('wait') == '1'
    if task_id not in session['data_tasks']:
        error = "Task ID '{}' not found in session. " \
                "Refusing access.".format(task_id)
        logger.warning(error)
        return jsonify({'error': error}), 403
    data_state = get_data_state_for_task_id(task_id, wait)
    if not data_state:
        # The Redis entry expired or was never written. Without it there is
        # no meta information to return; previously this raised a KeyError
        # ('meta') and produced an unhandled 500.
        error = "Could not find redis entry for " \
                "task_id '{}'.".format(task_id)
        logger.warning(error)
        return jsonify({'error': error}), 404
    logger.debug("Successfully gathered meta information. Sending response.")
    # .get() guards against entries created before the 'meta' field existed.
    return jsonify({'meta': data_state.get('meta', {})}), 200
......@@ -98,13 +98,22 @@ class ETL(Task, metaclass=abc.ABCMeta):
"""
pass
def update_redis(self):
"""Update data entry in redis to a loaded state. This means the
data can now be used for analysis.
def update_redis(self, data_frame):
"""Set redis entry to 'loaded' state to indicate that the user has
has read access. At this step we also set several meta information
that can be used for preview functionality that do not require all
data to be loaded.
:param data_frame: The extracted and transformed data.
"""
value = redis.get(name='data:{}'.format(self.request.id))
assert value is not None
data_state = json.loads(value)
features = data_frame.get('feature')
if features:
features = features.unique().tolist()
data_state['meta'] = {
'features': features
}
data_state['loaded'] = True
redis.setex(name='data:{}'.format(self.request.id),
value=json.dumps(data_state),
......@@ -125,7 +134,6 @@ class ETL(Task, metaclass=abc.ABCMeta):
ciphertext, tag = cipher.encrypt_and_digest(data)
with open(file_path, 'wb') as f:
[f.write(x) for x in (cipher.nonce, tag, ciphertext)]
self.update_redis()
def load(self, data_frame: DataFrame, file_path: str) -> None:
"""Load (save) the data to the file system.
......@@ -134,7 +142,6 @@ class ETL(Task, metaclass=abc.ABCMeta):
"""
os.makedirs(os.path.dirname(file_path), exist_ok=True)
data_frame.to_csv(file_path, index=False)
self.update_redis()
def run(self, server: str, token: str,
descriptor: dict, file_path: str,
......@@ -155,13 +162,13 @@ class ETL(Task, metaclass=abc.ABCMeta):
raw_data = self.extract(server, token, descriptor)
except Exception as e:
logger.exception(e)
raise RuntimeError("Data extraction failed.")
raise RuntimeError("Data extraction failed. {}".format(e))
logger.info("(T)ransforming data to Fractalis format.")
try:
data_frame = self.transform(raw_data, descriptor)
except Exception as e:
logger.exception(e)
raise RuntimeError("Data transformation failed.")
raise RuntimeError("Data transformation failed. {}".format(e))
if not isinstance(data_frame, DataFrame):
error = "transform() must return 'pandas.DataFrame', " \
"but returned '{}' instead.".format(type(data_frame))
......@@ -172,6 +179,7 @@ class ETL(Task, metaclass=abc.ABCMeta):
self.secure_load(data_frame, file_path)
else:
self.load(data_frame, file_path)
self.update_redis(data_frame)
except Exception as e:
logger.exception(e)
raise RuntimeError("Data loading failed.")
raise RuntimeError("Data loading failed. {}".format(e))
......@@ -89,7 +89,8 @@ class ETLHandler(metaclass=abc.ABCMeta):
'label': cls.make_label(descriptor),
'descriptor': descriptor,
'data_type': data_type,
'loaded': False
'meta': {},
'loaded': False,
}
redis.setex(name='data:{}'.format(task_id),
value=json.dumps(data_state),
......
......@@ -26,7 +26,7 @@ def get_field(server: str, data_set: str,
cookies=cookie,
timeout=60)
if r.status_code != 200:
error = "Data extraction failed. Target server responded with " \
error = "Target server responded with " \
"status code {}.".format(r.status_code)
logger.error(error)
raise ValueError(error)
......@@ -34,8 +34,8 @@ def get_field(server: str, data_set: str,
field_data = r.json()
except Exception as e:
logger.exception(e)
raise TypeError("Data extraction failed. Target server did not return "
"expected data. Possible authentication error.")
raise TypeError("Unexpected data format. "
"Possible authentication error.")
return field_data
......
......@@ -36,7 +36,7 @@ class HighdimETL(ETL):
timeout=2000)
if r.status_code != 200:
error = "Data extraction failed. Target server responded with " \
error = "Target server responded with " \
"status code {}.".format(r.status_code)
logger.error(error)
raise ValueError(error)
......@@ -44,8 +44,7 @@ class HighdimETL(ETL):
pass # TODO
except Exception as e:
logger.exception(e)
raise ValueError("Data extraction failed. "
"Got unexpected data format.")
raise ValueError("Got unexpected data format.")
def transform(self, raw_data: dict, descriptor: dict) -> DataFrame:
rows = []
......
......@@ -26,7 +26,7 @@ def extract_data(server: str, descriptor: dict, token: str) -> dict:
},
timeout=2000)
if r.status_code != 200:
error = "Data extraction failed. Target server responded with " \
error = "Target server responded with " \
"status code {}.".format(r.status_code)
logger.error(error)
raise ValueError(error)
......@@ -34,5 +34,4 @@ def extract_data(server: str, descriptor: dict, token: str) -> dict:
return r.json()
except Exception as e:
logger.exception(e)
raise ValueError("Data extraction failed. "
"Got unexpected data format.")
raise ValueError("Got unexpected data format.")
......@@ -35,7 +35,7 @@ class TestAdaHandler:
@staticmethod
def request_callback(request):
headers = {
'Set-Cookie': 'PLAY2AUTH_SESS_ID="foo-token"'}
'Set-Cookie': 'PLAY2AUTH_SESS_ID=foo-token'}
body = ''
return 200, headers, body
......
......@@ -161,6 +161,7 @@ class TestData:
assert 'descriptor' in data_state
assert 'data_type' in data_state
assert 'loaded' in data_state
assert 'meta' in data_state
assert not data_state['loaded']
def test_valid_redis_after_loaded_on_post(self, test_client, payload):
......@@ -175,6 +176,7 @@ class TestData:
assert 'descriptor' in data_state
assert 'data_type' in data_state
assert 'loaded' in data_state
assert 'meta' in data_state
assert data_state['loaded']
def test_valid_filesystem_before_loaded_on_post(
......@@ -230,6 +232,7 @@ class TestData:
body = flask.json.loads(rv.get_data())
for data_state in body['data_states']:
assert 'file_path' not in data_state
assert 'meta' not in data_state
assert data_state['etl_state'] == 'PENDING'
assert not data_state['etl_message']
assert 'descriptor' in data_state
......@@ -282,7 +285,7 @@ class TestData:
for key in redis.keys('data:*'):
value = redis.get(key)
data_state = json.loads(value)
os.path.exists(data_state['file_path'])
assert not os.path.exists(data_state['file_path'])
test_client.delete('/data/{}?wait=1'.format(data_state['task_id']))
assert not redis.exists(key)
assert not os.path.exists(data_state['file_path'])
......@@ -294,7 +297,7 @@ class TestData:
for key in redis.keys('data:*'):
value = redis.get(key)
data_state = json.loads(value)
os.path.exists(data_state['file_path'])
assert not os.path.exists(data_state['file_path'])
test_client.delete('/data/{}?wait=1'.format(data_state['task_id']))
assert not redis.exists(key)
assert not os.path.exists(data_state['file_path'])
......@@ -308,10 +311,12 @@ class TestData:
for key in redis.keys('data:*'):
value = redis.get(key)
data_state = json.loads(value)
os.path.exists(data_state['file_path'])
assert os.path.exists(data_state['file_path'])
rv = test_client.delete('/data/{}?wait=1'
.format(data_state['task_id']))
body = flask.json.loads(rv.get_data())
assert rv.status_code == 403
assert 'Refusing access.' in body['error']
assert redis.exists(key)
assert os.path.exists(data_state['file_path'])
......@@ -336,3 +341,39 @@ class TestData:
with pytest.raises(UnicodeDecodeError):
open(file_path, 'r').readlines()
app.config['FRACTALIS_ENCRYPT_CACHE'] = False
def test_valid_response_before_loaded_on_meta(self, test_client, payload):
    """The meta endpoint answers 200 with empty meta while the ETL runs."""
    test_client.post('/data', data=payload['serialized'])
    for redis_key in redis.keys('data:*'):
        entry = json.loads(redis.get(redis_key))
        response = test_client.get(
            '/data/meta/{}'.format(entry['task_id']))
        response_body = flask.json.loads(response.get_data())
        assert response.status_code == 200
        assert not response_body.get('meta')
def test_valid_response_after_loaded_on_meta(self, test_client, payload):
    """The meta endpoint returns populated meta once the ETL finished."""
    test_client.post('/data?wait=1', data=payload['serialized'])
    for redis_key in redis.keys('data:*'):
        entry = json.loads(redis.get(redis_key))
        response = test_client.get(
            '/data/meta/{}?wait=1'.format(entry['task_id']))
        response_body = flask.json.loads(response.get_data())
        assert response.status_code == 200
        assert response_body.get('meta')
def test_403_if_no_auth_on_get_meta(self, test_client, payload):
    """Requesting meta of a task not owned by the session is refused and
    leaves the Redis entry and the cached file untouched."""
    test_client.post('/data?wait=1', data=payload['serialized'])
    with test_client.session_transaction() as sess:
        # Drop ownership of all submitted tasks from the session.
        sess['data_tasks'] = []
    for redis_key in redis.keys('data:*'):
        entry = json.loads(redis.get(redis_key))
        response = test_client.get(
            '/data/meta/{}?wait=1'.format(entry['task_id']))
        response_body = flask.json.loads(response.get_data())
        assert response.status_code == 403
        assert 'Refusing access.' in response_body['error']
        assert redis.exists(redis_key)
        assert os.path.exists(entry['file_path'])
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment