Commit e2d01be6 authored by Sascha Herzinger

major refactoring of analytics and etls to adapt to a new data standard

parent 3bec1a46
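Judging from the changes below, the new data standard appears to be a long ("tidy") DataFrame with one row per (id, feature) pair and the fixed columns 'id', 'feature' and 'value' (plus 'subset' and 'category' added by the shared helpers), replacing the previous wide layout built around a 'variable' column with one column per sample. Results are now serialized with to_json(orient='records') instead of orient='index'. A minimal sketch of the assumed format, using made-up ids and features:

    import pandas as pd

    # assumed new standard: one row per (id, feature) pair
    df = pd.DataFrame({
        'id':      ['s1', 's1', 's2', 's2'],
        'feature': ['age', 'bmi', 'age', 'bmi'],
        'value':   [61.0, 22.4, 57.0, 25.1],
    })

    # the previous wide layout, roughly: a 'variable' column plus one column per sample id
    wide = (df.pivot(index='feature', columns='id', values='value')
              .reset_index()
              .rename(columns={'feature': 'variable'}))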
@@ -8,8 +8,8 @@ import numpy as np
import scipy.stats
from fractalis.analytics.task import AnalyticTask
from fractalis.analytics.tasks.shared.common import \
apply_subsets, apply_categories, apply_id_filter
from fractalis.analytics.tasks.shared.utils import \
apply_subsets, apply_categories
T = TypeVar('T')
@@ -22,40 +22,42 @@ class BoxplotTask(AnalyticTask):
name = 'compute-boxplot'
def main(self,
variables: List[pd.DataFrame],
features: List[pd.DataFrame],
categories: List[pd.DataFrame],
id_filter: List[T],
subsets: List[List[T]]) -> dict:
""" Compute boxplot statistics for the given parameters.
:param variables: List of numerical variables
:param categories: List of categorical variables used to group numerical
variables.
:param features: List of numerical features
:param categories: List of categorical features used to group numerical
features.
:param id_filter: List of ids that will be considered for analysis. If
empty, all ids will be used.
:param subsets: List of subsets used as another way to group the
numerical variables.
numerical features.
"""
if not len(variables):
if not len(features):
raise ValueError("Must at least specify one "
"non empty numerical variable.")
df = reduce(lambda l, r: l.merge(r, on='id', how='outer'), variables)
df = apply_id_filter(df=df, id_filter=id_filter)
variable_names = df.columns.tolist()
variable_names.remove('id')
"non empty numerical feature.")
# merge dfs into a single one
df = reduce(lambda l, r: l.append(r), features)
if id_filter:
df = df[df['id'].isin(id_filter)]
feature_names = df['feature'].unique()
df = apply_subsets(df=df, subsets=subsets)
df = apply_categories(df=df, categories=categories)
results = {
'data': df.to_json(orient='index'),
'data': df.to_json(orient='records'),
'statistics': {},
'variables': variable_names,
'categories': list(set(df['category'].tolist())),
'subsets': list(set(df['subset'].tolist()))
'features': feature_names,
'categories': df['category'].unique(),
'subsets': df['subset'].unique()
}
for variable in variable_names:
for feature in feature_names:
for subset in results['subsets']:
for category in results['categories']:
values = df[(df['subset'] == subset) &
(df['category'] == category)][variable].tolist()
(df['category'] == category) &
(df['feature'] == feature)]['value'].tolist()
values = [value for value in values if not np.isnan(value)]
if len(values) < 2:
continue
@@ -64,7 +66,7 @@ class BoxplotTask(AnalyticTask):
xs = np.linspace(start=stats['l_wsk'],
stop=stats['u_wsk'], num=100)
stats['kde'] = kde(xs).tolist()
label = '{}//{}//s{}'.format(variable, category, subset + 1)
label = '{}//{}//s{}'.format(feature, category, subset + 1)
results['statistics'][label] = stats
return results
......
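With the long format, the per-box selection in BoxplotTask.main above reduces to plain boolean filtering. A small illustration with invented ids and group labels; the actual whisker and quartile statistics are computed elsewhere in the task, so numpy percentiles merely stand in here:

    import numpy as np
    import pandas as pd

    df = pd.DataFrame({
        'id':       ['s1', 's2', 's3', 's4'],
        'feature':  ['age', 'age', 'age', 'age'],
        'value':    [61.0, 57.0, np.nan, 64.0],
        'subset':   [0, 0, 0, 0],
        'category': ['female', 'female', 'male', 'female'],
    })

    values = df[(df['subset'] == 0) &
                (df['category'] == 'female') &
                (df['feature'] == 'age')]['value'].tolist()
    values = [v for v in values if not np.isnan(v)]       # drop missing measurements
    q1, med, q3 = np.percentile(values, [25, 50, 75])     # stand-in for the task's stats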
"""Module containing the Celery Task for the Correlation Analysis."""
from typing import List, TypeVar, Tuple
from typing import List, TypeVar
import pandas as pd
import numpy as np
from scipy import stats
from fractalis.analytics.task import AnalyticTask
from fractalis.analytics.tasks.shared.common import \
apply_subsets, apply_categories, apply_id_filter
from fractalis.analytics.tasks.shared.utils import \
apply_subsets, apply_categories
T = TypeVar('T')
@@ -44,19 +44,20 @@ class CorrelationTask(AnalyticTask):
raise ValueError("Unknown method '{}'".format(method))
df = self.merge_x_y(x, y)
(x_label, y_label) = self.get_axis_labels(df)
df = apply_id_filter(df=df, id_filter=id_filter)
(x_label, y_label) = (df['feature_x'][0], df['feature_y'][0])
if id_filter:
df = df[df['id'].isin(id_filter)]
df = apply_subsets(df=df, subsets=subsets)
df = apply_categories(df=df, categories=categories)
global_stats = self.compute_stats(df, method, x_label, y_label)
global_stats = self.compute_stats(df, method)
subset_dfs = [df[df['subset'] == i] for i in range(len(subsets) or 1)]
subset_stats = [self.compute_stats(subset_df, method, x_label, y_label)
subset_stats = [self.compute_stats(subset_df, method)
for subset_df in subset_dfs]
output = global_stats
output['subsets'] = subset_stats
output['method'] = method
output['data'] = df.to_json(orient='index')
output['data'] = df.to_json(orient='records')
output['x_label'] = x_label
output['y_label'] = y_label
@@ -69,27 +70,14 @@ class CorrelationTask(AnalyticTask):
:param y: The y-axis values.
:return: The merged data frame.
"""
df = x.merge(y, on='id', how='inner')
df = x.merge(y, on=['id'], how='inner')
df = df.dropna()
if df.shape[0] == 0:
raise ValueError("X and Y do not share any ids.")
return df
@staticmethod
def get_axis_labels(df: pd.DataFrame) -> Tuple[str, str]:
"""Extract axis labels from the given DataFrame.
:param df: DataFrame that contains ids, axis labels, and possibly more
:return: A tuple containing both labels.
"""
colnames = [name for name in list(df) if name != 'id']
assert len(colnames) == 2
x_label = colnames[0]
y_label = colnames[1]
return x_label, y_label
@staticmethod
def compute_stats(df: pd.DataFrame, method: str,
x_label: str, y_label: str) -> dict:
def compute_stats(df: pd.DataFrame, method: str) -> dict:
"""Compute correlation statistics for the given data and the given
correlation method.
:param df: The DataFrame containing our data.
@@ -100,7 +88,6 @@ class CorrelationTask(AnalyticTask):
"""
df = df.drop_duplicates('id')
df = df.dropna()
df = df[[x_label, y_label]]
if df.shape[0] < 2:
return {
'coef': float('nan'),
@@ -108,17 +95,15 @@ class CorrelationTask(AnalyticTask):
'slope': float('nan'),
'intercept': float('nan')
}
x_list = df[x_label].values.tolist()
y_list = df[y_label].values.tolist()
if method == 'pearson':
corr_coef, p_value = stats.pearsonr(x_list, y_list)
corr_coef, p_value = stats.pearsonr(df['value_x'], df['value_y'])
elif method == 'spearman':
corr_coef, p_value = stats.spearmanr(x_list, y_list)
corr_coef, p_value = stats.spearmanr(df['value_x'], df['value_y'])
elif method == 'kendall':
corr_coef, p_value = stats.kendalltau(x_list, y_list)
corr_coef, p_value = stats.kendalltau(df['value_x'], df['value_y'])
else:
raise ValueError("Unknown correlation method.")
slope, intercept, *_ = np.polyfit(x_list, y_list, deg=1)
slope, intercept, *_ = np.polyfit(df['value_x'], df['value_y'], deg=1)
return {
'coef': corr_coef,
'p_value': p_value,
......
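After merging x and y on 'id', the two measurement columns carry the pandas merge suffixes 'value_x' and 'value_y', which compute_stats above consumes directly. A minimal sketch with fabricated values:

    import numpy as np
    import pandas as pd
    from scipy import stats

    df = pd.DataFrame({
        'id':      ['s1', 's2', 's3', 's4'],
        'value_x': [1.0, 2.0, 3.0, 4.0],
        'value_y': [1.2, 1.9, 3.1, 3.8],
    })

    corr_coef, p_value = stats.pearsonr(df['value_x'], df['value_y'])
    slope, intercept, *_ = np.polyfit(df['value_x'], df['value_y'], deg=1)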
@@ -5,7 +5,6 @@ from typing import List, Tuple
from collections import Counter
import pandas as pd
import numpy as np
from scipy.cluster import hierarchy as hclust
from scipy.cluster.vq import kmeans2
......
@@ -9,7 +9,7 @@ from scipy.stats import zscore
from fractalis.analytics.task import AnalyticTask
from fractalis.analytics.tasks.heatmap.stats import StatisticTask
from fractalis.analytics.tasks.shared import array_utils
from fractalis.analytics.tasks.shared import utils
T = TypeVar('T')
@@ -29,23 +29,26 @@ class HeatmapTask(AnalyticTask):
ranking_method: str,
id_filter: List[T],
subsets: List[List[T]]) -> dict:
# prepare input args
# merge input data into a single df
df = reduce(lambda a, b: a.append(b), numerical_arrays)
if not subsets:
ids = list(df)
ids.remove('variable')
subsets = [ids]
df = array_utils.drop_ungrouped_samples(df=df, subsets=subsets)
df = array_utils.apply_id_filter(df=df, id_filter=id_filter)
subsets = array_utils.drop_unused_subset_ids(df=df, subsets=subsets)
# empty subsets means all samples fall into a single subset
subsets = [df['id'].unique().tolist()]
else:
# if subsets are defined we drop the rows that are not part of one
flattened_subsets = [x for subset in subsets for x in subset]
df = df[df['id'].isin(flattened_subsets)]
# apply id filter
if id_filter:
df = df[df['id'].isin(id_filter)]
# drop subset ids that are not in the df
subsets = utils.drop_unused_subset_ids(df=df, subsets=subsets)
# make sure the input data are still valid after the pre-processing
if df.shape[0] < 1 or df.shape[1] < 2:
if df.shape[0] < 1:
error = "Either the input data set is too small or " \
"the subset sample ids do not match the data."
logger.error(error)
raise ValueError(error)
for subset in subsets:
if not subset:
error = "One or more of the specified subsets does not " \
@@ -53,23 +56,24 @@ class HeatmapTask(AnalyticTask):
logger.error(error)
raise ValueError(error)
# make matrix of input data
_df = df.pivot(index='feature', columns='id', values='value')
# create z-score matrix used for visualising the heatmap
variables = df['variable']
zscores = df.drop('variable', axis=1)
zscores = zscores.apply(zscore, axis=1)
zscores.insert(0, 'variable', variables)
z_df = _df.apply(zscore, axis=1)
# compute statistic for ranking
stats = self.stat_task.main(df=df, subsets=subsets,
stats = self.stat_task.main(df=_df, subsets=subsets,
ranking_method=ranking_method)
del _df
# prepare output for front-end
df = array_utils.melt_standard_format_df(df)
zscores = array_utils.melt_standard_format_df(zscores)
df = pd.merge(df, zscores, on=['id', 'variable'])
df.columns = ['id', 'variable', 'value', 'zscore']
z_df['feature'] = z_df.index
z_df = pd.melt(z_df, id_vars='feature')
df = df.merge(z_df, on=['id', 'feature'])
df.columns = ['id', 'feature', 'value', 'zscore']
return {
'data': df.to_json(orient='index'),
'stats': stats.to_json(orient='index')
'data': df.to_json(orient='records'),
'stats': stats.to_json(orient='records')
}
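The heatmap task now pivots the long input into a feature-by-sample matrix for the statistics and z-scores, then melts the z-scores back and merges them onto the long frame before serializing with orient='records'. A compact sketch of that round trip, computing the row-wise z-score with plain pandas instead of scipy for brevity (column names as assumed by the task):

    import pandas as pd

    df = pd.DataFrame({
        'id':      ['s1', 's2', 's1', 's2'],
        'feature': ['geneA', 'geneA', 'geneB', 'geneB'],
        'value':   [1.0, 3.0, 2.0, 6.0],
    })

    matrix = df.pivot(index='feature', columns='id', values='value')
    # row-wise z-score: (x - row mean) / row std, population std as scipy's zscore uses
    z = matrix.sub(matrix.mean(axis=1), axis=0).div(matrix.std(axis=1, ddof=0), axis=0)
    z_long = z.reset_index().melt(id_vars='feature', var_name='id', value_name='zscore')
    out = df.merge(z_long, on=['id', 'feature'])   # columns: id, feature, value, zscore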
@@ -37,34 +37,28 @@ class StatisticTask(AnalyticTask):
@staticmethod
def get_mean_stats(df: pd.DataFrame) -> pd.DataFrame:
variables = df['variable']
df = df.drop('variable', axis=1)
means = df.apply(mean, axis=1)
results = pd.concat([variables, means], axis=1)
results.columns = ['variable', 'mean']
return results
mean_series = df.apply(mean, axis=1)
df = mean_series.to_frame('mean')
df['feature'] = df.index
return df
@staticmethod
def get_median_stats(df: pd.DataFrame) -> pd.DataFrame:
variables = df['variable']
df = df.drop('variable', axis=1)
medians = df.apply(median, axis=1)
results = pd.concat([variables, medians], axis=1)
results.columns = ['variable', 'median']
return results
median_series = df.apply(median, axis=1)
df = median_series.to_frame('median')
df['feature'] = df.index
return df
@staticmethod
def get_variance_stats(df: pd.DataFrame) -> pd.DataFrame:
variables = df['variable']
df = df.drop('variable', axis=1)
variances = df.apply(var, axis=1)
results = pd.concat([variables, variances], axis=1)
results.columns = ['variable', 'variance']
return results
var_series = df.apply(var, axis=1)
df = var_series.to_frame('var')
df['feature'] = df.index
return df
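With the pivoted matrix as input (features on the index, one column per sample), each ranking statistic becomes a row-wise aggregate turned back into a small frame keyed by feature, e.g. for the mean:

    import pandas as pd
    from numpy import mean

    matrix = pd.DataFrame([[1.0, 3.0], [2.0, 6.0]],
                          index=['geneA', 'geneB'], columns=['s1', 's2'])

    means = matrix.apply(mean, axis=1).to_frame('mean')
    means['feature'] = means.index
    # one row per feature, columns: mean, feature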
@staticmethod
def get_limma_stats(df: pd.DataFrame,
subsets: List[List[T]]) -> pd.DataFrame:
subsets: List[List[T]]) -> pd.DataFrame:
"""Use the R bioconductor package 'limma' to perform a differential
gene expression analysis on the given data frame.
:param df: Matrix of measurements where each column represents a sample
@@ -85,14 +79,10 @@ class StatisticTask(AnalyticTask):
logger.error(error)
raise ValueError(error)
# for analysis we want only sample cols
variables = df['variable']
df = df.drop('variable', axis=1)
flattened_subsets = [x for subset in subsets for x in subset]
df = df[flattened_subsets]
ids = list(df)
logger.critical(ids)
features = df.index
# creating the design vector according to the subsets
design_vector = [''] * len(ids)
@@ -132,11 +122,11 @@ class StatisticTask(AnalyticTask):
r_fit_2 = r['contrasts.fit'](r_fit, r_contrast_matrix)
r_fit_2 = r['eBayes'](r_fit_2)
r_results = r['topTable'](r_fit_2, number=float('inf'),
sort='none', genelist=variables)
sort='none', genelist=features)
results = pandas2ri.ri2py(r_results)
# let's give the gene list column an appropriate name
colnames = results.columns.values
colnames[0] = 'variable'
colnames[0] = 'feature'
results.columns = colnames
return results
"""This module contains common array functionality used in analytic tasks."""
import logging
from copy import deepcopy
from typing import List, TypeVar
import pandas as pd
logger = logging.getLogger(__name__)
T = TypeVar('T')
_protected_colnames = ['variable']
def drop_ungrouped_samples(df: pd.DataFrame,
subsets: List[List[T]]) -> pd.DataFrame:
"""Drop sample columns that are not present in any of the subsets.
:param df: Dataframe containing array data in the Fractalis format.
:param subsets: Subgroups defined by the user.
:return: Filtered data frame.
"""
flattened_subsets = [x for subset in subsets for x in subset]
if not flattened_subsets:
error = "Subsets must not be empty."
logger.error(error)
raise ValueError(error)
colnames = list(set(flattened_subsets))
colnames += _protected_colnames # special colnames that we want to keep
colnames = [colname for colname in list(df) if colname in colnames]
df = df[colnames]
return df
def drop_unused_subset_ids(df: pd.DataFrame,
subsets: List[List[T]]) -> List[List[T]]:
"""Drop subset ids that are not present in the given data
:param df: Dataframe containing array data in the Fractalis format.
:param subsets: Subset groups specified by the user.
:return: Modified subsets list.
"""
df_ids = list(df)
df_ids = [el for el in df_ids if el not in _protected_colnames]
_subsets = deepcopy(subsets)
for subset in _subsets:
_subset = list(subset)
for id in _subset:
if id not in df_ids:
subset.remove(id)
return _subsets
def apply_id_filter(df: pd.DataFrame, id_filter: List[T]) -> pd.DataFrame:
if not id_filter:
return df
protected_cols = df[_protected_colnames]
samples = df[id_filter]
df = pd.concat([protected_cols, samples], axis=1)
return df
def melt_standard_format_df(df: pd.DataFrame) -> pd.DataFrame:
if df.shape[0] < 1 or df.shape[1] < 2:
error = "Data must be non-empty for melting."
logger.error(error)
raise ValueError(error)
variables = df['variable']
df.drop('variable', axis=1, inplace=True)
df = df.T
df.columns = variables
df.index.name = 'id'
df.reset_index(inplace=True)
df = pd.melt(df, id_vars='id')
return df
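For comparison, this removed helper flattened the old wide array format (a 'variable' column plus one column per sample id) into id/variable/value rows. Roughly, with invented sample columns:

    import pandas as pd

    wide = pd.DataFrame({
        'variable': ['geneA', 'geneB'],
        's1': [1.0, 2.0],
        's2': [3.0, 6.0],
    })

    variables = wide['variable']
    df = wide.drop('variable', axis=1).T      # samples become rows
    df.columns = variables
    df.index.name = 'id'
    df = df.reset_index()
    long = pd.melt(df, id_vars='id')           # columns: id, variable, value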
"""This module contains common functions used in analytic tasks."""
import logging
from typing import List, TypeVar
from functools import reduce
from copy import deepcopy
import pandas as pd
logger = logging.getLogger(__name__)
T = TypeVar('T')
@@ -45,6 +48,8 @@ def apply_categories(df: pd.DataFrame,
:return: The base DataFrame with an additional 'category' column
"""
if len(categories):
# drop 'feature' column from dfs
categories = [df.drop('feature', axis=1) for df in categories]
# merge all dfs into one
data = reduce(lambda l, r: l.merge(r, on='id', how='outer'), categories)
# remember ids
@@ -67,14 +72,18 @@
return df
def apply_id_filter(df: pd.DataFrame, id_filter: list) -> pd.DataFrame:
"""Drop all rows whose id is not in id_filter.
:param df: The DataFrame to filter.
:param id_filter: The filter.
:return: The filtered DataFrame.
def drop_unused_subset_ids(df: pd.DataFrame,
subsets: List[List[T]]) -> List[List[T]]:
"""Drop subset ids that are not present in the given data
:param df: Dataframe containing array data in the Fractalis format.
:param subsets: Subset groups specified by the user.
:return: Modified subsets list.
"""
if id_filter:
df = df[df['id'].isin(id_filter)]
if df.shape[0] == 0:
raise ValueError("The current selection does not match any data.")
return df
ids = df['id'].unique()
_subsets = deepcopy(subsets)
for subset in _subsets:
_subset = list(subset)
for id in _subset:
if id not in ids:
subset.remove(id)
return _subsets
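In effect, each user-supplied subset is pruned to the sample ids that actually occur in the data; the helper works on a deep copy so the caller's lists are left untouched. An equivalent sketch with invented ids:

    import pandas as pd

    df = pd.DataFrame({'id': ['s1', 's2'],
                       'feature': ['age', 'age'],
                       'value': [61.0, 57.0]})
    subsets = [['s1', 's3'], ['s2'], ['s4']]

    ids = set(df['id'].unique())
    pruned = [[i for i in subset if i in ids] for subset in subsets]
    # -> [['s1'], ['s2'], []]   (downstream tasks reject subsets that end up empty)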
@@ -49,7 +49,7 @@ class MergeDataFramesTask(AnalyticTask):
if not df_list:
return {'df': ''}
df = reduce(lambda l, r: l.append(r), df_list)
return {'df': df.to_json(orient='values')}
return {'df': df.to_json(orient='records')}
class InvalidReturnTask(AnalyticTask):
......
@@ -32,5 +32,5 @@ class BooleanETL(ETL):
def transform(self, raw_data: List[dict], descriptor: dict) -> DataFrame:
data = shared.prepare_ids(raw_data)
data = shared.name_to_label(data, descriptor)
data_frame = DataFrame(data)
return data_frame
df = shared.make_data_frame(data)
return df
@@ -32,5 +32,5 @@ class DateETL(ETL):
def transform(self, raw_data: List[dict], descriptor: dict) -> DataFrame:
data = shared.prepare_ids(raw_data)
data = shared.name_to_label(data, descriptor)
data_frame = DataFrame(data)
return data_frame
df = shared.make_data_frame(data)
return df
@@ -32,6 +32,6 @@ class DoubleETL(ETL):
def transform(self, raw_data: List[dict], descriptor: dict) -> DataFrame:
data = shared.prepare_ids(raw_data)
data = shared.name_to_label(data, descriptor)
data_frame = DataFrame(data)
return data_frame
df = shared.make_data_frame(data)
return df
@@ -30,7 +30,6 @@ class DoubleArrayETL(ETL):
return data
def transform(self, raw_data: List[dict], descriptor: dict) -> pd.DataFrame:
data = shared.prepare_ids(raw_data)
name = descriptor['dictionary']['name']
ids = []
values = []
@@ -38,9 +37,7 @@ class DoubleArrayETL(ETL):
ids.append(row['id'])
values.append(row[name])
df = pd.DataFrame(values)
df = df.transpose()
df.columns = ids
variables = pd.Series(range(df.shape[0]))
df.insert(0, 'variable', variables)
df.insert(0, 'id', ids)
df = pd.melt(df, id_vars='id', var_name='feature')
return df
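In the reworked array ETL, each sample's value list becomes a block of rows where the array position serves as the feature name. A small sketch with two invented samples:

    import pandas as pd

    ids = ['s1', 's2']
    values = [[1.0, 2.0, 3.0],
              [4.0, 5.0, 6.0]]    # one measurement array per sample

    df = pd.DataFrame(values)
    df.insert(0, 'id', ids)
    df = pd.melt(df, id_vars='id', var_name='feature')
    # columns: id, feature (array position 0, 1, 2), value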
@@ -36,5 +36,5 @@ class EnumETL(ETL):
value = descriptor['dictionary']['numValues'][str(value)]
row[descriptor['dictionary']['name']] = value
data = shared.name_to_label(data, descriptor)
data_frame = DataFrame(data)
return data_frame
df = shared.make_data_frame(data)
return df
@@ -32,5 +32,5 @@ class IntegerETL(ETL):
def transform(self, raw_data: List[dict], descriptor: dict) -> DataFrame:
data = shared.prepare_ids(raw_data)
data = shared.name_to_label(data, descriptor)
data_frame = DataFrame(data)
return data_frame
df = shared.make_data_frame(data)
return df
@@ -32,5 +32,5 @@ class StringETL(ETL):
def transform(self, raw_data: List[dict], descriptor: dict) -> DataFrame:
data = shared.prepare_ids(raw_data)
data = shared.name_to_label(data, descriptor)
data_frame = DataFrame(data)
return data_frame
df = shared.make_data_frame(data)
return df
@@ -3,6 +3,7 @@
import logging
from typing import List
import pandas as pd
import requests
@@ -59,3 +60,9 @@ def name_to_label(data: List[dict], descriptor: dict) -> List[dict]:
del row[descriptor['dictionary']['name']]
row[label] = value
return data
def make_data_frame(data: List[dict]) -> pd.DataFrame: