Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
Jochem Bijlard
fractalis
Commits
c3b5d656
Commit
c3b5d656
authored
Feb 27, 2018
by
Sascha Herzinger
Browse files
etlhandler tests passing
parent
7b5532b0
Changes
2
Hide whitespace changes
Inline
Side-by-side
fractalis/data/etlhandler.py
View file @
c3b5d656
...
...
@@ -175,6 +175,7 @@ class ETLHandler(metaclass=abc.ABCMeta):
task_id
=
self
.
find_duplicate_task_id
(
data_tasks
,
descriptor
)
if
task_id
:
task_ids
.
append
(
task_id
)
data_tasks
.
append
(
task_id
)
continue
else
:
self
.
remove_duplicates
(
data_tasks
,
descriptor
)
...
...
@@ -189,6 +190,7 @@ class ETLHandler(metaclass=abc.ABCMeta):
async_result
=
etl
.
apply_async
(
kwargs
=
kwargs
,
task_id
=
task_id
)
assert
async_result
.
id
==
task_id
task_ids
.
append
(
task_id
)
data_tasks
.
append
(
task_id
)
if
wait
and
async_result
.
state
==
'SUBMITTED'
:
logger
.
debug
(
"'wait' was set. Waiting for tasks to finish ..."
)
async_result
.
get
(
propagate
=
False
)
...
...
tests/unit/etls/transmart/test_etlhandler.py
View file @
c3b5d656
...
...
@@ -10,7 +10,7 @@ from fractalis.data.etlhandler import ETLHandler
class
TestETLHandler
:
@
pytest
.
fixture
(
scope
=
'function'
)
def
redis
(
self
,
redis
):
def
redis
(
self
):
from
fractalis
import
redis
,
sync
yield
redis
sync
.
cleanup_all
()
...
...
@@ -38,7 +38,7 @@ class TestETLHandler:
duplicates
=
self
.
etlhandler
.
find_duplicates
(
data_tasks
=
[
'123'
],
descriptor
=
descriptor
)
assert
len
(
duplicates
)
==
1
assert
duplicates
[
0
]
==
123
assert
duplicates
[
0
]
==
'
123
'
def
test_finds_all_duplicates
(
self
,
redis
):
descriptor
=
{
'a'
:
{
'b'
:
3
},
'c'
:
4
}
...
...
@@ -143,10 +143,17 @@ class TestETLHandler:
file_path
=
''
,
descriptor
=
descriptor
,
data_type
=
''
)
monkeypatch
.
setattr
(
'celery.AsyncResult.state'
,
'SUCCESS'
)
class
FakeAsyncResult
:
def
__init__
(
self
,
*
args
,
**
kwargs
):
self
.
state
=
'SUCCESS'
def
get
(
self
,
*
args
,
**
kwargs
):
pass
monkeypatch
.
setattr
(
celery
,
'AsyncResult'
,
FakeAsyncResult
)
task_id
=
self
.
etlhandler
.
find_duplicate_task_id
(
data_tasks
=
[
'123'
],
descriptor
=
descriptor
)
assert
task_id
==
123
assert
task_id
==
'
123
'
def
test_find_duplicate_task_id_returns_task_id_of_SUBMITTED
(
self
,
monkeypatch
,
redis
):
...
...
@@ -155,10 +162,17 @@ class TestETLHandler:
file_path
=
''
,
descriptor
=
descriptor
,
data_type
=
''
)
monkeypatch
.
setattr
(
'celery.AsyncResult.state'
,
'SUBMITTED'
)
class
FakeAsyncResult
:
def
__init__
(
self
,
*
args
,
**
kwargs
):
self
.
state
=
'SUBMITTED'
def
get
(
self
,
*
args
,
**
kwargs
):
pass
monkeypatch
.
setattr
(
celery
,
'AsyncResult'
,
FakeAsyncResult
)
task_id
=
self
.
etlhandler
.
find_duplicate_task_id
(
data_tasks
=
[
'123'
],
descriptor
=
descriptor
)
assert
task_id
==
123
assert
task_id
==
'
123
'
def
test_find_duplicate_task_id_returns_None_for_FAILURE
(
self
,
monkeypatch
,
redis
):
...
...
@@ -167,7 +181,14 @@ class TestETLHandler:
file_path
=
''
,
descriptor
=
descriptor
,
data_type
=
''
)
monkeypatch
.
setattr
(
'celery.AsyncResult.state'
,
'FAILURE'
)
class
FakeAsyncResult
:
def
__init__
(
self
,
*
args
,
**
kwargs
):
self
.
state
=
'FAILURE'
def
get
(
self
,
*
args
,
**
kwargs
):
pass
monkeypatch
.
setattr
(
celery
,
'AsyncResult'
,
FakeAsyncResult
)
task_id
=
self
.
etlhandler
.
find_duplicate_task_id
(
data_tasks
=
[
'123'
],
descriptor
=
descriptor
)
assert
task_id
is
None
...
...
@@ -178,7 +199,14 @@ class TestETLHandler:
file_path
=
''
,
descriptor
=
descriptor
,
data_type
=
''
)
monkeypatch
.
setattr
(
'celery.AsyncResult.state'
,
'SUCCESS'
)
class
FakeAsyncResult
:
def
__init__
(
self
,
*
args
,
**
kwargs
):
self
.
state
=
'SUCCESS'
def
get
(
self
,
*
args
,
**
kwargs
):
pass
monkeypatch
.
setattr
(
celery
,
'AsyncResult'
,
FakeAsyncResult
)
task_id
=
self
.
etlhandler
.
find_duplicate_task_id
(
data_tasks
=
[
'456'
],
descriptor
=
descriptor
)
assert
task_id
is
None
...
...
@@ -186,70 +214,78 @@ class TestETLHandler:
def
test_find_duplicate_task_id_returns_None_for_not_existing
(
self
,
monkeypatch
,
redis
):
descriptor
=
{
'a'
:
{
'b'
:
3
},
'c'
:
4
}
monkeypatch
.
setattr
(
'celery.AsyncResult.state'
,
'FAILURE'
)
class
FakeAsyncResult
:
def
__init__
(
self
,
*
args
,
**
kwargs
):
self
.
state
=
'FAILURE'
def
get
(
self
,
*
args
,
**
kwargs
):
pass
monkeypatch
.
setattr
(
celery
,
'AsyncResult'
,
FakeAsyncResult
)
task_id
=
self
.
etlhandler
.
find_duplicate_task_id
(
data_tasks
=
[
'123'
],
descriptor
=
descriptor
)
assert
task_id
is
None
def
test_handle_reuses_existing_task_ids_if_use_existing
(
self
,
monkeypatch
,
redis
):
descriptor
=
{
'
a'
:
{
'b'
:
3
},
'c'
:
4
}
descriptor
=
{
'
data_type'
:
'default'
}
self
.
etlhandler
.
create_redis_entry
(
task_id
=
'123'
,
file_path
=
''
,
descriptor
=
descriptor
,
data_type
=
''
)
monkeypatch
.
setattr
(
'celery.AsyncResult.state'
,
'SUBMITTED'
)
class
FakeAsyncResult
:
def
__init__
(
self
,
*
args
,
**
kwargs
):
self
.
state
=
'SUBMITTED'
def
get
(
self
,
*
args
,
**
kwargs
):
pass
monkeypatch
.
setattr
(
celery
,
'AsyncResult'
,
FakeAsyncResult
)
task_ids
=
self
.
etlhandler
.
handle
(
descriptors
=
[
descriptor
],
data_tasks
=
[
'123'
],
use_existing
=
True
)
assert
len
(
task_ids
)
==
1
assert
task_ids
[
0
]
==
123
assert
task_ids
[
0
]
==
'
123
'
def
test_handle_limits_search_to_tasks_ids
(
self
,
monkeypatch
,
redis
):
descriptor
=
{
'
a'
:
{
'b'
:
3
},
'c'
:
4
}
descriptor
=
{
'
data_type'
:
'default'
}
self
.
etlhandler
.
create_redis_entry
(
task_id
=
'123'
,
file_path
=
''
,
descriptor
=
descriptor
,
data_type
=
''
)
monkeypatch
.
setattr
(
'celery.AsyncResult.state'
,
'SUBMITTED'
)
task_ids
=
self
.
etlhandler
.
handle
(
descriptors
=
[
descriptor
],
data_tasks
=
[
'123'
],
data_tasks
=
[],
use_existing
=
True
)
assert
len
(
task_ids
)
==
1
assert
task_ids
[
0
]
!=
123
assert
task_ids
[
0
]
!=
'
123
'
def
test_handle_removes_old_and_returns_new_if_not_use_existing
(
self
,
monkeypatch
,
redis
):
descriptor
=
{
'
a'
:
{
'b'
:
3
},
'c'
:
4
}
descriptor
=
{
'
data_type'
:
'default'
}
self
.
etlhandler
.
create_redis_entry
(
task_id
=
'123'
,
file_path
=
''
,
descriptor
=
descriptor
,
data_type
=
''
)
monkeypatch
.
setattr
(
'celery.AsyncResult.state'
,
'SUBMITTED'
)
task_ids
=
self
.
etlhandler
.
handle
(
descriptors
=
[
descriptor
],
data_tasks
=
[
'123'
],
use_existing
=
False
)
assert
len
(
task_ids
)
==
1
assert
task_ids
[
0
]
!=
123
assert
task_ids
[
0
]
!=
'
123
'
def
test_handle_removes_duplicate_of_previous_iteration
(
self
,
monkeypatch
,
redis
):
descriptor
=
{
'a'
:
{
'b'
:
3
},
'c'
:
4
}
monkeypatch
.
setattr
(
'celery.AsyncResult.state'
,
'SUBMITTED'
)
descriptor
=
{
'data_type'
:
'default'
}
task_ids
=
self
.
etlhandler
.
handle
(
descriptors
=
[
descriptor
,
descriptor
],
data_tasks
=
[],
use_existing
=
False
)
monkeypatch
.
setattr
(
'ETL.factory'
)
assert
task_ids
[
0
]
!=
task_ids
[
1
]
assert
len
(
redis
.
keys
(
'data:*'
))
==
1
def
test_handle_uses_duplicate_of_previous_iteration
(
self
,
monkeypatch
,
redis
):
descriptor
=
{
'a'
:
{
'b'
:
3
},
'c'
:
4
}
monkeypatch
.
setattr
(
'celery.AsyncResult.state'
,
'SUBMITTED'
)
descriptor
=
{
'data_type'
:
'default'
}
task_ids
=
self
.
etlhandler
.
handle
(
descriptors
=
[
descriptor
,
descriptor
],
data_tasks
=
[],
use_existing
=
True
)
monkeypatch
.
setattr
(
'ETL.factory'
)
assert
task_ids
[
0
]
==
task_ids
[
1
]
assert
len
(
redis
.
keys
(
'data:*'
))
==
1
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment