Commit c9f6e3fc authored by Sascha Herzinger
Browse files

Implemented cluster tests and fixed discovered bugs

parent 84ddc1fb
Pipeline #2251 failed with stage
in 1 minute and 14 seconds
......@@ -3,7 +3,6 @@
import logging
from typing import List, Tuple
from collections import Counter
from operator import itemgetter
import pandas as pd
import numpy as np
......@@ -22,7 +21,12 @@ class ClusteringTask(AnalyticTask):
def main(self, df: str, cluster_algo: str,
options: dict) -> dict:
df = pd.read_json(df)
try:
df = pd.read_json(df)
except Exception:
error = "Failed to parse string to data frame."
logger.error(error)
raise ValueError(error)
# fill NAs with col medians so the clustering algorithms will work
df = df.T.fillna(df.median(axis=1)).T
if cluster_algo == 'hclust':
......@@ -45,9 +49,9 @@ class ClusteringTask(AnalyticTask):
"perform a hierarchical clustering."
logger.error(error)
raise ValueError(error)
row_names, row_clusters = self._hclust(df, method,
row_names, row_clusters = self._hclust(df.T, method,
metric, n_row_clusters)
col_names, col_clusters = self._hclust(df.T, method,
col_names, col_clusters = self._hclust(df, method,
metric, n_col_clusters)
return {
'row_names': row_names,
......@@ -61,15 +65,22 @@ class ClusteringTask(AnalyticTask):
names = list(df)
series = np.array(df)
z = hclust.linkage(series, method=method, metric=metric)
leaf_order = list(hclust.leaves_list(z))
cluster = [x[0] for x in hclust.cut_tree(z,
n_clusters=[n_clusters])]
names = list(itemgetter(*leaf_order)(names))
cluster_count = Counter(cluster)
# sort elements by their cluster size
sorted_cluster = sorted(zip(names, cluster),
key=lambda x: cluster_count[x[1]], reverse=True)
names = [x[0] for x in sorted_cluster]
cluster = [x[1] for x in sorted_cluster]
# relabel cluster, with the biggest cluster being 0
c = 0
relabeled_cluster = []
for i, v in enumerate(cluster):
if i > 0 and cluster[i] != cluster[i-1]:
c += 1
relabeled_cluster.append(c)
cluster = relabeled_cluster
return names, cluster
def kmeans(self, df: pd.DataFrame, options: dict) -> dict:
......@@ -82,8 +93,8 @@ class ClusteringTask(AnalyticTask):
logger.error(error)
raise ValueError(error)
row_names, row_clusters = self._kmeans(df, n_row_centroids)
col_names, col_clusters = self._kmeans(df.T, n_col_centroids)
row_names, row_clusters = self._kmeans(df.T, n_row_centroids)
col_names, col_clusters = self._kmeans(df, n_col_centroids)
return {
'row_names': row_names,
'col_names': col_names,
......@@ -93,11 +104,20 @@ class ClusteringTask(AnalyticTask):
def _kmeans(self, df: pd.DataFrame, n_centroids) -> Tuple[List, List]:
names = list(df)
series = np.array(df)
cluster = list(kmeans2(series, k=n_centroids)[1])
series = np.array(df).astype('float')
cluster = list(kmeans2(series, k=n_centroids, minit='points')[1])
cluster_count = Counter(cluster)
# sort elements by their cluster size
sorted_cluster = sorted(zip(names, cluster),
key=lambda x: cluster_count[x[1]], reverse=True)
names = [x[0] for x in sorted_cluster]
cluster = [x[1] for x in sorted_cluster]
# relabel cluster, with the biggest cluster being 0
c = 0
relabeled_cluster = []
for i, v in enumerate(cluster):
if i > 0 and cluster[i] != cluster[i-1]:
c += 1
relabeled_cluster.append(c)
cluster = relabeled_cluster
return names, cluster
"""This module provides tests for the cluster task
within the heatmap workflow."""
import json
import pytest
from fractalis.analytics.tasks.heatmap.cluster import ClusteringTask
# noinspection PyMissingOrEmptyDocstring,PyMethodMayBeStatic
class TestClustering:
    """Tests for ``ClusteringTask.main`` using hclust and kmeans."""

    task = ClusteringTask()

    # 3x3 matrix serialized as JSON: outer keys A-C, inner keys a-c.
    valid_df = json.dumps({
        'A': {
            'a': 50,
            'b': 2,
            'c': 45
        },
        'B': {
            'a': 250,
            'b': 5,
            'c': 300
        },
        'C': {
            'a': 55,
            'b': 4,
            'c': 60
        }
    })

    # In every "raises" test the message assertion lives AFTER the
    # ``with`` block and inspects ``str(e.value)``: statements placed
    # behind the raising call inside the block are never executed, and
    # ``e`` itself is pytest's ExceptionInfo wrapper, not the message.

    def test_hclust_raises_with_invalid_param_1(self):
        """A string that is not parseable JSON must raise ValueError."""
        options = {
            'method': 'single',
            'metric': 'euclidean',
            'n_row_clusters': 2,
            'n_col_clusters': 2
        }
        with pytest.raises(ValueError) as e:
            self.task.main(df='{//foo', cluster_algo='hclust',
                           options=options)
        assert 'parse string to data frame' in str(e.value)

    def test_hclust_raises_with_invalid_param_2(self):
        """An unknown linkage method must raise ValueError."""
        options = {
            'method': 'abc',
            'metric': 'euclidean',
            'n_row_clusters': 2,
            'n_col_clusters': 2
        }
        with pytest.raises(ValueError) as e:
            self.task.main(df=self.valid_df,
                           cluster_algo='hclust', options=options)
        assert 'Invalid method' in str(e.value)

    def test_hclust_raises_with_invalid_param_3(self):
        """An unknown distance metric must raise ValueError."""
        options = {
            'method': 'single',
            'metric': 'abc',
            'n_row_clusters': 2,
            'n_col_clusters': 2
        }
        with pytest.raises(ValueError) as e:
            self.task.main(df=self.valid_df,
                           cluster_algo='hclust', options=options)
        assert 'Invalid metric' in str(e.value)

    def test_hclust_raises_with_invalid_param_4(self):
        """Omitting 'n_col_clusters' must raise ValueError."""
        # Method and metric are valid on purpose, so that the missing
        # key is the only thing that can trigger the error.
        options = {
            'method': 'single',
            'metric': 'euclidean',
            'n_row_clusters': 2,
        }
        with pytest.raises(ValueError) as e:
            self.task.main(df=self.valid_df,
                           cluster_algo='hclust', options=options)
        assert 'mandatory parameters' in str(e.value)

    def test_hclust_returns_valid_result(self):
        """hclust yields names and cluster ids, biggest cluster first."""
        options = {
            'method': 'single',
            'metric': 'euclidean',
            'n_row_clusters': 2,
            'n_col_clusters': 2
        }
        result = self.task.main(df=self.valid_df,
                                cluster_algo='hclust', options=options)
        assert 'row_names' in result
        assert 'col_names' in result
        assert 'row_cluster' in result
        assert 'col_cluster' in result
        assert ['a', 'c', 'b'] == result['row_names']
        assert ['A', 'C', 'B'] == result['col_names']
        assert [0, 0, 1] == result['row_cluster']
        assert [0, 0, 1] == result['col_cluster']

    def test_kmean_raises_with_invalid_param_1(self):
        """A string that is not parseable JSON must raise ValueError."""
        options = {
            'n_row_centroids': 2,
            'n_col_centroids': 2
        }
        with pytest.raises(ValueError) as e:
            self.task.main(df='{//foo', cluster_algo='kmeans',
                           options=options)
        assert 'parse string to data frame' in str(e.value)

    def test_kmean_raises_with_invalid_param_2(self):
        """A non-numeric centroid count must raise ValueError."""
        options = {
            'n_row_centroids': 2,
            'n_col_centroids': 'abc'
        }
        with pytest.raises(ValueError) as e:
            self.task.main(df=self.valid_df,
                           cluster_algo='kmeans', options=options)
        assert 'invalid' in str(e.value)

    def test_kmean_raises_with_invalid_param_3(self):
        """Omitting 'n_col_centroids' must raise ValueError."""
        options = {
            'n_row_centroids': 2,
        }
        with pytest.raises(ValueError) as e:
            self.task.main(df=self.valid_df,
                           cluster_algo='kmeans', options=options)
        assert 'mandatory parameters' in str(e.value)

    def test_kmean_returns_valid_result(self):
        """kmeans yields names and cluster ids, biggest cluster first."""
        options = {
            'n_row_centroids': 2,
            'n_col_centroids': 2
        }
        result = self.task.main(df=self.valid_df,
                                cluster_algo='kmeans', options=options)
        assert 'row_names' in result
        assert 'col_names' in result
        assert 'row_cluster' in result
        assert 'col_cluster' in result
        assert ['a', 'c', 'b'] == result['row_names']
        assert ['A', 'C', 'B'] == result['col_names']
        assert [0, 0, 1] == result['row_cluster']
        assert [0, 0, 1] == result['col_cluster']
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment