controller.py 5.47 KB
Newer Older
Sascha Herzinger's avatar
Sascha Herzinger committed
1
2
3
4
5
6
7
8
9
10
11
"""The /state controller."""

import re
import json
import logging
from uuid import UUID, uuid4
from typing import Tuple

from flask import Blueprint, jsonify, Response, request, session

from fractalis import redis
12
from fractalis.validator import validate_json, validate_schema
Sascha Herzinger's avatar
Sascha Herzinger committed
13
14
15
from fractalis.analytics.task import AnalyticTask
from fractalis.data.etlhandler import ETLHandler
from fractalis.data.controller import get_data_state_for_task_id
16
from fractalis.state.schema import request_state_access_schema
Sascha Herzinger's avatar
Sascha Herzinger committed
17
18
19
20
21
22
23
24
25
26
27
28
29


state_blueprint = Blueprint('state_blueprint', __name__)
logger = logging.getLogger(__name__)


@state_blueprint.route('', methods=['POST'])
@validate_json
def save_state() -> Tuple[Response, int]:
    """Save given payload to redis, so it can be accessed later on.
    :return: UUID linked to the saved state.
    """
    logger.debug("Received POST request on /state.")
30
    payload = request.get_json(force=True)
31
    # check if task ids in payload are valid
32
    matches = re.findall('\$.+?\$', str(payload))
33
34
35
36
37
38
39
    if not matches:
        error = "This state cannot be saved because it contains no data " \
                "task ids. These are used to verify access to the state and " \
                "its potentially sensitive data."
        logger.error(error)
        return jsonify({'error': error}), 400
    for match in matches:
40
        task_id, _ = AnalyticTask.parse_value(match)
41
42
43
44
45
46
47
48
49
        value = redis.get('data:{}'.format(task_id))
        if value is None:
            error = "Data task id is {} could not be found in redis. " \
                    "State cannot be saved".format(task_id)
            logger.error(error)
            return jsonify({'error': error}), 400
        try:
            json.loads(value)['meta']['descriptor']
        except (ValueError, KeyError):
50
51
52
            error = "Task with id {} was found in redis but it represents " \
                    "no valid data state. " \
                    "State cannot be saved.".format(task_id)
53
            return jsonify({'error': error}), 400
Sascha Herzinger's avatar
Sascha Herzinger committed
54
    uuid = uuid4()
55
    redis.set(name='state:{}'.format(uuid), value=json.dumps(payload))
Sascha Herzinger's avatar
Sascha Herzinger committed
56
57
58
59
60
    logger.debug("Successfully saved data to redis. Sending response.")
    return jsonify({'state_id': uuid}), 201


@state_blueprint.route('/<uuid:state_id>', methods=['POST'])
61
62
@validate_json
@validate_schema(request_state_access_schema)
Sascha Herzinger's avatar
Sascha Herzinger committed
63
64
65
66
67
68
69
70
71
72
def request_state_access(state_id: UUID) -> Tuple[Response, int]:
    """Traverse through the state object linked to the given UUID and look for
    data ids. Then attempt to load the data into the current session to verify
    access.
    :param state_id: The id associated with the saved state.
    :return: See redirect target.
    """
    logger.debug("Received POST request on /state/<uuid:state_id>.")
    wait = request.args.get('wait') == '1'
    payload = request.get_json(force=True)
Sascha Herzinger's avatar
Sascha Herzinger committed
73
    state_id = str(state_id)
Sascha Herzinger's avatar
Sascha Herzinger committed
74
75
76
77
78
79
    value = redis.get('state:{}'.format(state_id))
    if not value:
        error = "Could not find state associated with id {}".format(state_id)
        logger.error(error)
        return jsonify({'error': error}), 404
    descriptors = []
80
81
    matches = re.findall('\$.+?\$', value)
    for match in matches:
82
        task_id, _ = AnalyticTask.parse_value(match)
Sascha Herzinger's avatar
Sascha Herzinger committed
83
84
        value = redis.get('data:{}'.format(task_id))
        if value is None:
85
            error = "The state with id {} exists, but one or more of the " \
Sascha Herzinger's avatar
Sascha Herzinger committed
86
87
                    "associated data task ids are missing. Hence this saved " \
                    "state is lost forever because access can no longer be " \
88
                    "verified. Deleting state..."
Sascha Herzinger's avatar
Sascha Herzinger committed
89
            logger.error(error)
Sascha Herzinger's avatar
Sascha Herzinger committed
90
            redis.delete('state:{}'.format(state_id))
91
            return jsonify({'error': error}), 403
Sascha Herzinger's avatar
Sascha Herzinger committed
92
93
        data_state = json.loads(value)
        descriptors.append(data_state['meta']['descriptor'])
Sascha Herzinger's avatar
Sascha Herzinger committed
94
95
96
    etl_handler = ETLHandler.factory(handler=payload['handler'],
                                     server=payload['server'],
                                     auth=payload['auth'])
97
    task_ids = etl_handler.handle(descriptors=descriptors, wait=wait)
Sascha Herzinger's avatar
Sascha Herzinger committed
98
99
100
101
102
    session['data_tasks'] += task_ids
    session['data_tasks'] = list(set(session['data_tasks']))
    # if all task finish successfully we now that session has access to state
    session['state_access'][state_id] = task_ids
    logger.debug("Tasks successfully submitted. Sending response.")
103
    return jsonify(''), 202
Sascha Herzinger's avatar
Sascha Herzinger committed
104
105
106
107
108
109
110
111
112
113
114
115
116
117


@state_blueprint.route('/<uuid:state_id>', methods=['GET'])
def get_state_data(state_id: UUID) -> Tuple[Response, int]:
    """Check whether every ETL linked to the state_id successfully executed for
    this session. If and only if every ETL successfully completed grant access
    to the state information.
    :param state_id: ID of the state that is requested.
    :return: Previously saved state.
    """
    logger.debug("Received GET request on /state/<uuid:state_id>.")
    wait = request.args.get('wait') == '1'
    for task_id in session['state_access'][state_id]:
        data_state = get_data_state_for_task_id(task_id=task_id, wait=wait)
118
        if data_state['etl_state'] == 'SUBMITTED':
Sascha Herzinger's avatar
Sascha Herzinger committed
119
120
121
122
123
124
125
126
127
128
            return jsonify({'message': 'ETLs are still running.'}), 200
        elif data_state['etl_state'] == 'SUCCESS':
            continue
        else:
            error = "One or more ETLs failed or has unknown status. " \
                    "Assuming no access to saved state."
            logger.error(error)
            return jsonify({'error': error}), 403
    state = json.loads(redis.get('state:{}'.format(state_id)))
    return jsonify({'state': state}), 200