Commit dd2e5e35 authored by Sascha Herzinger's avatar Sascha Herzinger
Browse files

Added common array functionality and some tests to cover them

parent fd3f44b3
Pipeline #2246 failed with stage
in 1 minute and 10 seconds
......@@ -11,6 +11,8 @@ from rpy2.robjects import r, pandas2ri
from rpy2.robjects.packages import importr
from fractalis.analytics.task import AnalyticTask
from fractalis.analytics.tasks.shared.array_utils \
import drop_ungrouped_samples, drop_unused_subset_ids
importr('limma')
......@@ -29,10 +31,26 @@ class HeatmapTask(AnalyticTask):
numericals: List[pd.DataFrame],
categoricals: List[pd.DataFrame],
subsets: List[List[T]]) -> dict:
# combine data frames col wise
df = reduce(lambda a, b: a.append(b), numerical_arrays)
# prepare inputs
if not subsets:
subsets = [[]]
df = drop_ungrouped_samples(df=df, subsets=subsets)
subsets = drop_unused_subset_ids(df=df, subsets=subsets)
# get samples-only data frame
variables = df['variable']
df = df.drop('variable', axis=1)
zscores = df.apply(zscore, axis=1)
_df = df.drop('variable', axis=1)
# create z-score matrix used for visualising the heatmap
zscores = _df.apply(zscore, axis=1)
# execute differential gene expression analysis
stats = self.getLimmaStats(_df, subsets)
# not needed any longer
del _df
# prepare output for front-end
df = df.transpose()
......@@ -51,18 +69,27 @@ class HeatmapTask(AnalyticTask):
df.columns = ['id', 'variable', 'value', 'zscore']
return {
'data': df.to_json(orient='index')
'data': df.to_json(orient='index'),
'stats': stats.to_json(orient='index')
}
def getLimmaStats(self, df: pd.DataFrame, subsets: List[List[T]]):
# we consider all subset ids rather than df ids because an id might be
# in multiple subsets. In such a case we have to copy a row in the df.
def getLimmaStats(self, df: pd.DataFrame,
subsets: List[List[T]]) -> pd.DataFrame:
"""Use the R bioconductor package 'limma' to perform a differential
gene expression analysis on the given data frame.
:param df: Matrix of measurements where each column represents a sample
and each row a gene/probe.
:param subsets: Groups to compare with each other.
:return: Results of limma analysis. More than 2 subsets will result in
a different structured result data frame. See ?topTableF in R.
"""
# prepare the df in case an id exists in more than one subset
flattened_subsets = [x for subset in subsets for x in subset]
df = df[flattened_subsets]
ids = list(df)
# creating the design vector according to the subsets
design_vector = [None] * len(ids)
design_vector = [''] * len(ids)
subsets_copy = deepcopy(subsets)
for i, id in enumerate(ids):
for j, subset in enumerate(subsets_copy):
......@@ -74,7 +101,7 @@ class HeatmapTask(AnalyticTask):
except ValueError:
assert j != len(subsets_copy) - 1
assert None not in design_vector
assert '' not in design_vector
# create group names
groups = ['group{}'.format(i + 1) for i in list(range(len(subsets)))]
......
"""This module contains common array functionality used in analytic tasks."""
import logging
from copy import deepcopy
from typing import List, TypeVar
import pandas as pd
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Generic type variable for the ids contained in the user-defined subsets.
T = TypeVar('T')
# Column names that are always kept when sample columns are filtered and that
# are never treated as sample ids.
_protected_colnames = ['variable']
def drop_ungrouped_samples(df: pd.DataFrame,
                           subsets: List[List[T]]) -> pd.DataFrame:
    """Drop sample columns that are not present in any of the subsets.

    Columns listed in ``_protected_colnames`` are always kept. The column
    order of the given data frame is preserved.

    :param df: Unmodified data frame
        submitted to the main method of an AnalyticTask.
    :param subsets: Subgroups defined by the user.
    :return: Filtered data frame.
    :raises ValueError: If the subsets do not contain a single id.
    """
    # a set gives constant-time lookups when filtering the columns below
    keep = {sample_id for subset in subsets for sample_id in subset}
    if not keep:
        error = "Subsets must not be empty."
        logger.error(error)
        raise ValueError(error)
    keep.update(_protected_colnames)  # special colnames that we want to keep
    # iterate over the df columns (not the set) to retain the original order
    colnames = [colname for colname in list(df) if colname in keep]
    return df[colnames]
