diff --git a/src/datajudge/constraints/stats.py b/src/datajudge/constraints/stats.py index 0ff3de4b..d3750072 100644 --- a/src/datajudge/constraints/stats.py +++ b/src/datajudge/constraints/stats.py @@ -133,3 +133,71 @@ def test(self, engine: sa.engine.Engine) -> TestResult: ) return TestResult.success() + + +class AndersonDarling2Sample(Constraint): + def __init__( + self, ref: DataReference, ref2: DataReference, significance_level: float = 0.05 + ): + self.significance_level = significance_level + super().__init__(ref, ref2=ref2) + + @staticmethod + def approximate_critical_value( + size_sample1: int, size_sample2: int, significance_level: float + ) -> float: + """Approximate critical value for specific significance_level given sample sizes.""" + coefficient_map = { + 0.25: {"b0": 0.675, "b1": -0.245, "b2": -0.105}, + 0.1: {"b0": 1.281, "b1": 0.25, "b2": -0.305}, + 0.05: {"b0": 1.645, "b1": 0.678, "b2": -0.362}, + 0.025: {"b0": 1.96, "b1": 1.149, "b2": -0.391}, + 0.01: {"b0": 2.326, "b1": 1.822, "b2": -0.396}, + 0.005: {"b0": 2.573, "b1": 2.364, "b2": -0.345}, + 0.001: {"b0": 3.085, "b1": 3.615, "b2": -0.154}, + } + + if significance_level not in coefficient_map.keys(): + raise KeyError( + f"Significance-level {significance_level} not supported." + f" Please choose one of {coefficient_map.keys()}." 
+ ) + + coefficients = coefficient_map[significance_level] + b0 = coefficients["b0"] + b1 = coefficients["b1"] + b2 = coefficients["b2"] + critical_value = b0 + b1 / math.sqrt(size_sample1) + b2 / size_sample1 + return critical_value + + @staticmethod + def compute_test_statistic(sum1, sum2, sample_size1, sample_size2): + sample_size = sample_size1 + sample_size2 + return ((sum1 * sample_size1) + (sum2 * sample_size2)) / sample_size + + def test(self, engine: sa.engine.Engine) -> TestResult: + sample_size1, sample_size1_selections = db_access.get_row_count( + engine, self.ref + ) + sample_size2, sample_size2_selections = db_access.get_row_count( + engine, self.ref2 + ) + sample_size = sample_size1 + sample_size2 + sum1, sum2, sum_selections = db_access.get_anderson_darling_sums( + engine, self.ref, self.ref2, sample_size + ) + test_statistic = self.compute_test_statistic( + sum1, sum2, sample_size1, sample_size2 + ) + critical_value = self.approximate_critical_value( + sample_size1, sample_size2, self.significance_level + ) + result = test_statistic <= critical_value + assertion_text = f"The 2-sample Anderson-Darling test statistic {test_statistic} exceeds the critical value {critical_value} at significance level {self.significance_level}." + if not result: + return TestResult.failure( + assertion_text, + self.get_description(), + sum_selections + sample_size1_selections + sample_size2_selections, + ) + return TestResult.success() diff --git a/src/datajudge/db_access.py b/src/datajudge/db_access.py index 8bc229ba..f51889ab 100644 --- a/src/datajudge/db_access.py +++ b/src/datajudge/db_access.py @@ -1031,6 +1031,72 @@ def _forward_filled_cdf_column(table, cdf_label, value_label, group_label): return filled_cross_cdf, cdf_label1, cdf_label2 + +def get_anderson_darling_sums( + engine: sa.engine.Engine, + ref1: DataReference, + ref2: DataReference, + sample_size: int, +): + """Return per-sample inner sum term of k-sample Anderson Darling test statistic. + + For the first sample, this sum term equals + + .. 
math:: + + \\sum_{j=1}^{N-1} \\frac{(N F_1(j) - j)^2}{j(N-j)} + + while for the second sample, this sum term equals: + + .. math:: + + \\sum_{j=1}^{N-1} \\frac{(N F_2(j) - j)^2}{j(N-j)} + + where: + + * :math:`F_i` is the respective empirical cumulative distribution function + * :math:`n_i` is the respective sample size + """ + cdf_label = "cdf" + value_label = "val" + filled_cross_cdf_selection, cdf_label1, cdf_label2 = _cross_cdf_selection( + engine, ref1, ref2, cdf_label, value_label + ) + filled_cross_cdf_selection = filled_cross_cdf_selection.subquery() + + # Add a row number to the cross_cdf_selection. + row_number = "row_number" + selection = sa.select( + [ + sa.func.row_number() + .over( + order_by=filled_cross_cdf_selection.c[value_label], + ) + .label(row_number), + filled_cross_cdf_selection.c[value_label], + filled_cross_cdf_selection.c[cdf_label1], + filled_cross_cdf_selection.c[cdf_label2], + ] + ).subquery() + + def sum_selection(cdf_label): + return sa.select( + sa.func.sum( + (sample_size * selection.c[cdf_label] - selection.c[row_number]) + * (sample_size * selection.c[cdf_label] - selection.c[row_number]) + / (selection.c[row_number] * (sample_size - selection.c[row_number])) + ) + ) + + sum_selection1 = sum_selection(cdf_label1) + sum_selection2 = sum_selection(cdf_label2) + + with engine.connect() as connection: + sum1 = connection.execute(sum_selection1).scalar() + sum2 = connection.execute(sum_selection2).scalar() + + return sum1, sum2, [selection] + + def get_ks_2sample( engine: sa.engine.Engine, ref1: DataReference, diff --git a/src/datajudge/requirements.py b/src/datajudge/requirements.py index d86a9376..a724f2b4 100644 --- a/src/datajudge/requirements.py +++ b/src/datajudge/requirements.py @@ -28,6 +28,13 @@ T = TypeVar("T") + +def _check_significance_level(significance_level: float): + if significance_level <= 0.0 or significance_level > 1.0: + raise ValueError( + "The requested significance level has to be in ``(0.0, 1.0]``. 
Default is 0.05." + ) + + class TableQualifier: def __init__(self, db_name: str, schema_name: str, table_name: str): self.db_name = db_name @@ -1330,8 +1337,8 @@ def add_ks_2sample_constraint( ): """ Apply the so-called two-sample Kolmogorov-Smirnov test to the distributions of the two given columns. - The constraint is fulfilled, when the resulting p-value of the test is higher than the significance level - (default is 0.05, i.e., 5%). + The constraint is fulfilled when the resulting p-value of the test is higher than the significance level + (default is 0.05, i.e. 5%). The signifance_level must be a value between 0.0 and 1.0. """ @@ -1340,13 +1347,27 @@ def add_ks_2sample_constraint( "Column names have to be given for this test's functionality." ) - if significance_level <= 0.0 or significance_level > 1.0: - raise ValueError( - "The requested significance level has to be in ``(0.0, 1.0]``. Default is 0.05." - ) + _check_significance_level(significance_level) ref = DataReference(self.data_source, [column1], condition=condition1) ref2 = DataReference(self.data_source2, [column2], condition=condition2) self._constraints.append( stats_constraints.KolmogorovSmirnov2Sample(ref, ref2, significance_level) ) + + def add_anderson_darling_2sample_constraint( + self, + column1: str, + column2: str, + condition1: Condition = None, + condition2: Condition = None, + significance_level: float = 0.05, + ): + """Apply the two-sample Anderson-Darling test to the distributions of the two given columns. The constraint is fulfilled when the test statistic does not exceed the critical value at the given significance level (default is 0.05).""" + _check_significance_level(significance_level) + + ref = DataReference(self.data_source, [column1], condition=condition1) + ref2 = DataReference(self.data_source2, [column2], condition=condition2) + self._constraints.append( + stats_constraints.AndersonDarling2Sample(ref, ref2, significance_level) + ) diff --git a/tests/integration/test_stats.py b/tests/integration/test_stats.py index 30d1fb1e..d667a310 100644 --- a/tests/integration/test_stats.py +++ b/tests/integration/test_stats.py @@ -62,3 +62,16 @@ def test_ks_2sample_calculate_statistic(engine, 
random_normal_table, configurati assert ( abs(p_value - expected_p) <= 1e-05 ), f"The approx. p-value does not match: {expected_p} vs {p_value}" + + +def test_ad(engine, random_normal_table): + col_1 = "value_0_1" + col_2 = "value_005_1" + database, schema, table = random_normal_table + tds = TableDataSource(database, table, schema) + ref = DataReference(tds, columns=[col_1]) + ref2 = DataReference(tds, columns=[col_2]) + constraint = datajudge.constraints.stats.AndersonDarling2Sample( + ref, ref2, significance_level=0.05 + ) + constraint.test(engine)