Commit 7b5532b0 authored by Sascha Herzinger

New unit tests

parent c5125d70
@@ -96,7 +96,7 @@ class ETLHandler(metaclass=abc.ABCMeta):
             value=json.dumps(data_state),
             time=app.config['FRACTALIS_CACHE_EXP'])

-    def descriptor_to_hash(self, descriptor):
+    def descriptor_to_hash(self, descriptor: dict) -> int:
         """Compute hash for the given descriptor. Used to identify duplicates.
         :param descriptor: ETL descriptor. Used to identify duplicates.
         :return: Unique hash.
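The hunk only changes the signature, but the new tests below pin down the behaviour: the hash is an int, equal for equal descriptors, and changes when the handler's _server changes. A minimal sketch of a hash with those properties, assuming a canonical JSON dump salted with the server (the method body, the sha256 choice, and the scaffolding class are assumptions, since only the signature is part of this diff):

import hashlib
import json


class ETLHandler:  # sketch excerpt, not the real class
    def __init__(self, server: str) -> None:
        self._server = server

    def descriptor_to_hash(self, descriptor: dict) -> int:
        # Sort keys so semantically equal descriptors serialise identically.
        canonical = json.dumps(descriptor, sort_keys=True)
        # Mix in the server so equal descriptors on different backends hash
        # differently, as test_descriptor_to_hash_produces_unique_hash expects.
        string = '{}|{}'.format(self._server, canonical)
        # hashlib is stable across processes, unlike the salted built-in
        # hash(); the hex digest converts cleanly to an int.
        return int(hashlib.sha256(string.encode('utf-8')).hexdigest(), 16)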
@@ -126,8 +126,7 @@ class ETLHandler(metaclass=abc.ABCMeta):
             task_ids.append(task_id)
         return task_ids

-    def remove_duplicates(self, data_tasks: List[str],
-                          descriptor: dict) -> None:
+    def remove_duplicates(self, data_tasks: List[str], descriptor: dict) -> None:
         """Delete the duplicates of the given descriptor from redis and call
         the janitor afterwards to clean up orphaned files.
         :param data_tasks: Task ids to which the duplicate search is limited.
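Only the signature and docstring of remove_duplicates appear in this hunk. A minimal sketch of what the docstring describes, assuming entries are keyed as 'data:<task_id>' (the new tests confirm this key pattern) and leaving the janitor call abstract:

from typing import List

from fractalis import redis


class ETLHandler:  # sketch excerpt; find_duplicates exists per the tests
    def remove_duplicates(self, data_tasks: List[str],
                          descriptor: dict) -> None:
        # Limit the duplicate search to the given task ids, then drop each
        # matching redis entry.
        for task_id in self.find_duplicates(data_tasks, descriptor):
            redis.delete('data:{}'.format(task_id))
        # Per the docstring, a janitor run follows and removes files whose
        # redis entry is gone; that task is not shown in this diff.

The new unit tests below exercise exactly this behaviour.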
"""This module provides tests for the etlhandler module."""
import pytest
from fractalis import celery
from fractalis.data.etlhandler import ETLHandler
# noinspection PyMissingOrEmptyDocstring,PyMissingTypeHints,PyPep8Naming
class TestETLHandler:
    @pytest.fixture(scope='function')
    def redis(self, redis):
        from fractalis import redis, sync
        yield redis
        sync.cleanup_all()
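
    # Shared handler instance under test. 'test' is presumably a stub ETL
    # handler registered for the test suite, and 'localfoo' a dummy server.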
    etlhandler = ETLHandler.factory(handler='test', server='localfoo',
                                    auth={})

    def test_descriptor_to_hash_produces_unique_hash(self, redis):
        hash_1 = self.etlhandler.descriptor_to_hash(descriptor={'a': 1})
        hash_2 = self.etlhandler.descriptor_to_hash(descriptor={'': ''})
        hash_3 = self.etlhandler.descriptor_to_hash(descriptor={'a': 1})
        self.etlhandler._server = 'localbar'
        hash_4 = self.etlhandler.descriptor_to_hash(descriptor={'a': 1})
        assert isinstance(hash_1, int)
        assert isinstance(hash_4, int)
        assert hash_1 == hash_3
        assert hash_1 != hash_2
        assert hash_1 != hash_4

    def test_find_duplicates_finds_duplicates_by_hash(self, redis):
        descriptor = {'a': {'b': 3}, 'c': 4}
        self.etlhandler.create_redis_entry(task_id='123',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        duplicates = self.etlhandler.find_duplicates(data_tasks=['123'],
                                                     descriptor=descriptor)
        assert len(duplicates) == 1
        assert duplicates[0] == '123'

    def test_finds_all_duplicates(self, redis):
        descriptor = {'a': {'b': 3}, 'c': 4}
        self.etlhandler.create_redis_entry(task_id='123',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        self.etlhandler.create_redis_entry(task_id='456',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        self.etlhandler.create_redis_entry(task_id='789',
                                           file_path='',
                                           descriptor={'a': 5},
                                           data_type='')
        duplicates = self.etlhandler.find_duplicates(
            data_tasks=['123', '456', '789'], descriptor=descriptor)
        assert len(duplicates) == 2
        assert '123' in duplicates
        assert '456' in duplicates

    def test_find_duplicates_only_operates_on_given_list(self, redis):
        descriptor = {'a': {'b': 3}, 'c': 4}
        self.etlhandler.create_redis_entry(task_id='123',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        self.etlhandler.create_redis_entry(task_id='456',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        self.etlhandler.create_redis_entry(task_id='789',
                                           file_path='',
                                           descriptor={'a': 5},
                                           data_type='')
        duplicates = self.etlhandler.find_duplicates(
            data_tasks=['456', '789'], descriptor=descriptor)
        assert len(duplicates) == 1
        assert '456' in duplicates

    def test_remove_duplicates_removes_duplicate(self, redis):
        descriptor = {'a': {'b': 3}, 'c': 4}
        self.etlhandler.create_redis_entry(task_id='456',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        assert redis.exists('data:456')
        self.etlhandler.remove_duplicates(data_tasks=['456'],
                                          descriptor=descriptor)
        assert not redis.exists('data:456')

    def test_remove_duplicates_removes_all_duplicates(self, redis):
        descriptor = {'a': {'b': 3}, 'c': 4}
        self.etlhandler.create_redis_entry(task_id='123',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        self.etlhandler.create_redis_entry(task_id='456',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        self.etlhandler.create_redis_entry(task_id='789',
                                           file_path='',
                                           descriptor={'a': 1},
                                           data_type='')
        assert redis.exists('data:123')
        assert redis.exists('data:456')
        assert redis.exists('data:789')
        self.etlhandler.remove_duplicates(data_tasks=['123', '456', '789'],
                                          descriptor=descriptor)
        assert not redis.exists('data:123')
        assert not redis.exists('data:456')
        assert redis.exists('data:789')

    def test_remove_duplicates_only_operates_on_given_list(self, redis):
        descriptor = {'a': {'b': 3}, 'c': 4}
        self.etlhandler.create_redis_entry(task_id='123',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        self.etlhandler.create_redis_entry(task_id='456',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        self.etlhandler.create_redis_entry(task_id='789',
                                           file_path='',
                                           descriptor={'a': 1},
                                           data_type='')
        assert redis.exists('data:123')
        assert redis.exists('data:456')
        assert redis.exists('data:789')
        self.etlhandler.remove_duplicates(data_tasks=['123', '789'],
                                          descriptor=descriptor)
        assert not redis.exists('data:123')
        assert redis.exists('data:456')
        assert redis.exists('data:789')
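
    # The tests below monkeypatch AsyncResult.state so that arbitrary Celery
    # task states can be simulated without a running worker.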
    def test_find_duplicate_task_id_returns_task_id_of_SUCCESS(
            self, monkeypatch, redis):
        descriptor = {'a': {'b': 3}, 'c': 4}
        self.etlhandler.create_redis_entry(task_id='123',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        monkeypatch.setattr('celery.AsyncResult.state', 'SUCCESS')
        task_id = self.etlhandler.find_duplicate_task_id(
            data_tasks=['123'], descriptor=descriptor)
        assert task_id == '123'

    def test_find_duplicate_task_id_returns_task_id_of_SUBMITTED(
            self, monkeypatch, redis):
        descriptor = {'a': {'b': 3}, 'c': 4}
        self.etlhandler.create_redis_entry(task_id='123',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        monkeypatch.setattr('celery.AsyncResult.state', 'SUBMITTED')
        task_id = self.etlhandler.find_duplicate_task_id(
            data_tasks=['123'], descriptor=descriptor)
        assert task_id == '123'

    def test_find_duplicate_task_id_returns_None_for_FAILURE(
            self, monkeypatch, redis):
        descriptor = {'a': {'b': 3}, 'c': 4}
        self.etlhandler.create_redis_entry(task_id='123',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        monkeypatch.setattr('celery.AsyncResult.state', 'FAILURE')
        task_id = self.etlhandler.find_duplicate_task_id(
            data_tasks=['123'], descriptor=descriptor)
        assert task_id is None

    def test_find_duplicate_limits_search_to_data_tasks(
            self, monkeypatch, redis):
        descriptor = {'a': {'b': 3}, 'c': 4}
        self.etlhandler.create_redis_entry(task_id='123',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        monkeypatch.setattr('celery.AsyncResult.state', 'SUCCESS')
        task_id = self.etlhandler.find_duplicate_task_id(
            data_tasks=['456'], descriptor=descriptor)
        assert task_id is None

    def test_find_duplicate_task_id_returns_None_for_not_existing(
            self, monkeypatch, redis):
        descriptor = {'a': {'b': 3}, 'c': 4}
        monkeypatch.setattr('celery.AsyncResult.state', 'FAILURE')
        task_id = self.etlhandler.find_duplicate_task_id(
            data_tasks=['123'], descriptor=descriptor)
        assert task_id is None

    def test_handle_reuses_existing_task_ids_if_use_existing(
            self, monkeypatch, redis):
        descriptor = {'a': {'b': 3}, 'c': 4}
        self.etlhandler.create_redis_entry(task_id='123',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        monkeypatch.setattr('celery.AsyncResult.state', 'SUBMITTED')
        task_ids = self.etlhandler.handle(descriptors=[descriptor],
                                          data_tasks=['123'],
                                          use_existing=True)
        assert len(task_ids) == 1
        assert task_ids[0] == '123'

    def test_handle_limits_search_to_task_ids(self, monkeypatch, redis):
        descriptor = {'a': {'b': 3}, 'c': 4}
        self.etlhandler.create_redis_entry(task_id='123',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        monkeypatch.setattr('celery.AsyncResult.state', 'SUBMITTED')
        task_ids = self.etlhandler.handle(descriptors=[descriptor],
                                          data_tasks=['456'],
                                          use_existing=True)
        assert len(task_ids) == 1
        assert task_ids[0] != '123'

    def test_handle_removes_old_and_returns_new_if_not_use_existing(
            self, monkeypatch, redis):
        descriptor = {'a': {'b': 3}, 'c': 4}
        self.etlhandler.create_redis_entry(task_id='123',
                                           file_path='',
                                           descriptor=descriptor,
                                           data_type='')
        monkeypatch.setattr('celery.AsyncResult.state', 'SUBMITTED')
        task_ids = self.etlhandler.handle(descriptors=[descriptor],
                                          data_tasks=['123'],
                                          use_existing=False)
        assert len(task_ids) == 1
        assert task_ids[0] != '123'

    def test_handle_removes_duplicate_of_previous_iteration(
            self, monkeypatch, redis):
        descriptor = {'a': {'b': 3}, 'c': 4}
        monkeypatch.setattr('celery.AsyncResult.state', 'SUBMITTED')
        task_ids = self.etlhandler.handle(descriptors=[descriptor, descriptor],
                                          data_tasks=[],
                                          use_existing=False)
        assert task_ids[0] != task_ids[1]
        assert len(redis.keys('data:*')) == 1

    def test_handle_uses_duplicate_of_previous_iteration(
            self, monkeypatch, redis):
        descriptor = {'a': {'b': 3}, 'c': 4}
        monkeypatch.setattr('celery.AsyncResult.state', 'SUBMITTED')
        task_ids = self.etlhandler.handle(descriptors=[descriptor, descriptor],
                                          data_tasks=[],
                                          use_existing=True)
        assert task_ids[0] == task_ids[1]
        assert len(redis.keys('data:*')) == 1
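Assuming the project's usual test setup (a reachable redis instance plus the conftest fixtures and the stub 'test' handler), the new suite should run with a plain pytest invocation such as pytest -k TestETLHandler; the test file's path is not shown on this page.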