def drop_unused_subset_ids(df: pd.DataFrame,
                           subsets: List[List[T]]) -> List[List[T]]:
    """Drop subset ids that are not present in the given data.

    The given subsets are left untouched; a new list of lists is returned.
    Ids that equal one of the ``_protected_colnames`` are dropped as well,
    because those columns are not considered sample ids. Ids must be
    hashable, as column labels of a data frame are.

    :param df: Unmodified data frame
        submitted to the main method of an AnalyticTask.
    :param subsets: Subset groups specified by the user.
    :return: Modified subsets list.
    """
    # set of sample column ids for constant-time membership tests
    df_ids = {el for el in list(df) if el not in _protected_colnames}
    # build filtered copies instead of removing from the lists in place;
    # the number of subsets and the order of the remaining ids are preserved
    return [[sample_id for sample_id in subset if sample_id in df_ids]
            for subset in subsets]
......@@ -68,7 +68,7 @@ def apply_categories(df: pd.DataFrame,
def apply_id_filter(df: pd.DataFrame, id_filter: list) -> pd.DataFrame:
"""Throw away all rows whose id is not in id_filter.
"""Drop all rows whose id is not in id_filter.
:param df: The DataFrame to filter.
:param id_filter: The filter.
:return: The filtered DataFrame.
......
"""This module provides tests for the array_utils module."""
import pytest
import pandas as pd
from fractalis.analytics.tasks.shared.array_utils \
import drop_unused_subset_ids, drop_ungrouped_samples
# noinspection PyMissingOrEmptyDocstring,PyMethodMayBeStatic
class TestArrayUtils:
    """Tests for drop_ungrouped_samples and drop_unused_subset_ids."""

    def test_drop_ungrouped_samples_1(self):
        """Subsets without any id must be rejected with a ValueError."""
        df = pd.DataFrame([[5, 10, 15, 20]])
        subsets = [[]]
        with pytest.raises(ValueError) as e:
            drop_ungrouped_samples(df=df, subsets=subsets)
        # Checked after the with-block: statements following the raising call
        # inside the block would never run. The message lives on e.value.
        assert 'must not be empty' in str(e.value)

    def test_drop_ungrouped_samples_2(self):
        """Columns that are in no subset are dropped."""
        df = pd.DataFrame([[5, 10, 15, 20]])
        subsets = [[0, 1], [3]]
        df = drop_ungrouped_samples(df=df, subsets=subsets)
        assert [0, 1, 3] == list(df)

    def test_drop_ungrouped_samples_3(self):
        """A single empty subset is fine as long as another one has ids."""
        df = pd.DataFrame([[5, 10, 15, 20]])
        subsets = [[0, 1], []]
        df = drop_ungrouped_samples(df=df, subsets=subsets)
        assert [0, 1] == list(df)

    def test_drop_ungrouped_samples_4(self):
        """Subset ids unknown to the data frame are ignored."""
        df = pd.DataFrame([[5, 10, 15, 20]])
        subsets = [[0, 1], [5]]
        df = drop_ungrouped_samples(df=df, subsets=subsets)
        assert [0, 1] == list(df)

    def test_drop_ungrouped_samples_5(self):
        """An empty data frame stays empty."""
        df = pd.DataFrame()
        subsets = [[0, 1], [5]]
        df = drop_ungrouped_samples(df=df, subsets=subsets)
        assert not list(df)

    def test_drop_unused_subset_ids_1(self):
        """No subsets in, no subsets out."""
        df = pd.DataFrame([[5, 10, 15, 20]])
        subsets = []
        subsets = drop_unused_subset_ids(df=df, subsets=subsets)
        assert subsets == []

    def test_drop_unused_subset_ids_2(self):
        """An empty subset is kept as an empty subset."""
        df = pd.DataFrame([[5, 10, 15, 20]])
        subsets = [[]]
        subsets = drop_unused_subset_ids(df=df, subsets=subsets)
        assert subsets == [[]]

    def test_drop_unused_subset_ids_3(self):
        """Ids missing in the data frame are removed from every subset."""
        df = pd.DataFrame([[5, 10, 15, 20]])
        subsets = [[0, 1], [1, 2, 4], [8]]
        subsets = drop_unused_subset_ids(df=df, subsets=subsets)
        assert subsets == [[0, 1], [1, 2], []]

    def test_drop_unused_subset_ids_4(self):
        """With an empty data frame all ids are removed."""
        df = pd.DataFrame()
        subsets = [[0, 1]]
        subsets = drop_unused_subset_ids(df=df, subsets=subsets)
        assert subsets == [[]]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment