main.py 4.19 KB
Newer Older
Sascha Herzinger's avatar
Sascha Herzinger committed
1
"""Module containing the Celery Task for the Correlation Analysis."""
Sascha Herzinger's avatar
Sascha Herzinger committed
2

3
import logging
4
from typing import List, TypeVar
5

Sascha Herzinger's avatar
Sascha Herzinger committed
6
import pandas as pd
7
8
import numpy as np
from scipy import stats
Sascha Herzinger's avatar
Sascha Herzinger committed
9

10
from fractalis.analytics.task import AnalyticTask
11
12
from fractalis.analytics.tasks.shared.utils import \
    apply_subsets, apply_categories
Sascha Herzinger's avatar
Sascha Herzinger committed
13

14

15
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
16
17
18
# Generic type variable used in the annotations of the id lists
# (id_filter, subsets) accepted by CorrelationTask.main.
T = TypeVar('T')


19
class CorrelationTask(AnalyticTask):
    """Correlation Analysis Task implementing AnalyticTask. This class is a
    submittable celery task."""

    name = 'compute-correlation'

    def main(self,
             x: pd.DataFrame,
             y: pd.DataFrame,
             id_filter: List[T],
             method: str,
             subsets: List[List[T]],
             categories: List[pd.DataFrame]) -> dict:
        """Compute correlation statistics for the given parameters.

        Both input frames are read through their 'id', 'feature' and 'value'
        columns.

        :param x: DataFrame containing x axis values.
        :param y: DataFrame containing y axis values.
        :param id_filter: If specified use only given ids during the analysis.
        :param method: pearson, spearman or kendall.
        :param subsets: List of lists of subset ids.
        :param categories: List of DataFrames that categorise the data points.
        :return: dict with the keys 'coef', 'p_value', 'slope' and
            'intercept' (statistics over all remaining rows), 'subsets' (the
            same statistics per subset), 'method', 'data' (the rows as a JSON
            string of records), 'x_label' and 'y_label'.
        :raises ValueError: If x or y does not contain exactly one feature,
            if the method is unknown, or if x and y share no ids.
        """
        if len(x['feature'].unique()) != 1 \
                or len(y['feature'].unique()) != 1:
            error = "Input is invalid. Please make sure that the two " \
                    "variables to compare have exactly one dimension, each."
            logger.error(error)
            raise ValueError(error)
        if method not in ('pearson', 'spearman', 'kendall'):
            error = "Unknown method '{}'".format(method)
            logger.error(error)
            raise ValueError(error)

        df = self.merge_x_y(x, y)
        # Use positional access: merge_x_y drops NA rows but keeps the
        # original index labels, so the label 0 is not guaranteed to exist.
        x_label = df['feature_x'].iloc[0]
        y_label = df['feature_y'].iloc[0]
        if id_filter:
            df = df[df['id'].isin(id_filter)]
        # NOTE(review): apply_subsets is presumably what adds the 'subset'
        # column that is read below -- confirm against shared.utils.
        df = apply_subsets(df=df, subsets=subsets)
        df = apply_categories(df=df, categories=categories)

        output = self.compute_stats(df, method)
        # With no subsets given, a single subset with index 0 is evaluated.
        subset_count = len(subsets) or 1
        output['subsets'] = [
            self.compute_stats(df[df['subset'] == subset_idx], method)
            for subset_idx in range(subset_count)
        ]
        output['method'] = method
        output['data'] = df.to_json(orient='records')
        output['x_label'] = x_label
        output['y_label'] = y_label

        return output

    @staticmethod
    def merge_x_y(x: pd.DataFrame, y: pd.DataFrame) -> pd.DataFrame:
        """Merge the x and y DataFrame and drop all rows containing NA.

        The frames are inner-joined on their 'id' column.

        :param x: The x-axis values.
        :param y: The y-axis values.
        :return: The merged data frame.
        :raises ValueError: If no row is left after merging and dropping NA.
        """
        df = x.merge(y, on=['id'], how='inner')
        df = df.dropna()
        if df.shape[0] == 0:
            raise ValueError("X and Y do not share any ids.")
        return df

    @staticmethod
    def compute_stats(df: pd.DataFrame, method: str) -> dict:
        """Compute correlation statistics for the given data and the given
        correlation method.

        :param df: The DataFrame containing our data. Its 'id', 'value_x'
            and 'value_y' columns are used.
        :param method: The method to use: pearson, spearman or kendall.
        :return: dict with the keys 'coef', 'p_value', 'slope' and
            'intercept'. All four are NaN if fewer than two rows remain
            after dropping duplicate ids and NA rows.
        :raises ValueError: If the method is unknown and at least two rows
            remain.
        """
        df = df.drop_duplicates('id')
        df = df.dropna()
        # A correlation needs at least two points; report NaN otherwise.
        if df.shape[0] < 2:
            return {
                'coef': float('nan'),
                'p_value': float('nan'),
                'slope': float('nan'),
                'intercept': float('nan')
            }

        correlation_funcs = {
            'pearson': stats.pearsonr,
            'spearman': stats.spearmanr,
            'kendall': stats.kendalltau,
        }
        try:
            correlate = correlation_funcs[method]
        except KeyError:
            raise ValueError("Unknown correlation method.") from None

        corr_coef, p_value = correlate(df['value_x'], df['value_y'])
        # Degree-1 least squares fit gives the regression line of y on x.
        slope, intercept, *_ = np.polyfit(df['value_x'], df['value_y'], deg=1)
        return {
            'coef': corr_coef,
            'p_value': p_value,
            'slope': slope,
            'intercept': intercept,
        }