Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
Fractalis
fractalis
Commits
5f77e21c
Commit
5f77e21c
authored
Nov 10, 2016
by
Sascha Herzinger
Browse files
fixing celery worker
parent
a881ff93
Pipeline
#1371
failed with stage
in 4 minutes and 24 seconds
Changes
8
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
fractalis/__init__.py
View file @
5f77e21c
...
...
@@ -5,19 +5,17 @@ Modules in this package:
"""
from flask import Flask

from fractalis.session import RedisSessionInterface
from fractalis.celery import init_celery
from fractalis.analytics.controllers import analytics_blueprint

# Create and configure the Flask application from the default config module.
app = Flask(__name__)
app.config.from_object('fractalis.config')
# Store session data in Redis rather than in client-side cookies.
app.session_interface = RedisSessionInterface(app.config)
# Build the Celery instance from the application's config
# (replaces the former make_celery helper).
celery_app = init_celery(app)
app.register_blueprint(analytics_blueprint, url_prefix='/analytics')

if __name__ == '__main__':
    # Allow a deployment-specific config file to override the defaults.
    app.config.from_envvar('FRACTALIS_CONFIG')
    # NOTE(review): worker_main() blocks until the worker exits, so
    # app.run() below is only reached afterwards -- confirm this is the
    # intended development workflow.
    celery_app.worker_main(['worker', '--loglevel=DEBUG'])
    app.run()
fractalis/analytics/job.py
View file @
5f77e21c
"""
"""
import
fractalis.analytics.scripts
# flake8: noqa
def get_celery_task(task):
    """Resolve a dotted task path to the registered Celery task object.

    Arguments:
        task (str) -- Path relative to fractalis.analytics.scripts,
            e.g. 'test.tasks.add'.

    Returns:
        The Celery task object found at that path.

    Raises:
        AttributeError -- If the package or the task does not exist.
    """
    # NOTE(review): eval() on the task string is only acceptable while
    # callers pass validated, internal task names; never feed untrusted
    # input into this function.
    celery_task = eval('fractalis.analytics.scripts.{}'.format(task))  # flake8: noqa
    return celery_task
def start_job(task, arguments):
    """Submit a Celery task for asynchronous execution.

    Arguments:
        task (str) -- Dotted task path, e.g. 'test.tasks.add'.
        arguments (dict) -- Keyword arguments forwarded to the task.

    Returns:
        (str) -- The id of the submitted job, usable with
            get_job_result / cancel_job.
    """
    celery_task = get_celery_task(task)
    # delay() schedules the task on the broker and returns immediately.
    async_result = celery_task.delay(**arguments)
    return async_result.id
def cancel_job(task, job_id):
    """Cancel the job with the given id.

    Arguments:
        task (str) -- Dotted task path the job was started with.
        job_id (str) -- Id returned by start_job.
    """
    # TODO: not implemented yet; should revoke the task via the
    # Celery control API.
    pass
def get_job_result(task, job_id):
    """Return the AsyncResult handle for a previously submitted job.

    Arguments:
        task (str) -- Dotted task path the job was started with.
        job_id (str) -- Id returned by start_job.

    Returns:
        (AsyncResult) -- Celery result handle for the job.
    """
    celery_task = get_celery_task(task)
    return celery_task.AsyncResult(job_id)
fractalis/analytics/scripts/__init__.py
View file @
5f77e21c
import importlib

from fractalis.celery import get_scripts_packages

# Import every tasks module found under fractalis/analytics/scripts so
# that the Celery task decorators run and the tasks get registered.
# importlib.import_module replaces the previous exec('import ...'),
# which is both safer and easier to read; the package list comes from
# our own directory scan, not from user input.
for package in get_scripts_packages():
    importlib.import_module('{}.tasks'.format(package))
fractalis/analytics/scripts/test/__init.py
__
→
fractalis/analytics/scripts/test/__init
__
.py
View file @
5f77e21c
File moved
fractalis/analytics/scripts/test/
sample
.py
→
fractalis/analytics/scripts/test/
tasks
.py
View file @
5f77e21c
File moved
fractalis/celery.py
View file @
5f77e21c
"""This module is responsible for the establishment and configuration of a
Celery instance."""
from
celery
import
Celery
import
os
def get_scripts_packages():
    """Collect the package names of all analytics script packages.

    Walks the fractalis/analytics/scripts directory and gathers every
    sub-directory that is a proper Python package (contains an
    __init__.py), skipping the scripts directory itself and any
    __pycache__ directories.

    Returns:
        (list) -- Fully qualified package names, e.g.
            'fractalis.analytics.scripts.test'.
    """
    packages = []
    script_dir = os.path.join(
        os.path.dirname(__file__), 'analytics', 'scripts')
    for dirpath, dirnames, filenames in os.walk(script_dir):
        if (dirpath == script_dir or
                '__pycache__' in dirpath or
                '__init__.py' not in filenames):
            continue
        dirname = os.path.basename(dirpath)
        package = 'fractalis.analytics.scripts.{}'.format(dirname)
        packages.append(package)
    return packages


def init_celery(app):
    """Establish connection to celery broker and result backend.

    The function creates a new Celery object, configures it with the
    broker and result backend from the application config, updates the
    rest of the Celery config from the Flask config and registers the
    analytics script packages for task autodiscovery.

    Arguments:
        app (Flask) -- An instance of Flask

    Exceptions:
        ConnectionRefusedError (Exception) -- Is raised when connection fails

    Returns:
        (Celery) -- An instance of Celery
    """
    celery = Celery(app.import_name,
                    backend=app.config['CELERY_RESULT_BACKEND'],
                    broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    # Fail fast if the broker is unreachable.
    try:
        celery.connection().heartbeat_check()
    except Exception as e:
        error_msg = "Could not establish connection to broker: {}".format(
            app.config['CELERY_BROKER_URL'])
        raise ConnectionRefusedError(error_msg) from e
    # Submitting a throwaway task exercises the result backend.
    try:
        @celery.task
        def f():
            pass
        f.delay()
    except Exception as e:
        # BUG FIX: this message previously formatted CELERY_BROKER_URL,
        # but the failure it reports concerns the result backend.
        error_msg = "Could not establish connection to backend: {}".format(
            app.config['CELERY_RESULT_BACKEND'])
        raise ConnectionRefusedError(error_msg) from e
    # -- Execute celery tasks in app context --
    # TaskBase = celery.Task
    #
    # class ContextTask(TaskBase):
    #     abstract = True
    #
    #     def __call__(self, *args, **kwargs):
    #         with app.app_context():
    #             return TaskBase.__call__(self, *args, **kwargs)
    # celery.Task = ContextTask
    celery.autodiscover_tasks(packages=get_scripts_packages())
    return celery
tests/test_celery.py
deleted
100644 → 0
View file @
a881ff93
import
pytest
from
celery
import
Celery
from
fractalis.celery
import
init_celery
class TestCelery(object):
    """Integration tests for fractalis.celery.init_celery."""

    @pytest.fixture()
    def app(self):
        # Build a minimal Flask app carrying the project's default config.
        from flask import Flask
        flask_app = Flask('test_app')
        flask_app.config.from_object('fractalis.config')
        return flask_app

    def test_exception_if_no_connection_to_broker(self, app):
        # 'lacolhost' is a deliberately unresolvable host name.
        app.config['CELERY_BROKER_URL'] = 'redis://lacolhost:6379'
        with pytest.raises(ConnectionRefusedError):
            init_celery(app)

    def test_exception_if_no_connection_to_result_backend(self, app):
        app.config['CELERY_RESULT_BACKEND'] = 'redis://lacolhost:6379'
        with pytest.raises(ConnectionRefusedError):
            init_celery(app)

    def test_returns_celery_instance_if_connection_valid(self, app):
        instance = init_celery(app)
        assert isinstance(instance, Celery)
tests/test_job.py
View file @
5f77e21c
from
time
import
sleep
from
uuid
import
UUID
,
uuid4
import
pytest
...
...
@@ -8,49 +7,49 @@ from fractalis.analytics import job
class TestJob(object):
    """Tests for the fractalis.analytics.job module."""

    def test_exception_when_starting_non_existing_task(self):
        # Unknown package and unknown task both surface as AttributeError.
        with pytest.raises(AttributeError):
            job.start_job('querty.tasks.add', {})
        with pytest.raises(AttributeError):
            job.start_job('test.tasks.querty', {})

    def test_exception_when_invalid_parameters(self):
        # Missing required argument 'b' of the add task.
        with pytest.raises(TypeError):
            job.start_job('test.tasks.add', {'a': 1})

    def test_start_job_returns_uuid(self):
        job_id = job.start_job('test.tasks.add', {'a': 1, 'b': 2})
        # UUID() raises ValueError if job_id is not a valid UUID string.
        UUID(job_id)

    def test_finished_job_returns_results(self):
        job_id = job.start_job('test.tasks.add', {'a': 1, 'b': 2})
        async_result = job.get_job_result('test.tasks.add', job_id)
        async_result.get(timeout=1)
        assert async_result.status == 'SUCCESS'
        assert async_result.result == 3

    def test_failing_job_return_exception_message(self):
        job_id = job.start_job('test.tasks.div', {'a': 1, 'b': 0})
        async_result = job.get_job_result('test.tasks.div', job_id)
        # BUG FIX: get() re-raises the task's exception by default, which
        # would abort the test before the assertions; propagate=False lets
        # us inspect the FAILURE state instead.
        async_result.get(timeout=1, propagate=False)
        assert async_result.status == 'FAILURE'
        assert 'ZeroDivisionError' in async_result.result

    def test_job_in_progress_has_running_status(self):
        job_id = job.start_job('test.tasks.do_nothing', {'time': 2})
        async_result = job.get_job_result('test.tasks.do_nothing', job_id)
        assert async_result.status == 'PENDING'

    def test_exception_when_checking_non_existing_job(self):
        with pytest.raises(LookupError):
            job.get_job_result('test.tasks.do_nothing', str(uuid4()))

    def test_job_is_gone_after_canceling(self):
        job_id = job.start_job('test.tasks.do_nothing', {'time': 10})
        job.cancel_job('test.tasks.do_nothing', job_id)
        # BUG FIX: the original wrapped start/cancel in a no-argument
        # pytest.raises() (a TypeError at runtime) and called
        # get_job_result with only the job id; the intent is that a
        # canceled job can no longer be looked up.
        with pytest.raises(LookupError):
            job.get_job_result('test.tasks.do_nothing', job_id)

    def test_exception_when_canceling_non_existing_job(self):
        with pytest.raises(LookupError):
            job.cancel_job('test.tasks.do_nothing', uuid4())
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment