Commit fd345093 authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

implemented job factory system

parent 4fd472b6
Pipeline #1720 failed with stage
in 1 minute and 18 seconds
from fractalis.utils import list_classes_with_base_class
from .job import AnalyticsJob
JOB_REGISTRY = list_classes_with_base_class('fractalis.analytics.jobs',
AnalyticsJob)
......@@ -4,29 +4,18 @@ from flask import Blueprint, session, request, jsonify
from fractalis.celery import app as celery
from fractalis.validator import validate_json, validate_schema
from fractalis.analytics.schema import create_job_schema
from .schema import create_job_schema
from .job import AnalyticsJob
analytics_blueprint = Blueprint('analytics_blueprint', __name__)
def get_celery_task(task):
try:
split = task.split('.')
import_cmd = ('importlib.import_module("'
'fractalis.analytics.scripts.{}.{}").{}')
task = eval(import_cmd.format(*split))
except Exception as e:
# some logging here would be nice
return None
return task
@analytics_blueprint.before_request
def prepare_session():
session.permanent = True
if 'tasks' not in session:
session['tasks'] = []
if 'jobs' not in session:
session['jobs'] = []
@analytics_blueprint.route('', methods=['POST'])
......@@ -34,27 +23,21 @@ def prepare_session():
@validate_schema(create_job_schema)
def create_job():
json = request.get_json(force=True)
task = get_celery_task(json['task'])
if task is None:
return jsonify({'error_msg': 'Task {} not found.'.format(
json['task'])}), 400
try:
async_result = task.delay(**json['args'])
except TypeError as e:
return jsonify({'error_msg':
'Invalid Arguments for task {}: {}'.format(
json['task'], e)}), 400
session['tasks'].append(async_result.id)
return jsonify({'task_id': async_result.id}), 201
@analytics_blueprint.route('/<uuid:task_id>', methods=['GET'])
def get_job_details(task_id):
task_id = str(task_id)
if task_id not in session['tasks']: # access control
return jsonify({'error_msg': "No matching task found."}), 404
async_result = celery.AsyncResult(task_id)
analytics_job = AnalyticsJob.factory(json['job_name'])
if analytics_job is None:
return jsonify({'error_msg': "Job with name '{}' not found.".format(
json['job_name'])}), 400
async_result = analytics_job.delay(**json['args'])
session['jobs'].append(async_result.id)
return jsonify({'job_id': async_result.id}), 201
@analytics_blueprint.route('/<uuid:job_id>', methods=['GET'])
def get_job_details(job_id):
job_id = str(job_id)
if job_id not in session['jobs']: # access control
return jsonify({'error_msg': "No matching job found."}), 404
async_result = celery.AsyncResult(job_id)
wait = request.args.get('wait') == '1'
if wait:
async_result.get(propagate=False) # wait for results
......@@ -66,13 +49,13 @@ def get_job_details(task_id):
'result': result}), 200
@analytics_blueprint.route('/<uuid:task_id>', methods=['DELETE'])
def cancel_job(task_id):
task_id = str(task_id)
if task_id not in session['tasks']: # Access control
return jsonify({'error_msg': "No matching task found."}), 404
@analytics_blueprint.route('/<uuid:job_id>', methods=['DELETE'])
def cancel_job(job_id):
job_id = str(job_id)
if job_id not in session['jobs']: # Access control
return jsonify({'error_msg': "No matching job found."}), 404
wait = request.args.get('wait') == '1'
# possibly dangerous: http://stackoverflow.com/a/29627549
celery.control.revoke(task_id, terminate=True, signal='SIGUSR1', wait=wait)
session['tasks'].remove(task_id)
return jsonify({'task_id': task_id}), 200
celery.control.revoke(job_id, terminate=True, signal='SIGUSR1', wait=wait)
session['jobs'].remove(job_id)
return jsonify({'job_id': job_id}), 200
import abc
# TODO: is there a difference between this and importing
# fractalis.celery.app.Task ?
from celery import Task
class AnalyticsJob(Task, metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def name(self):
pass
@staticmethod
def factory(job_name):
from . import JOB_REGISTRY
for job in JOB_REGISTRY:
if job.name == job_name:
return job()
@abc.abstractmethod
def run(self):
pass
import time
from fractalis.analytics.job import AnalyticsJob
class AddJob(AnalyticsJob):
name = 'add_test_job'
def run(self, a, b):
return a + b
class DoNothingJob(AnalyticsJob):
name = 'nothing_test_job'
def run(self, seconds):
time.sleep(seconds)
return 1
class DivJob(AnalyticsJob):
name = 'div_test_job'
def run(self, a, b):
return a / b
create_job_schema = {
"type": "object",
"properties": {
"task": {"type": "string", "minLength": 5},
"job_name": {"type": "string", "minLength": 5},
"args": {"type": "object", "minProperties": 1},
},
"required": ["task", "args"]
"required": ["job_name", "args"]
}
import time
from fractalis.celery import app
@app.task
def add(a, b):
return a + b
@app.task
def do_nothing(seconds):
time.sleep(seconds)
return 1
@app.task
def div(a, b):
return a / b
......@@ -8,6 +8,7 @@ from celery import Celery
from fractalis.utils import list_classes_with_base_class
from fractalis.data.etls.etl import ETL
from fractalis.analytics.job import AnalyticsJob
app = Celery(__name__)
......@@ -27,3 +28,8 @@ except KeyError:
etl_classes = list_classes_with_base_class('fractalis.data.etls', ETL)
for etl_class in etl_classes:
app.tasks.register(etl_class)
analytics_job_classes = list_classes_with_base_class('fractalis.analytics.job',
AnalyticsJob)
for analytics_job_class in analytics_job_classes:
app.tasks.register(analytics_job_class)
from fractalis.utils import list_classes_with_base_class
from fractalis.data.etls.etlhandler import ETLHandler
from fractalis.data.etls.etl import ETL
from .etlhandler import ETLHandler
from .etl import ETL
HANDLER_REGISTRY = list_classes_with_base_class('fractalis.data.etls',
ETLHandler)
......
......@@ -15,59 +15,75 @@ class TestAnalytics(object):
app.testing = True
with app.test_client() as test_client:
yield test_client
# cleanup running tasks after each test
for task_id in flask.session['tasks']:
test_client.delete('/analytics/{}?wait=1'.format(task_id))
# cleanup running jobs after each test
for job_id in flask.session['jobs']:
test_client.delete('/analytics/{}?wait=1'.format(job_id))
# test POST to /analytics
def test_new_resource_created(self, app):
rv = app.post('/analytics', data=flask.json.dumps(dict(
task='test.tasks.add',
job_name='add_test_job',
args={'a': 1, 'b': 1}
)))
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}'.format(body['task_id'])
assert rv.status_code == 201
assert uuid.UUID(body['task_id'])
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}'.format(body['job_id'])
assert uuid.UUID(body['job_id'])
assert app.head(new_url).status_code == 200
@pytest.fixture(scope='function',
params=[{'task': 'querty.tasks.add',
'args': {'a': 1, 'b': 2}},
{'task': 'test.tasks.querty',
'args': {'a': 1, 'b': 2}},
{'task': 'test.add',
'args': {'a': 1, 'b': 2}},
{'task': 'test.tasks.add',
'args': {'a': 1, 'c': 2}},
{'task': 'test.tasks.add',
'args': {'a': 1}},
{'task': 'test.tasks.add'},
{'args': {'a': 1, 'b': 2}},
{'task': '',
params=[{'job_name': 'i_dont_exist_job',
'args': {'a': 1, 'b': 2}},
{'task': 'querty.tasks.add',
'args': ''}])
{'job_name': '',
'args': {'a': 1, 'b': 2}}])
def bad_request(self, app, request):
return app.post('/analytics', data=flask.json.dumps(request.param))
def test_400_if_POST_body_invalid(self, bad_request):
assert bad_request.status_code == 400
def test_no_conflict_when_running_job_twice(self, app):
rv1 = app.post('/analytics', data=flask.json.dumps(dict(
job_name='nothing_test_job',
args={'seconds': 4}
)))
assert rv1.status_code == 201
rv2 = app.post('/analytics', data=flask.json.dumps(dict(
job_name='nothing_test_job',
args={'seconds': 4}
)))
assert rv2.status_code == 201
body1 = flask.json.loads(rv1.get_data())
new_url1 = '/analytics/{}?wait=0'.format(body1['job_id'])
new_response1 = app.get(new_url1)
assert new_response1.status_code == 200
new_body1 = flask.json.loads(new_response1.get_data())
new_body1['status'] != 'FAILURE'
body2 = flask.json.loads(rv2.get_data())
new_url2 = '/analytics/{}?wait=0'.format(body2['job_id'])
new_response2 = app.get(new_url2)
assert new_response2.status_code == 200
new_body2 = flask.json.loads(new_response2.get_data())
new_body2['status'] != 'FAILURE'
@pytest.mark.skip(reason="Data interface not implemented yet.")
def test_404_if_creating_without_auth(self, app):
pass
# test DELETE to /analytics/{task_id}
# test DELETE to /analytics/{job_id}
def test_resource_deleted(self, app):
rv = app.post('/analytics', data=flask.json.dumps(dict(
task='test.tasks.add',
job_name='add_test_job',
args={'a': 1, 'b': 1}
)))
assert rv.status_code == 201
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=1'.format(body['task_id'])
new_url = '/analytics/{}?wait=1'.format(body['job_id'])
assert app.head(new_url).status_code == 200
assert app.delete(new_url).status_code == 200
assert app.head(new_url).status_code == 404
......@@ -78,61 +94,69 @@ class TestAnalytics(object):
def test_running_resource_deleted(self, app):
rv = app.post('/analytics', data=flask.json.dumps(dict(
task='test.tasks.do_nothing',
job_name='nothing_test_job',
args={'seconds': 4}
)))
assert rv.status_code == 201
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=1'.format(body['task_id'])
new_url = '/analytics/{}?wait=1'.format(body['job_id'])
assert app.head(new_url).status_code == 200
assert app.delete(new_url).status_code == 200
assert app.head(new_url).status_code == 404
def test_404_if_deleting_without_auth(self, app):
rv = app.post('/analytics', data=flask.json.dumps(dict(
task='test.tasks.do_nothing',
job_name='nothing_test_job',
args={'seconds': 4}
)))
assert rv.status_code == 201
time.sleep(1)
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=1'.format(body['task_id'])
new_url = '/analytics/{}?wait=1'.format(body['job_id'])
with app.session_transaction() as sess:
sess['tasks'] = []
sess['jobs'] = []
assert app.delete(new_url).status_code == 404
# test GET to /analytics/{task_id}
# test GET to /analytics/{job_id}
def test_status_contains_result_if_finished(self, app):
rv = app.post('/analytics', data=flask.json.dumps(dict(
task='test.tasks.add',
job_name='add_test_job',
args={'a': 1, 'b': 2}
)))
assert rv.status_code == 201
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=1'.format(body['task_id'])
new_url = '/analytics/{}?wait=1'.format(body['job_id'])
new_response = app.get(new_url)
assert new_response.status_code == 200
new_body = flask.json.loads(new_response.get_data())
assert new_body['result'] == 3
assert new_body['status'] == 'SUCCESS', new_body
assert new_body['result'] == 3, new_body
def test_status_result_empty_if_not_finished(self, app):
rv = app.post('/analytics', data=flask.json.dumps(dict(
task='test.tasks.do_nothing',
job_name='nothing_test_job',
args={'seconds': 4}
)))
time.sleep(1)
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=0'.format(body['task_id'])
new_url = '/analytics/{}?wait=0'.format(body['job_id'])
new_response = app.get(new_url)
assert new_response.status_code == 200
new_body = flask.json.loads(new_response.get_data())
assert not new_body['result']
assert new_body['status'] == 'PENDING'
def test_correct_response_if_task_fails(self, app):
def test_correct_response_if_job_fails(self, app):
rv = app.post('/analytics', data=flask.json.dumps(dict(
task='test.tasks.div',
job_name='div_test_job',
args={'a': 2, 'b': 0}
)))
assert rv.status_code == 201
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=1'.format(body['task_id'])
new_url = '/analytics/{}?wait=1'.format(body['job_id'])
new_response = app.get(new_url)
assert new_response.status_code == 200
new_body = flask.json.loads(new_response.get_data())
assert new_body['status'] == 'FAILURE'
assert 'ZeroDivisionError' in new_body['result']
......@@ -143,11 +167,12 @@ class TestAnalytics(object):
def test_404_if_status_without_auth(self, app):
rv = app.post('/analytics', data=flask.json.dumps(dict(
task='test.tasks.do_nothing',
job_name='nothing_test_job',
args={'seconds': 4}
)))
assert rv.status_code == 201
body = flask.json.loads(rv.get_data())
new_url = '/analytics/{}?wait=0'.format(body['task_id'])
new_url = '/analytics/{}?wait=0'.format(body['job_id'])
with app.session_transaction() as sess:
sess['tasks'] = []
sess['jobs'] = []
assert app.get(new_url).status_code == 404
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment