Commit 84ddc1fb authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

Implemented additional heatmap statistics

parent 00d078b8
Pipeline #2250 failed with stage
in 1 minute and 30 seconds
"""This module provides clustering algorithm for array type data."""
import logging
from typing import List, Tuple
from collections import Counter
from operator import itemgetter
import pandas as pd
import numpy as np
from scipy.cluster import hierarchy as hclust
from scipy.cluster.vq import kmeans2
from fractalis.analytics.task import AnalyticTask
logger = logging.getLogger(__name__)
class ClusteringTask(AnalyticTask):
    """Task that assigns the rows and columns of array data to clusters."""

    name = 'compute-clustering'

    def main(self, df: str, cluster_algo: str,
             options: dict) -> dict:
        """Entry point. Deserialize the data and run the chosen algorithm.
        :param df: The data to cluster, serialized to JSON.
        :param cluster_algo: Either 'hclust' or 'kmeans'.
        :param options: Parameters specific to the chosen algorithm.
        :return: Ordered row/col names plus their cluster assignments.
        :raises ValueError: For an unknown cluster_algo.
        """
        data = pd.read_json(df)
        # replace missing values with the median of their own row so the
        # clustering algorithms below can cope with incomplete data
        data = data.T.fillna(data.median(axis=1)).T
        if cluster_algo == 'hclust':
            return self.hclust(data, options)
        if cluster_algo == 'kmeans':
            return self.kmeans(data, options)
        error = "Algorithm not implemented: '{}'".format(cluster_algo)
        logger.error(error)
        raise ValueError(error)

    def hclust(self, df: pd.DataFrame, options: dict) -> dict:
        """Hierarchically cluster both axes of the given data.
        :param df: The data to cluster.
        :param options: Must contain 'method', 'metric', 'n_row_clusters'
            and 'n_col_clusters'.
        :return: Ordered names and cluster assignment for rows and columns.
        :raises ValueError: If a mandatory option is missing.
        """
        try:
            method = options['method']
            metric = options['metric']
            n_row_clusters = options['n_row_clusters']
            n_col_clusters = options['n_col_clusters']
        except KeyError:
            error = "'method', 'metric', 'n_row_clusters', " \
                    "and 'n_col_clusters' are mandatory parameters to " \
                    "perform a hierarchical clustering."
            logger.error(error)
            raise ValueError(error)
        row_names, row_clusters = self._hclust(df, method,
                                               metric, n_row_clusters)
        col_names, col_clusters = self._hclust(df.T, method,
                                               metric, n_col_clusters)
        return {
            'row_names': row_names,
            'col_names': col_names,
            'row_cluster': row_clusters,
            'col_cluster': col_clusters
        }

    def _hclust(self, df: pd.DataFrame, method: str, metric: str,
                n_clusters: int) -> Tuple[List, List]:
        """Run scipy's hierarchical clustering for one axis of the data.
        :return: Names (reordered by dendrogram leaf order) together with
            their cluster labels, sorted by descending cluster size.
        """
        names = list(df)
        values = np.array(df)
        linkage = hclust.linkage(values, method=method, metric=metric)
        leaf_order = list(hclust.leaves_list(linkage))
        labels = [row[0] for row in hclust.cut_tree(linkage,
                                                    n_clusters=[n_clusters])]
        # order the names as the leaves appear in the dendrogram
        names = list(itemgetter(*leaf_order)(names))
        return self._sort_by_cluster_size(names, labels)

    def kmeans(self, df: pd.DataFrame, options: dict) -> dict:
        """K-means cluster both axes of the given data.
        :param df: The data to cluster.
        :param options: Must contain 'n_row_centroids' and 'n_col_centroids'.
        :return: Ordered names and cluster assignment for rows and columns.
        :raises ValueError: If a mandatory option is missing.
        """
        try:
            n_row_centroids = options['n_row_centroids']
            n_col_centroids = options['n_col_centroids']
        except KeyError:
            error = "'n_row_centroids' and 'n_col_centroids' are mandatory " \
                    "parameters to perform a kmeans clustering."
            logger.error(error)
            raise ValueError(error)
        row_names, row_clusters = self._kmeans(df, n_row_centroids)
        col_names, col_clusters = self._kmeans(df.T, n_col_centroids)
        return {
            'row_names': row_names,
            'col_names': col_names,
            'row_cluster': row_clusters,
            'col_cluster': col_clusters
        }

    def _kmeans(self, df: pd.DataFrame, n_centroids: int) -> Tuple[List, List]:
        """Run scipy's kmeans2 for one axis of the data."""
        names = list(df)
        labels = list(kmeans2(np.array(df), k=n_centroids)[1])
        return self._sort_by_cluster_size(names, labels)

    @staticmethod
    def _sort_by_cluster_size(names: List, labels: List) -> Tuple[List, List]:
        """Sort (name, label) pairs so that the biggest clusters come first.
        sorted() is stable, so members of equally sized clusters keep their
        relative order.
        """
        sizes = Counter(labels)
        pairs = sorted(zip(names, labels),
                       key=lambda pair: sizes[pair[1]], reverse=True)
        return [pair[0] for pair in pairs], [pair[1] for pair in pairs]
"""Module containing analysis code for heatmap analytics."""
from copy import deepcopy
from typing import List, TypeVar
from functools import reduce
import logging
import pandas as pd
from scipy.stats import zscore
from rpy2 import robjects as R
from rpy2.robjects import r, pandas2ri
from rpy2.robjects.packages import importr
from fractalis.analytics.task import AnalyticTask
from fractalis.analytics.tasks.shared.array_utils \
import drop_ungrouped_samples, drop_unused_subset_ids
from fractalis.analytics.tasks.heatmap.stats import StatisticTask
from fractalis.analytics.tasks.shared import array_utils
importr('limma')
pandas2ri.activate()
T = TypeVar('T')
logger = logging.getLogger(__name__)
......@@ -28,10 +21,13 @@ class HeatmapTask(AnalyticTask):
submittable celery task."""
name = 'compute-heatmap'
stat_task = StatisticTask()
def main(self, numerical_arrays: List[pd.DataFrame],
numericals: List[pd.DataFrame],
categoricals: List[pd.DataFrame],
ranking_method: str,
id_filter: List[T],
subsets: List[List[T]]) -> dict:
# prepare inputs args
df = reduce(lambda a, b: a.append(b), numerical_arrays)
......@@ -39,8 +35,9 @@ class HeatmapTask(AnalyticTask):
ids = list(df)
ids.remove('variable')
subsets = [ids]
df = drop_ungrouped_samples(df=df, subsets=subsets)
subsets = drop_unused_subset_ids(df=df, subsets=subsets)
df = array_utils.drop_ungrouped_samples(df=df, subsets=subsets)
df = array_utils.apply_id_filter(df=df, id_filter=id_filter)
subsets = array_utils.drop_unused_subset_ids(df=df, subsets=subsets)
# make sure the input data are still valid after the pre-processing
if df.shape[0] < 1 or df.shape[1] < 2:
......@@ -62,12 +59,13 @@ class HeatmapTask(AnalyticTask):
zscores = zscores.apply(zscore, axis=1)
zscores.insert(0, 'variable', variables)
# execute differential gene expression analysis
stats = self.get_limma_stats(df, subsets)
# compute statistic for ranking
stats = self.stat_task.main(df=df, subsets=subsets,
ranking_method=ranking_method)
# prepare output for front-end
df = self.melt_standard_format_df(df)
zscores = self.melt_standard_format_df(zscores)
df = array_utils.melt_standard_format_df(df)
zscores = array_utils.melt_standard_format_df(zscores)
df = pd.merge(df, zscores, on=['id', 'variable'])
df.columns = ['id', 'variable', 'value', 'zscore']
......@@ -75,95 +73,3 @@ class HeatmapTask(AnalyticTask):
'data': df.to_json(orient='index'),
'stats': stats.to_json(orient='index')
}
def melt_standard_format_df(self, df: pd.DataFrame) -> pd.DataFrame:
    """Reshape a standard format df (one 'variable' column plus one column
    per sample id) into long format with columns ['id', 'variable', 'value'].
    :param df: Standard format df. Must have at least one row and, besides
        the 'variable' column, at least one sample column.
    :return: Long format df with one row per (id, variable) pair.
    :raises ValueError: If df is too small to be melted.
    """
    if df.shape[0] < 1 or df.shape[1] < 2:
        error = "Data must be non-empty for melting."
        logger.error(error)
        raise ValueError(error)
    variables = df['variable']
    # drop without inplace=True so the caller's DataFrame is not
    # modified as a side effect
    df = df.drop('variable', axis=1)
    df = df.T
    df.columns = variables
    df.index.name = 'id'
    df.reset_index(inplace=True)
    df = pd.melt(df, id_vars='id')
    return df
def get_limma_stats(self, df: pd.DataFrame,
                    subsets: List[List[T]]) -> pd.DataFrame:
    """Use the R bioconductor package 'limma' to perform a differential
    gene expression analysis on the given data frame.
    :param df: Matrix of measurements where each column represents a sample
    and each row a gene/probe.
    :param subsets: Groups to compare with each other.
    :return: Results of limma analysis. More than 2 subsets will result in
    a different structured result data frame. See ?topTableF in R.
    :raises ValueError: If fewer than two subsets are given, or if the data
    frame is too small for an analysis.
    """
    # validate the input before handing anything over to R
    if len(subsets) < 2:
        error = "Limma analysis requires at least " \
                "two groups for comparison."
        logger.error(error)
        raise ValueError(error)
    if df.shape[0] < 1 or df.shape[1] < 2:
        error = "Limma analysis requires a " \
                "data frame with dimension 1x2 or more."
        logger.error(error)
        raise ValueError(error)
    # for analysis we want only sample cols; selecting by the flattened
    # subsets also duplicates columns whose id occurs in several subsets
    variables = df['variable']
    df = df.drop('variable', axis=1)
    flattened_subsets = [x for subset in subsets for x in subset]
    df = df[flattened_subsets]
    ids = list(df)
    # NOTE(review): CRITICAL-level logging of a plain id list looks like
    # leftover debug output -- consider removing.
    logger.critical(ids)
    # creating the design vector according to the subsets: position i holds
    # the (1-based) number of the subset that sample column i belongs to
    design_vector = [''] * len(ids)
    subsets_copy = deepcopy(subsets)
    for i, id in enumerate(ids):
        for j, subset in enumerate(subsets_copy):
            try:
                subset.index(id)  # raises an Exception if not found
                # consume the id so a duplicated id maps to a later subset
                subset.remove(id)
                design_vector[i] = str(j + 1)
                break
            except ValueError:
                # every id must occur in at least one subset
                # (note: asserts are stripped when running with -O)
                assert j != len(subsets_copy) - 1
    assert '' not in design_vector
    # create group names
    groups = ['group{}'.format(i + 1) for i in list(range(len(subsets)))]
    # create a string for each pairwise comparison of the groups
    comparisons = []
    for i in reversed(range(len(subsets))):
        for j in range(i):
            comparisons.append('group{}-group{}'.format(i+1, j+1))
    # fitting according to limma doc Chapter 8: Linear Models Overview
    r_form = R.Formula('~ 0+factor(c({}))'.format(','.join(design_vector)))
    r_design = r['model.matrix'](r_form)
    r_design.colnames = R.StrVector(groups)
    r_data = pandas2ri.py2ri(df)
    # the next two lines are necessary if column ids are not unique, because
    # the python to r transformation drops those columns otherwise
    r_ids = R.StrVector(['X{}'.format(id) for id in ids])
    r_data = r_data.rx(r_ids)
    r_fit = r['lmFit'](r_data, r_design)
    r_contrast_matrix = r['makeContrasts'](*comparisons, levels=r_design)
    r_fit_2 = r['contrasts.fit'](r_fit, r_contrast_matrix)
    r_fit_2 = r['eBayes'](r_fit_2)
    r_results = r['topTable'](r_fit_2, number=float('inf'),
                              sort='none', genelist=variables)
    results = pandas2ri.ri2py(r_results)
    # let's give the gene list column an appropriate name
    colnames = results.columns.values
    colnames[0] = 'variable'
    results.columns = colnames
    return results
"""This module provides row-wise statistics for the heat map."""
from copy import deepcopy
from typing import List, TypeVar
import logging
import pandas as pd
from numpy import mean, median, var
from rpy2 import robjects as R
from rpy2.robjects import r, pandas2ri
from rpy2.robjects.packages import importr
from fractalis.analytics.task import AnalyticTask
T = TypeVar('T')
importr('limma')
pandas2ri.activate()
logger = logging.getLogger(__name__)
class StatisticTask(AnalyticTask):
    """Compute row-wise ranking statistics (mean, median, variance, or a
    limma differential expression analysis) for the heat map."""

    name = 'expression-stats-task'

    def main(self, df: pd.DataFrame, subsets: List[List[T]],
             ranking_method: str) -> pd.DataFrame:
        """Dispatch to the statistic requested via ranking_method.
        :param df: Standard format df with a 'variable' column and one
            column per sample id.
        :param subsets: Groups to compare (used only by the limma ranking).
        :param ranking_method: 'mean', 'median' or 'variance'; any other
            value falls back to a limma analysis.
        :return: One row of statistics per variable.
        """
        if ranking_method == 'mean':
            stats = self.get_mean_stats(df)
        elif ranking_method == 'median':
            stats = self.get_median_stats(df)
        elif ranking_method == 'variance':
            stats = self.get_variance_stats(df)
        else:
            # default: differential expression analysis between the subsets
            stats = self.get_limma_stats(df, subsets)
        return stats

    @staticmethod
    def _apply_row_stat(df: pd.DataFrame, stat, name: str) -> pd.DataFrame:
        """Apply `stat` to each row of the sample columns of df.
        :param df: Standard format df ('variable' column + sample columns).
        :param stat: Function reducing one row of values to a scalar.
        :param name: Column name for the computed statistic.
        :return: df with the columns ['variable', name].
        """
        variables = df['variable']
        values = df.drop('variable', axis=1).apply(stat, axis=1)
        results = pd.concat([variables, values], axis=1)
        results.columns = ['variable', name]
        return results

    @staticmethod
    def get_mean_stats(df: pd.DataFrame) -> pd.DataFrame:
        """Compute the row-wise mean of the sample columns."""
        return StatisticTask._apply_row_stat(df, mean, 'mean')

    @staticmethod
    def get_median_stats(df: pd.DataFrame) -> pd.DataFrame:
        """Compute the row-wise median of the sample columns."""
        return StatisticTask._apply_row_stat(df, median, 'median')

    @staticmethod
    def get_variance_stats(df: pd.DataFrame) -> pd.DataFrame:
        """Compute the row-wise variance of the sample columns.
        numpy's var is used, i.e. the population variance (ddof=0)."""
        return StatisticTask._apply_row_stat(df, var, 'variance')

    @staticmethod
    def get_limma_stats(df: pd.DataFrame,
                        subsets: List[List[T]]) -> pd.DataFrame:
        """Use the R bioconductor package 'limma' to perform a differential
        gene expression analysis on the given data frame.
        :param df: Matrix of measurements where each column represents a sample
        and each row a gene/probe.
        :param subsets: Groups to compare with each other.
        :return: Results of limma analysis. More than 2 subsets will result in
        a different structured result data frame. See ?topTableF in R.
        :raises ValueError: If fewer than two subsets are given, or if the
        data frame is too small for an analysis.
        """
        # validate the input before handing anything over to R
        if len(subsets) < 2:
            error = "Limma analysis requires at least " \
                    "two groups for comparison."
            logger.error(error)
            raise ValueError(error)
        if df.shape[0] < 1 or df.shape[1] < 2:
            error = "Limma analysis requires a " \
                    "data frame with dimension 1x2 or more."
            logger.error(error)
            raise ValueError(error)
        # for analysis we want only sample cols; selecting by the flattened
        # subsets also duplicates columns whose id occurs in several subsets
        variables = df['variable']
        df = df.drop('variable', axis=1)
        flattened_subsets = [x for subset in subsets for x in subset]
        df = df[flattened_subsets]
        ids = list(df)
        # creating the design vector according to the subsets: position i
        # holds the (1-based) number of the subset column i belongs to
        design_vector = [''] * len(ids)
        subsets_copy = deepcopy(subsets)
        for i, id in enumerate(ids):
            for j, subset in enumerate(subsets_copy):
                try:
                    subset.index(id)  # raises ValueError if not found
                    # consume the id so duplicated ids map to later subsets
                    subset.remove(id)
                    design_vector[i] = str(j + 1)
                    break
                except ValueError:
                    # every id must occur in at least one subset
                    # (note: asserts are stripped when running with -O)
                    assert j != len(subsets_copy) - 1
        assert '' not in design_vector
        # create group names
        groups = ['group{}'.format(i + 1) for i in list(range(len(subsets)))]
        # create a string for each pairwise comparison of the groups
        comparisons = []
        for i in reversed(range(len(subsets))):
            for j in range(i):
                comparisons.append('group{}-group{}'.format(i+1, j+1))
        # fitting according to limma doc Chapter 8: Linear Models Overview
        r_form = R.Formula('~ 0+factor(c({}))'.format(','.join(design_vector)))
        r_design = r['model.matrix'](r_form)
        r_design.colnames = R.StrVector(groups)
        r_data = pandas2ri.py2ri(df)
        # the next two lines are necessary if column ids are not unique,
        # because the python to r transformation drops those columns otherwise
        r_ids = R.StrVector(['X{}'.format(id) for id in ids])
        r_data = r_data.rx(r_ids)
        r_fit = r['lmFit'](r_data, r_design)
        r_contrast_matrix = r['makeContrasts'](*comparisons, levels=r_design)
        r_fit_2 = r['contrasts.fit'](r_fit, r_contrast_matrix)
        r_fit_2 = r['eBayes'](r_fit_2)
        r_results = r['topTable'](r_fit_2, number=float('inf'),
                                  sort='none', genelist=variables)
        results = pandas2ri.ri2py(r_results)
        # give the gene list column an appropriate name for downstream merges
        colnames = results.columns.values
        colnames[0] = 'variable'
        results.columns = colnames
        return results
......@@ -51,3 +51,26 @@ def drop_unused_subset_ids(df: pd.DataFrame,
subset.remove(id)
return _subsets
def apply_id_filter(df: pd.DataFrame, id_filter: List[T]) -> pd.DataFrame:
    """Reduce df to the sample columns listed in id_filter while keeping
    the protected (non-sample) columns. An empty filter is a no-op.
    :param df: Standard format df (protected columns + sample columns).
    :param id_filter: Sample ids whose columns should be kept.
    :return: The filtered df, or the unchanged df for an empty filter.
    """
    if not id_filter:
        return df
    # protected columns first, then the requested sample columns
    return pd.concat([df[_protected_colnames], df[id_filter]], axis=1)
def melt_standard_format_df(df: pd.DataFrame) -> pd.DataFrame:
    """Reshape a standard format df (one 'variable' column plus one column
    per sample id) into long format with columns ['id', 'variable', 'value'].
    :param df: Standard format df. Must have at least one row and, besides
        the 'variable' column, at least one sample column.
    :return: Long format df with one row per (id, variable) pair.
    :raises ValueError: If df is too small to be melted.
    """
    if df.shape[0] < 1 or df.shape[1] < 2:
        error = "Data must be non-empty for melting."
        logger.error(error)
        raise ValueError(error)
    variables = df['variable']
    # drop without inplace=True so the caller's DataFrame is not
    # modified as a side effect
    df = df.drop('variable', axis=1)
    df = df.T
    df.columns = variables
    df.index.name = 'id'
    df.reset_index(inplace=True)
    df = pd.melt(df, id_vars='id')
    return df
......@@ -18,6 +18,7 @@ setup(
'pandas',
'numpy',
'scipy',
'sklearn',
'requests',
'PyYAML',
'pycrypto',
......
......@@ -11,67 +11,6 @@ class TestHeatmap:
task = HeatmapTask()
def test_melt_standard_format_df_works_for_standard_df(self):
    """A 2x3 standard format df melts into 4 long format rows."""
    input_df = pd.DataFrame([['foo', 5, 10],
                             ['bar', 10, 15]],
                            columns=['variable', 101, 102])
    melted = self.task.melt_standard_format_df(input_df)
    assert melted.shape == (4, 3)
    assert list(melted) == ['id', 'variable', 'value']
def test_melt_standard_format_df_works_for_minimal_df(self):
    """The smallest valid df (one variable, one sample) still melts."""
    minimal_df = pd.DataFrame([['foo', 5]], columns=['variable', 101])
    melted = self.task.melt_standard_format_df(minimal_df)
    assert melted.shape == (1, 3)
    assert list(melted) == ['id', 'variable', 'value']
def test_melt_standard_format_df_raises_for_invalid_df(self):
    """Melting a df without sample columns must raise a ValueError."""
    df = pd.DataFrame([['foo']], columns=['variable'])
    with pytest.raises(ValueError) as e:
        self.task.melt_standard_format_df(df)
    # check the exception message, not the ExceptionInfo wrapper:
    # `'...' in e` is not a valid membership test on ExceptionInfo
    assert 'must be non-empty' in str(e.value)
def test_get_limma_stats_raises_for_invalid_subsets(self):
    """limma needs at least two subsets to compare."""
    df = pd.DataFrame([['foo', 5, 10, 15, 20]],
                      columns=['variable', 0, 1, 2, 3])
    subsets = [[0, 1]]
    with pytest.raises(ValueError) as e:
        self.task.get_limma_stats(df=df, subsets=subsets)
    # check the exception message, not the ExceptionInfo wrapper
    assert 'requires at least two' in str(e.value)
def test_get_limma_stats_raises_for_invalid_df(self):
    """limma needs a df with at least one variable and two samples."""
    df = pd.DataFrame([['foo']], columns=['variable'])
    subsets = [[0], [0]]
    with pytest.raises(ValueError) as e:
        self.task.get_limma_stats(df=df, subsets=subsets)
    # check the exception message, not the ExceptionInfo wrapper
    assert 'dimension 1x2 or more' in str(e.value)
def test_get_limma_stats_returns_correct_for_2_groups(self):
    """Two groups yield the pairwise limma statistics (logFC, t, B, ...)."""
    df = pd.DataFrame([['foo', 5, 10, 15, 20]],
                      columns=['variable', 0, 1, 2, 3])
    stats = self.task.get_limma_stats(df=df, subsets=[[0, 1], [2, 3]])
    expected = ['variable', 'logFC', 'AveExpr', 't', 'P.Value',
                'adj.P.Val', 'B']
    assert all(stat in list(stats) for stat in expected)
def test_get_limma_stats_returns_correct_for_3_groups(self):
    """More than two groups switch limma to F-statistics output."""
    df = pd.DataFrame([['foo', 5, 10, 15, 20]],
                      columns=['variable', 0, 1, 2, 3])
    stats = self.task.get_limma_stats(df=df, subsets=[[0, 1], [2], [3]])
    columns = list(stats)
    assert all(stat in columns for stat in
               ['variable', 'AveExpr', 'F', 'P.Value', 'adj.P.Val'])
    assert all(stat not in columns for stat in ['logFC', 'B', 't'])
def test_get_limma_stats_returns_correct_for_4_groups(self):
    """Four (overlapping) groups also yield F-statistics output."""
    df = pd.DataFrame([['foo', 5, 10, 15, 20]],
                      columns=['variable', 0, 1, 2, 3])
    stats = self.task.get_limma_stats(
        df=df, subsets=[[0, 1], [1, 2], [2, 3], [3, 0]])
    columns = list(stats)
    assert all(stat in columns for stat in
               ['variable', 'AveExpr', 'F', 'P.Value', 'adj.P.Val'])
    assert all(stat not in columns for stat in ['logFC', 'B', 't'])
def test_functional_1(self):
numerical_arrays = [
pd.DataFrame([['foo', 5, 10, 15, 20], ['bar', 6, 11, 16, 21]],
......
"""This module provides tests for the array_utils module."""
import pytest
import pandas as pd
from fractalis.analytics.tasks.heatmap.stats import StatisticTask
# noinspection PyMissingOrEmptyDocstring,PyMethodMayBeStatic
class TestArrayUtils:
    """Tests for the limma statistics of the StatisticTask."""

    stat_task = StatisticTask()

    def test_get_limma_stats_raises_for_invalid_subsets(self):
        df = pd.DataFrame([['foo', 5, 10, 15, 20]],
                          columns=['variable', 0, 1, 2, 3])
        subsets = [[0, 1]]
        with pytest.raises(ValueError) as e:
            self.stat_task.get_limma_stats(df=df, subsets=subsets)
        # check the exception message, not the ExceptionInfo wrapper:
        # `'...' in e` is not a valid membership test on ExceptionInfo
        assert 'requires at least two' in str(e.value)

    def test_get_limma_stats_raises_for_invalid_df(self):
        df = pd.DataFrame([['foo']], columns=['variable'])
        subsets = [[0], [0]]
        with pytest.raises(ValueError) as e:
            self.stat_task.get_limma_stats(df=df, subsets=subsets)
        # check the exception message, not the ExceptionInfo wrapper
        assert 'dimension 1x2 or more' in str(e.value)

    def test_get_limma_stats_returns_correct_for_2_groups(self):
        # two groups: pairwise statistics (logFC, t, B, ...)
        df = pd.DataFrame([['foo', 5, 10, 15, 20]],
                          columns=['variable', 0, 1, 2, 3])
        subsets = [[0, 1], [2, 3]]
        stats = self.stat_task.get_limma_stats(df=df, subsets=subsets)
        assert all(stat in list(stats) for stat in
                   ['variable', 'logFC', 'AveExpr', 't', 'P.Value',
                    'adj.P.Val', 'B'])

    def test_get_limma_stats_returns_correct_for_3_groups(self):
        # more than two groups: F-statistics output
        df = pd.DataFrame([['foo', 5, 10, 15, 20]],
                          columns=['variable', 0, 1, 2, 3])
        subsets = [[0, 1], [2], [3]]
        stats = self.stat_task.get_limma_stats(df=df, subsets=subsets)
        assert all(stat in list(stats) for stat in
                   ['variable', 'AveExpr', 'F', 'P.Value', 'adj.P.Val'])
        assert all(stat not in list(stats) for stat in ['logFC', 'B', 't'])

    def test_get_limma_stats_returns_correct_for_4_groups(self):
        # overlapping groups are allowed; still F-statistics output
        df = pd.DataFrame([['foo', 5, 10, 15, 20]],
                          columns=['variable', 0, 1, 2, 3])
        subsets = [[0, 1], [1, 2], [2, 3], [3, 0]]
        stats = self.stat_task.get_limma_stats(df=df, subsets=subsets)
        assert all(stat in list(stats) for stat in
                   ['variable', 'AveExpr', 'F', 'P.Value', 'adj.P.Val'])
        assert all(stat not in list(stats) for stat in ['logFC', 'B', 't'])
......@@ -3,8 +3,7 @@
import pytest
import pandas as pd
from fractalis.analytics.tasks.shared.array_utils \
import drop_unused_subset_ids, drop_ungrouped_samples
from fractalis.analytics.tasks.shared import array_utils