controller.py 3.55 KB
Newer Older
1
2
3
4
5
6
"""The /analytics controller. Please refer to doc/api for more information."""

import logging
from typing import Tuple
from uuid import UUID

Sascha Herzinger's avatar
Sascha Herzinger committed
7
from flask import Blueprint, session, request, jsonify
8
from flask.wrappers import Response
Sascha Herzinger's avatar
Sascha Herzinger committed
9

10
from fractalis import celery, app
11
from fractalis.validator import validate_json, validate_schema
12
13
from fractalis.analytics.schema import create_task_schema
from fractalis.analytics.task import AnalyticTask
Sascha Herzinger's avatar
Sascha Herzinger committed
14
15


Sascha Herzinger's avatar
a lot    
Sascha Herzinger committed
16
# Flask blueprint on which the route handlers in this module are registered.
analytics_blueprint = Blueprint('analytics_blueprint', __name__)
17
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
Sascha Herzinger's avatar
Sascha Herzinger committed
18
19


Sascha Herzinger's avatar
a lot    
Sascha Herzinger committed
20
@analytics_blueprint.route('', methods=['POST'])
@validate_json
@validate_schema(create_task_schema)
def create_task() -> Tuple[Response, int]:
    """Create a new analytics task based on the parameters in the POST body.

    See doc/api/ for more information.

    The 'task_name' field of the request body is resolved to a task via
    ``AnalyticTask.factory``. The task is then submitted with the session's
    'data_tasks', the 'args' field of the body and the
    FRACTALIS_ENCRYPT_CACHE config value. The id of the submitted task is
    recorded in ``session['analytic_tasks']``.

    :return: Flask Response and status code: ``{'task_id': ...}`` with 201
        on success, ``{'error_msg': ...}`` with 400 if no task matches the
        given task name.
    """
    logger.debug("Received POST request on /analytics.")
    payload = request.get_json(force=True)
    task_name = payload['task_name']
    analytic_task = AnalyticTask.factory(task_name)
    if analytic_task is None:
        logger.error("Could not submit task for unknown task name: "
                     "'{}'".format(task_name))
        return jsonify({'error_msg': "Task with name '{}' not found."
                       .format(task_name)}), 400
    async_result = analytic_task.delay(
        session_data_tasks=session['data_tasks'], args=payload['args'],
        decrypt=app.config['FRACTALIS_ENCRYPT_CACHE'])
    session['analytic_tasks'].append(async_result.id)
    # In-place changes to a mutable value stored in the session are not
    # detected automatically by Flask, so flag the session explicitly to make
    # sure the new task id is persisted.
    session.modified = True
    logger.debug("Task successfully submitted. Sending response.")
    return jsonify({'task_id': async_result.id}), 201
42
43


44
45
46
@analytics_blueprint.route('/<uuid:task_id>', methods=['GET'])
def get_task_details(task_id: UUID) -> Tuple[Response, int]:
    """Report state and result of the task with the given task_id.

    See doc/api/ for more information.

    Access is refused unless the id is listed in
    ``session['analytic_tasks']``. If the query parameter ``wait`` equals
    '1', the handler blocks on the task before reading its result. A result
    that is an exception is rendered as "<ExceptionName>: <message>".

    :param task_id: ID returned on task creation.
    :return: Flask Response and status code: ``{'state': ..., 'result': ...}``
        with 200, or ``{'error': ...}`` with 403 if the id is not in the
        session.
    """
    logger.debug("Received GET request on /analytics/task_id.")
    should_wait = request.args.get('wait') == '1'
    task_key = str(task_id)

    # Only ids recorded in this session may be inspected.
    if task_key not in session['analytic_tasks']:
        message = ("Task ID '{}' not found in session. "
                   "Refusing access.").format(task_key)
        logger.warning(message)
        return jsonify({'error': message}), 403

    task_handle = celery.AsyncResult(task_key)
    if should_wait:
        # propagate=False: a failed task must not raise inside this handler.
        task_handle.get(propagate=False)

    outcome = task_handle.result
    if isinstance(outcome, Exception):
        # Exceptions are turned into a plain string for the JSON response.
        outcome = "{}: {}".format(type(outcome).__name__, str(outcome))

    logger.debug("Task found and has access. Sending response.")
    return jsonify({'state': task_handle.state, 'result': outcome}), 200
Sascha Herzinger's avatar
Sascha Herzinger committed
67
68


69
70
71
@analytics_blueprint.route('/<uuid:task_id>', methods=['DELETE'])
def cancel_task(task_id: UUID) -> Tuple[Response, int]:
    """Cancel a task for a given task_id.

    See doc/api/ for more information.

    Access is refused unless the id is listed in
    ``session['analytic_tasks']``. Otherwise the task is revoked with
    ``terminate=True`` and signal 'SIGUSR1', and its id is removed from the
    session.

    :param task_id: ID returned on task creation.
    :return: Flask Response and status code: an empty JSON string with 200,
        or ``{'error': ...}`` with 403 if the id is not in the session.
    """
    logger.debug("Received DELETE request on /analytics/task_id.")
    task_key = str(task_id)
    if task_key not in session['analytic_tasks']:
        error = "Task ID '{}' not found in session. " \
                "Refusing access.".format(task_key)
        logger.warning(error)
        return jsonify({'error': error}), 403
    wait = request.args.get('wait') == '1'
    # possibly dangerous: http://stackoverflow.com/a/29627549
    # NOTE(review): 'wait' is passed straight through to Control.revoke;
    # confirm that the installed Celery version actually honours it there.
    celery.control.revoke(task_key, terminate=True, signal='SIGUSR1',
                          wait=wait)
    session['analytic_tasks'].remove(task_key)
    # In-place changes to a mutable value stored in the session are not
    # detected automatically by Flask, so flag the session explicitly to make
    # sure the removal is persisted.
    session.modified = True
    logger.debug("Successfully send term signal to task. Sending response.")
    return jsonify(''), 200