Commit 57da4d5e authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

some analytics stuff

parent 0ea49063
Pipeline #1310 failed with stage
in 25 seconds
swagger: '2.0'
info:
title: Fractalis API
description: foobar
version: '0.1.0'
host: 127.0.0.1:8000
schemes:
- http
produces:
- application/json
paths:
/data:
post:
summary: Add data to current session
parameters:
- name: etl
in: query
description: ETL to use for moving data from source to Fractalis
required: false
type: string
- name: data
in: query
description: Data in Fractalis standard format
required: false
type: string
- name: action
in: query
description: Action to extract the data from the source system
required: false
type: string
responses:
201:
description: Accepted
schema:
type: object
properties:
data_id:
type: string
get:
summary: Get all completed and running ETLs for current session
responses:
200:
description: OK
schema:
type: array
items:
$ref: '#/definitions/ETLStatus'
/data/{data_id}:
parameters:
- name: data_id
in: path
description: ID given on launching an ETL
required: true
type: string
get:
summary: Get status of ETL associated with data_id
responses:
200:
description: OK
schema:
$ref: '#/definitions/ETLStatus'
/data/{data_id}/subsets:
parameters:
- name: data_id
in: path
description: ID given on launching an ETL
required: true
type: string
post:
summary: Create a new subset for the loaded data
parameters:
- name: subjects
in: query
description: List of subjects that define a new subset
required: true
type: string
responses:
201:
description: Created
schema:
type: object
properties:
subset_id:
type: integer
/data/{data_id}/subsets/{subset_id}:
parameters:
- name: data_id
in: path
description: ID given on launching an ETL
required: true
type: string
- name: subset_id
in: path
description: ID given when creating subset
required: true
type: string
delete:
summary: Delete subset definition
responses:
200:
description: OK
/analytics:
post:
summary: Launch analytical script
parameters:
- name: script
in: query
description: Analysis script to run
required: true
type: string
- name: arguments
in: query
description: String describing the arguments to the analysis script
required: true
type: string
responses:
201:
description: Created
schema:
type: object
properties:
job_id:
type: string
/analytics/{job_id}:
parameters:
- name: job_id
in: path
description: ID given on job creation
required: true
type: string
get:
summary: Get job status information for given job id
responses:
200:
description: OK
schema:
$ref: '#/definitions/JobStatus'
delete:
summary: Attempts to kill all processes assiated with the job
responses:
200:
description: OK
definitions:
ETLStatus:
type: object
properties:
data_id:
type: string
status:
type: string
JobStatus:
type: object
properties:
status:
type: string
message:
type: string
\ No newline at end of file
......@@ -4,7 +4,13 @@ Modules in this package:
- config -- Manages Fractalis Flask app configuration
"""
from flask import Flask
from fractalis.config import configure_app
from fractalis.celery import init_celery
from fractalis.analytics.controllers import analytics
app = Flask(__name__)
configure_app(app)
celery = init_celery(app)
app.register_blueprint(analytics, url_prefix='/analytics')
import json
from flask import Blueprint
analytics = Blueprint('analytics', __name__)
@analytics.route('', methods=['POST'])
def create_job():
body = json.dumps({'job_id': 123})
return body, 201
@analytics.route('/<uuid:job_id>', methods=['GET'])
def get_job_details(job_id):
pass
@analytics.route('/<uuid:job_id>', methods=['DELETE'])
def cancel_job(job_id):
pass
"""This module is responsible for the establishment and configuration of a
Celery instance."""
from celery import Celery
def init_celery(app):
"""Establish connection to celery broker and result backend.
TThe function creates a new Celery object, configures it with the broker
from the application config, updates the rest of the Celery config from the
Flask config and then creates a subclass of the task that wraps the task
execution in an application context.
Arguments:
app (Flask) -- An instance of Flask
Exceptions:
ConnectionRefusedError (Exception) -- Is raised when connection fails
Returns:
(Celery) -- An instance of Celery
"""
celery = Celery(app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
# try:
# celery.connection().connect()
# except OSError as e:
# error_msg = """Could not establish connection to celery broker.
# URL: '{}'""".format(app.config['CELERY_BROKER_URL'])
# raise ConnectionRefusedError(error_msg) from e
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
......@@ -10,6 +10,8 @@ class BaseConfig(object):
"""Basic configuration that should be used in production."""
DEBUG = False
TESTING = False
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
class DevelopmentConfig(BaseConfig):
......
import uuid
import json
import pytest
class TestAnalytics(object):
@pytest.fixture
def test_client(self):
import fractalis
fractalis.app.testing = True
test_client = fractalis.app.test_client()
return test_client
def test_create_new_resource(self, test_client):
response = test_client.post('/analytics')
response_body = json.loads(response.get_data().decode('utf-8'))
new_resource_url = '/analytics/{}'.format(response_body['job_id'])
assert response.status_code == 201
assert uuid.UUID(response_body['job_id'])
assert test_client.head(new_resource_url).status_code == 200
def test_delete_resource(self, test_client):
response = test_client.post('/analytics')
response_body = json.loads(response.get_data().decode('utf-8'))
new_resource_url = '/analytics/{}'.format(response_body['job_id'])
assert test_client.delete(new_resource_url).status_code == 200
assert test_client.head(new_resource_url).status_code == 404
def test_get_status_new_resource(self, test_client):
pass
import pytest
from celery import Celery
from fractalis import celery
class TestCelery(object):
@pytest.fixture
def flask_app(self):
from flask import Flask
app = Flask('test_app')
app.config['TESTING'] = True
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379'
return app
def test_exception_if_no_connection_to_broker(self, flask_app):
flask_app.config['CELERY_BROKER_URL'] = 'foobar'
with pytest.raises(ConnectionRefusedError):
celery.init_celery(flask_app)
def test_exception_if_no_connection_to_result_backend(self, flask_app):
flask_app.config['CELERY_RESULT_BACKEND'] = 'foobar'
with pytest.raises(ConnectionRefusedError):
celery.init_celery(flask_app)
def test_returns_celery_instance_if_connection_valid(self, flask_app):
celery_instance = celery.init_celery(flask_app)
assert isinstance(celery_instance, Celery)
......@@ -6,7 +6,7 @@ import pytest
import fractalis
class TestConfig:
class TestConfig(object):
def test_config_when_test_mode(self):
os.environ['FRACTALIS_MODE'] = 'testing'
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment