Commit 35620ba3 authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

Added ETL sanity check

parent d5fe6635
......@@ -100,6 +100,17 @@ class ETL(Task, metaclass=abc.ABCMeta):
"""
pass
def sanityCheck(self):
"""Check whether ETL is still sane and should be continued. E.g. if
redis has been cleared it does not make sense to proceed. Raise an
exception if not sane."""
check_1 = redis.exists('data:{}'.format(self.request.id))
if not check_1:
error = "ETL failed! The associated entry in " \
"Redis has been removed while the ETL was running."
logger.error(error)
raise RuntimeError(error)
def update_redis(self, data_frame: DataFrame) -> None:
"""Set several meta information that can be used to filter the data
before the analysis.
......@@ -159,12 +170,14 @@ class ETL(Task, metaclass=abc.ABCMeta):
logger.info("Starting ETL process ...")
logger.info("(E)xtracting data from server '{}'.".format(server))
try:
self.sanityCheck()
raw_data = self.extract(server, token, descriptor)
except Exception as e:
logger.exception(e)
raise RuntimeError("Data extraction failed. {}".format(e))
logger.info("(T)ransforming data to Fractalis format.")
try:
self.sanityCheck()
data_frame = self.transform(raw_data, descriptor)
checker = IntegrityCheck.factory(self.produces)
checker.check(data_frame)
......@@ -177,6 +190,7 @@ class ETL(Task, metaclass=abc.ABCMeta):
logging.error(error, exc_info=1)
raise TypeError(error)
try:
self.sanityCheck()
if encrypt:
self.secure_load(data_frame, file_path)
else:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment