Commit 992dcfe7 authored by Sascha Herzinger

make project pass flake8 checks

parent 735ef15c
......@@ -18,3 +18,4 @@ before_script:
test:
script:
- python3 setup.py test
- flake8
......@@ -21,4 +21,3 @@ RUN celery worker -A fractalis:celery -D -l info
EXPOSE 5000
CMD ["python3", "fractalis/__init__.py"]
......@@ -23,7 +23,7 @@ logger = logging.getLogger(__name__)
def create_task() -> Tuple[Response, int]:
"""Create a new analytics task based on the parameters in the POST body.
See doc/api/ for more information.
:return: Flask Response
:return: Flask Response
"""
logger.debug("Received POST request on /analytics.")
json = request.get_json(force=True)
......@@ -46,7 +46,7 @@ def get_task_details(task_id: UUID) -> Tuple[Response, int]:
"""Get task details for the given task_id.
See doc/api/ for more information.
:param task_id: ID returned on task creation.
:return: Flask Response
:return: Flask Response
"""
logger.debug("Received GET request on /analytics/task_id.")
wait = request.args.get('wait') == '1'
......
......@@ -66,12 +66,12 @@ class AnalyticTask(Task, metaclass=abc.ABCMeta):
def data_task_id_to_data_frame(
self, data_task_id: str,
session_data_tasks: List[str], decrypt: bool) -> DataFrame:
"""Attempts to load the data frame associated with the provided data id.
"""Attempts to load the data frame associated with the provided data id
:param data_task_id: The data id associated with the previously loaded
data.
:param session_data_tasks: A list of data tasks previously executed by
the requesting session. This is used for permission checks.
:param decrypt: Specify whether the data have to be decrypted for usage.
:param decrypt: Specify whether the data have to be decrypted for usage
only part of the data, for instance some genes out of thousands.
:return: A pandas data frame associated with the data id.
"""
......@@ -104,8 +104,8 @@ class AnalyticTask(Task, metaclass=abc.ABCMeta):
def apply_filters(df: DataFrame, filters: dict) -> DataFrame:
"""Apply filter to data frame and return it.
:param df: The data frame.
:param filters: The filters where each key is represents a column in the
data frame and the value a list of values to keep.
:param filters: The filters where each key represents a column
in the data frame and the value a list of values to keep.
:return: Filtered data frame.
"""
for key in filters:
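For context, a filter mapping such as {'feature': ['TP53', 'BRCA1']} keeps only rows whose column value appears in the given list. A minimal standalone sketch of that behaviour (an illustration, not the project's exact loop body) could be:

import pandas as pd


def apply_filters_sketch(df: pd.DataFrame, filters: dict) -> pd.DataFrame:
    # keep only rows whose value in each filtered column is allowed
    for key, allowed in filters.items():
        if allowed:  # an empty list means no restriction in this sketch
            df = df[df[key].isin(allowed)]
    return df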
......@@ -121,8 +121,8 @@ class AnalyticTask(Task, metaclass=abc.ABCMeta):
:return: True if argument contains data_task_id.
"""
return isinstance(value, str) and \
value.startswith('$') and \
value.endswith('$')
value.startswith('$') and \
value.endswith('$')
@staticmethod
def parse_value(value: str) -> Tuple[str, dict]:
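The '$...$' wrapping marks an argument value that refers to a data task id rather than a literal. A hedged sketch of how such a marker might be detected and unwrapped (the real parse_value may do more, e.g. parse attached filters):

def contains_data_task_id(value) -> bool:
    # values such as '$a1b2c3$' point to a previously created data task
    return isinstance(value, str) and \
        value.startswith('$') and \
        value.endswith('$')


def unwrap_data_task_id(value: str) -> str:
    # hypothetical helper: strip the surrounding '$' markers
    return value[1:-1]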
......@@ -146,7 +146,7 @@ class AnalyticTask(Task, metaclass=abc.ABCMeta):
def prepare_args(self, session_data_tasks: List[str],
args: dict, decrypt: bool) -> dict:
"""Replace data task ids in the arguments with their associated
"""Replace data task ids in the arguments with their associated
data frame located on the file system. This currently works for non
nested strings and non nested lists containing strings.
:param session_data_tasks: We use this list to check access.
......
......@@ -70,7 +70,7 @@ class CorrelationTask(AnalyticTask):
@staticmethod
def merge_x_y(x: pd.DataFrame, y: pd.DataFrame) -> pd.DataFrame:
"""Merge the x and y DataFrame and drop all rows containing NA.
:param x: The x-axis values.
:param x: The x-axis values.
:param y: The y-axis values.
:return: The merged data frame.
"""
......@@ -84,11 +84,11 @@ class CorrelationTask(AnalyticTask):
def compute_stats(df: pd.DataFrame, method: str) -> dict:
"""Compute correlation statistics for the given data and the given
correlation method.
:param df: The DataFrame containing our data.
:param df: The DataFrame containing our data.
:param method: The method to use.
:param x_label: The name of the x-axis.
:param y_label: The name of the y-axis.
:return: Coefficient, p-value, regression slope and regression intercept
:return: Several relevant statistics
"""
df = df.drop_duplicates('id')
df = df.dropna()
......
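The hunk does not show how the statistics are computed; a minimal sketch built on scipy.stats (assuming the merged frame exposes 'x' and 'y' columns, which is an assumption made here for illustration) might look like:

import pandas as pd
from scipy import stats


def compute_stats_sketch(df: pd.DataFrame, method: str = 'pearson') -> dict:
    x, y = df['x'], df['y']  # column names are assumed for illustration
    if method == 'pearson':
        coef, p_value = stats.pearsonr(x, y)
    elif method == 'spearman':
        coef, p_value = stats.spearmanr(x, y)
    else:
        coef, p_value = stats.kendalltau(x, y)
    slope, intercept, _, _, _ = stats.linregress(x, y)
    return {'coef': coef, 'p_value': p_value,
            'slope': slope, 'intercept': intercept}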
......@@ -57,8 +57,8 @@ class ClusteringTask(AnalyticTask):
'col_clusters': list(zip(col_names, col_clusters))
}
def _hclust(self, df: pd.DataFrame,
method: str, metric: str, n_clusters: int) -> Tuple[List, List]:
def _hclust(self, df: pd.DataFrame, method: str,
metric: str, n_clusters: int) -> Tuple[List, List]:
names = list(df.index)
values = df.values
z = hclust.linkage(values, method=method, metric=metric)
......
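After building the linkage matrix, the dendrogram is typically cut into the requested number of flat clusters with fcluster; a hedged sketch of the remaining steps (the project's _hclust may differ in its details):

import pandas as pd
import scipy.cluster.hierarchy as hclust


def hclust_sketch(df: pd.DataFrame, method: str, metric: str,
                  n_clusters: int):
    names = list(df.index)
    z = hclust.linkage(df.values, method=method, metric=metric)
    # cut the tree into exactly n_clusters flat clusters
    clusters = hclust.fcluster(z, t=n_clusters, criterion='maxclust')
    return names, clusters.tolist()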
......@@ -112,8 +112,8 @@ class StatisticTask(AnalyticTask):
r_design = r['model.matrix'](r_form)
r_design.colnames = R.StrVector(groups)
r_data = pandas2ri.py2ri(df)
# the next two lines are necessary if column ids are not unique, because
# the python to r transformation drops those columns otherwise
# the next two lines are necessary if column ids are not unique,
# because the python to r transformation drops those columns otherwise
r_ids = R.StrVector(['X{}'.format(id) for id in ids])
r_data = r_data.rx(r_ids)
r_fit = r['lmFit'](r_data, r_design)
......
......@@ -78,4 +78,4 @@ class PCATask(AnalyticTask):
'data': reduced_df.to_dict(orient='list'),
'loadings': loadings.to_dict(orient='list'),
'variance_ratios': variance_ratios.tolist()
}
\ No newline at end of file
}
......@@ -51,15 +51,19 @@ def apply_categories(df: pd.DataFrame,
# drop 'feature' column from dfs
categories = [df.drop('feature', axis=1) for df in categories]
# merge all dfs into one
data = reduce(lambda l, r: l.merge(r, on='id', how='outer'), categories)
data = reduce(lambda l, r: l.merge(r, on='id', how='outer'),
categories)
# remember ids
ids = data['id']
# drop id column
data = data.drop('id', axis=1)
# replace everything that is not an category with ''
data = data.applymap(lambda el: el if isinstance(el, str) and el else '')
data = data.applymap(
lambda el: el if isinstance(el, str) and el else '')
# join all columns with && into a single one. Ignore '' entries.
data = data.apply(lambda row: ' AND '.join(list(map(str, [el for el in row.tolist() if el]))), axis=1)
data = data.apply(
lambda row: ' AND '.join(
list(map(str, [el for el in row.tolist() if el]))), axis=1)
# cast Series to DataFrame
data = pd.DataFrame(data, columns=['category'])
# reassign ids to collapsed df
......
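A tiny worked example of the collapse above (input frames and values are made up for illustration): two category frames merged on 'id' reduce to a single column in which non-empty entries are joined with ' AND '.

import pandas as pd
from functools import reduce

a = pd.DataFrame({'id': [1, 2], 'sex': ['male', 'female']})
b = pd.DataFrame({'id': [1, 2], 'smoker': ['yes', '']})
data = reduce(lambda l, r: l.merge(r, on='id', how='outer'), [a, b])
ids = data['id']
data = data.drop('id', axis=1)
data = data.applymap(lambda el: el if isinstance(el, str) and el else '')
data = data.apply(
    lambda row: ' AND '.join([el for el in row.tolist() if el]), axis=1)
# id 1 collapses to 'male AND yes', id 2 to 'female'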
......@@ -41,6 +41,7 @@ class SumDataFrameTask(AnalyticTask):
result = {'sum': a.sum().sum()}
return result
class MergeDataFramesTask(AnalyticTask):
name = 'merge_df_task'
......@@ -51,6 +52,7 @@ class MergeDataFramesTask(AnalyticTask):
df = reduce(lambda l, r: l.append(r), df_list)
return {'df': df.to_json(orient='records')}
class InvalidReturnTask(AnalyticTask):
name = 'no_dict_task'
......
......@@ -39,6 +39,7 @@ def make_celery(app: Flask) -> Celery:
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
......@@ -66,6 +67,3 @@ def register_tasks() -> None:
for analytics_task_class in analytics_task_classes:
logger.info("Registering task: {}".format(analytics_task_class.name))
celery.tasks.register(analytics_task_class)
import os
import logging
from uuid import uuid4
from datetime import timedelta
# DO NOT MODIFY THIS FILE DIRECTLY
......
......@@ -32,7 +32,8 @@ def create_data_task() -> Tuple[Response, int]:
etl_handler = ETLHandler.factory(handler=payload['handler'],
server=payload['server'],
auth=payload['auth'])
task_ids = etl_handler.handle(descriptors=payload['descriptors'], wait=wait)
task_ids = etl_handler.handle(descriptors=payload['descriptors'],
wait=wait)
session['data_tasks'] += task_ids
session['data_tasks'] = list(set(session['data_tasks']))
logger.debug("Tasks successfully submitted. Sending response.")
......@@ -70,7 +71,7 @@ def get_all_data() -> Tuple[Response, int]:
"""Get information for all tasks that have been submitted in the lifetime
of the current session.
See doc/api/ for more information.
:return: Information associated with each submitted task
:return: Information associated with each submitted task
"""
logger.debug("Received GET request on /data.")
wait = request.args.get('wait') == '1'
......
......@@ -20,8 +20,8 @@ class ETL(Task, metaclass=abc.ABCMeta):
"""This is an abstract class that implements a celery Task and provides a
factory method to create instances of implementations of itself. Its main
purpose is to manage extraction of the data from the target server. ETL
stands for (E)xtract (T)ransform (L)oad and not by coincidence similar named
methods can be found in this class.
stands for (E)xtract (T)ransform (L)oad and, not by coincidence, similarly
named methods can be found in this class.
"""
@property
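Only the class docstring is visible in this hunk; a hedged skeleton of the (E)xtract/(T)ransform/(L)oad methods it alludes to (signatures are assumptions inferred from the ETL subclasses shown further down, and the real class is also a celery Task):

import abc

import pandas as pd


class ETLSketch(metaclass=abc.ABCMeta):

    @abc.abstractmethod
    def extract(self, server: str, token: str, descriptor: dict):
        """(E)xtract the raw data from the target server."""

    @abc.abstractmethod
    def transform(self, raw_data, descriptor: dict) -> pd.DataFrame:
        """(T)ransform the raw data into a pandas DataFrame."""

    def load(self, df: pd.DataFrame, file_path: str) -> None:
        """(L)oad (write) the transformed frame to the file system."""
        df.to_csv(file_path, index=False)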
......@@ -69,10 +69,11 @@ class ETL(Task, metaclass=abc.ABCMeta):
return ETLTask()
except Exception as e:
logger.warning("Caught exception and assumed that ETL '{}' "
"cannot handle handler '{}' and descriptor: '{}'"
" Exception:'{}'".format(type(ETLTask).__name__,
handler,
str(descriptor), e))
"cannot handle handler '{}' "
"and descriptor: '{}'. Exception:'{}'".format(
type(ETLTask).__name__,
handler,
str(descriptor), e))
continue
raise NotImplementedError(
......@@ -156,7 +157,7 @@ class ETL(Task, metaclass=abc.ABCMeta):
:param descriptor: Contains all necessary information to download data
:param file_path: The location where the data will be stored
:param encrypt: Whether or not the data should be encrypted.
:return: The data id. Used to access the associated redis entry later on
:return: The data id. Used to access the associated redis entry later
"""
logger.info("Starting ETL process ...")
logger.info("(E)xtracting data from server '{}'.".format(server))
......
......@@ -77,7 +77,7 @@ class ETLHandler(metaclass=abc.ABCMeta):
descriptor: dict, data_type: str) -> None:
"""Creates an entry in Redis that reflects all meta data surrounding the
downloaded data. E.g. data type, file system location, ...
:param task_id: Id associated with the loaded data.
:param task_id: Id associated with the loaded data.
:param file_path: Location of the data on the file system.
:param descriptor: Describes the data and is used to download them.
:param data_type: The fractalis internal data type of the loaded data.
......@@ -98,8 +98,8 @@ class ETLHandler(metaclass=abc.ABCMeta):
def handle(self, descriptors: List[dict], wait: bool = False) -> List[str]:
"""Create instances of ETL for the given descriptors and submit them
(ETL implements celery.Task) to the broker. The task ids are returned to
keep track of them.
(ETL implements celery.Task) to the broker. The task ids are returned
to keep track of them.
:param descriptors: A list of items describing the data to download.
:param wait: Makes this method synchronous by waiting for the tasks to
return.
......
......@@ -34,4 +34,3 @@ class DoubleETL(ETL):
data = shared.name_to_label(data, descriptor)
df = shared.make_data_frame(data)
return df
......@@ -29,7 +29,8 @@ class DoubleArrayETL(ETL):
cookie=cookie, projection=projection)
return data
def transform(self, raw_data: List[dict], descriptor: dict) -> pd.DataFrame:
def transform(self, raw_data: List[dict],
descriptor: dict) -> pd.DataFrame:
name = descriptor['dictionary']['name']
ids = []
values = []
......@@ -40,4 +41,3 @@ class DoubleArrayETL(ETL):
df.insert(0, 'id', ids)
df = pd.melt(df, id_vars='id', var_name='feature')
return df
......@@ -21,7 +21,7 @@ def get_field(server: str, data_set: str,
params={
'dataSet': data_set,
'projection': ['_id', projection],
'filterOrId': '[{{"fieldName":"{}","conditionType":"!=","value":""}}]'.format(projection)
'filterOrId': '[{{"fieldName":"{}","conditionType":"!=","value":""}}]'.format(projection) # noqa: 501
},
cookies=cookie,
timeout=60)
......
......@@ -22,8 +22,9 @@ class RandomNumericalETL(ETL):
token: str, descriptor: dict) -> pd.DataFrame:
feature = ''.join(random.choice(string.ascii_letters + string.digits)
for _ in range(30))
data = pd.DataFrame(np.random.randn(descriptor['num_samples']).tolist(),
columns=[feature])
data = pd.DataFrame(
np.random.randn(descriptor['num_samples']).tolist(),
columns=[feature])
return data
def transform(self, raw_data: pd.DataFrame,
......