from __future__ import annotations
import warnings
from collections import defaultdict
from typing import TYPE_CHECKING, Any, NoReturn
import numpy as np
import pandas as pd
from scipy.stats import shapiro, t
if TYPE_CHECKING:
    from collections.abc import Callable
__eps = np.finfo(np.float32).eps
def t_value(p: float, v: float) -> float:
    r"""
    Return the value on the x-axis if you plot the cumulative t-distribution with a fractional
    area of `p` (`p` is therefore a fractional value between 0 and 1 on the y-axis), where `v`
    is the degrees of freedom.

    Examples
    --------
    The t-distribution is symmetric about zero, so the cumulative distribution reaches half of
    its area at an x-axis value of 0.0 for any number of degrees of freedom (``v=9`` here is
    illustrative):

    >>> t_value(0.5, v=9)
    0.0

    Zero fractional area under the curve is always at :math:`-\infty`:

    >>> t_value(0.0, v=9)
    -inf

    100% fractional area is always at :math:`+\infty`:

    >>> t_value(1.0, v=9)
    inf

    See also
    --------
    t_value_cdf: does the inverse of this function.
    """
    return t.ppf(p, df=v)
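# A minimal usage sketch (output truncated; the exact digits depend on SciPy's
# implementation):
#
#   >>> t_value(0.975, v=12)  # critical value for a 95% two-sided interval with 12 dof
#   2.178...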
def t_value_cdf(z: float, v: float) -> float:
    r"""
    Return the fractional area under the cumulative t-distribution (y-axis value) at the t-value
    `z` on the x-axis, with `v` degrees of freedom.

    Examples
    --------
    The t-distribution is symmetric about zero for any number of degrees of freedom, so half of
    the area lies below zero (``v=9`` here is illustrative):

    >>> t_value_cdf(0.0, v=9)
    0.5

    Zero fractional area under the curve is at :math:`-\infty`:

    >>> t_value_cdf(-np.inf, v=9)
    0.0

    100% fractional area is at :math:`+\infty`:

    >>> t_value_cdf(np.inf, v=9)
    1.0

    See also
    --------
    t_value: does the inverse of this function.
    """
    return t.cdf(z, df=v)
def test_normality(x: np.ndarray | pd.Series) -> float:
    """
    Return the p-value for the hypothesis that the data are from a normal distribution.

    If the p-value is less than the chosen alpha level (e.g. 0.05 or 0.025), then there is
    evidence that the data tested are NOT normally distributed. On the other hand, if the
    p-value is greater than the chosen alpha level, then the null hypothesis that the data
    came from a normally distributed population cannot be rejected.

    NOTE: that does not mean the data are normally distributed, just that we have nothing
    better to say about it. See the `Shapiro-Wilk test`_.

    Implementation: uses the Shapiro-Wilk test directly from `scipy.stats.shapiro`_.

    .. _Shapiro-Wilk test:
        https://en.wikipedia.org/wiki/Shapiro-Wilk_test

    .. _scipy.stats.shapiro:
        https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.shapiro.html
    """
    output = shapiro(x)
    return output[1]
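# A minimal usage sketch, assuming data drawn from a Gaussian; a p-value well above the
# alpha level means normality cannot be rejected:
#
#   >>> rng = np.random.default_rng(13)
#   >>> p = test_normality(rng.normal(loc=50.0, scale=5.0, size=200))
#   >>> # `p` is typically well above 0.05 for data that really are Gaussian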
def Sn(x: np.ndarray | pd.Series, constant: float = 1.1926) -> np.floating:  # noqa: N802
    """
    Compute a robust scale estimator. The Sn metric is an efficient alternative to MAD.

    Parameters
    ----------
    x : np.ndarray or pd.Series
        A vector of values. NaN entries are ignored.
    constant : float, optional
        Multiplicative constant that makes the estimator consistent with iid values from a
        Gaussian distribution with no outliers. Default is 1.1926.

    Returns
    -------
    np.floating
        A scalar value, the Sn estimate of spread. Returns NaN if all entries are missing,
        and 0.0 when only a single non-missing value is supplied.

    Notes
    -----
    Tested against one of the most reliable open-source packages, written by some of the
    most respected names in the area of robust methods: [1]_ and [2]_.

    Disadvantages of MAD:

    * It does not have particularly high efficiency for data that is in fact normal (37%).
      In comparison, the median has 64% efficiency for normal data.
    * The MAD statistic also has an implicit assumption of symmetry. That is, it measures
      the distance from a measure of central location (the median).

    References
    ----------
    .. [1] https://cran.r-project.org/web/packages/robustbase/
    .. [2] Rousseeuw, Peter J.; Croux, Christophe (December 1993),
           "Alternatives to the Median Absolute Deviation", Journal of the American
           Statistical Association, American Statistical Association, 88 (424): 1273-1283,
           https://dx.doi.org/10.2307/2291267
    """
    arr = np.asarray(x, dtype=np.float64)
    n = np.sum(~np.isnan(arr))
    if n == 0:
        return np.float64(np.nan)
    elif n == 1:
        return np.float64(0.0)
    # Remove NaNs and compute all pairwise absolute differences via broadcasting
    clean = arr[~np.isnan(arr)]
    diffs = np.abs(clean[:, None] - clean[None, :])
    medians = np.median(diffs, axis=1)
    if n <= 9:
        # Finite-sample correction factors for n = 2 to 9:
        correction = [0.743, 1.851, 0.954, 1.351, 0.993, 1.198, 1.005, 1.131][n - 2]
    elif n % 2:
        correction = n / (n - 0.9)  # odd number of cases
    else:
        correction = 1.0
    return constant * np.median(medians) * correction
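# A minimal sketch of the robustness that Sn buys over the classical standard deviation:
# a single gross outlier inflates np.std() badly but barely moves the Sn estimate
# (hypothetical data):
#
#   >>> clean = np.array([9.8, 10.1, 10.0, 9.9, 10.2, 10.0, 9.7, 10.3])
#   >>> contaminated = np.append(clean, 50.0)
#   >>> np.std(contaminated, ddof=1)  # inflated by the single outlier
#   >>> Sn(contaminated)              # stays close to Sn(clean)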
def _contains_nan(a: np.ndarray, nan_policy: str = "propagate") -> tuple[bool, str]:
    """From scipy.stats.stats."""
    policies = ["propagate", "raise", "omit"]
    if nan_policy not in policies:
        raise ValueError(f"nan_policy must be one of {{{', '.join(f'{s!r}' for s in policies)}}}")
    try:
        # Call np.sum to avoid creating a huge intermediate array in memory,
        # as np.isnan(a).any() would
        with np.errstate(invalid="ignore"):
            contains_nan = np.isnan(np.sum(a))
    except TypeError:
        # This can happen when attempting to sum things which are not
        # numbers (e.g. as in the function `mode`). Try an alternative method:
        try:
            contains_nan = any(np.isnan(v) for v in a.ravel())
        except TypeError:  # pragma: no cover
            # Don't know what to do. Fall back to omitting nan values and
            # issue a warning.
            contains_nan = False
            nan_policy = "omit"
            warnings.warn(
                "The input array could not be properly checked for nan values. "
                "nan values will be ignored.",
                RuntimeWarning,
                stacklevel=2,
            )
    if contains_nan and nan_policy == "raise":
        raise ValueError("The input contains nan values")
    return (contains_nan, nan_policy)
def ttest_independent(sample_A: pd.Series, sample_B: pd.Series, conflevel: float = 0.995) -> dict:
    """Core calculation for a test of differences between the average of A and the average of B.

    No checking of inputs.

    Parameters
    ----------
    sample_A : pd.Series
        Vector of n_A measurements.
    sample_B : pd.Series
        Vector of n_B measurements.
    conflevel : float
        Value between 0 and 1 (usually close to 1.0) that gives the level of confidence
        required for the 2-sided test.

    Returns
    -------
    dict
        Outcomes from the statistical test.
    """
    axis = 0
    v1, v2 = sample_A.var(axis, ddof=1), sample_B.var(axis, ddof=1)
    n_A, n_B = sample_A.shape[axis], sample_B.shape[axis]
    m1, m2 = sample_A.mean(), sample_B.mean()
    df = n_A + n_B - 2.0
    ct = abs(t_value(p=(1 - conflevel) / 2.0, v=df))
    d = m2 - m1
    # Pooled variance: each sample's variance weighted by its degrees of freedom
    svar = ((n_A - 1) * v1 + (n_B - 1) * v2) / df
    with np.errstate(divide="ignore", invalid="ignore"):
        sd_z_variate = np.sqrt(svar * (np.divide(1.0, n_A) + np.divide(1.0, n_B)))
        z_variate = np.divide(d, sd_z_variate)
    confint_lo = d - ct * sd_z_variate
    confint_hi = d + ct * sd_z_variate
    return {
        "Group A number": int(n_A),
        "Group B number": int(n_B),
        "Group A average": m1,
        "Group B average": m2,
        "z value": z_variate,
        "ConfInt: Lo": confint_lo,
        "ConfInt: Hi": confint_hi,
        "p value": 2 * t_value_cdf(-np.abs(z_variate), df),
        "Degrees of freedom": df,
        "Pooled standard deviation": sd_z_variate,
    }
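# A minimal usage sketch with two hypothetical samples; the returned keys are the ones
# built in the dictionary above:
#
#   >>> A = pd.Series([101.0, 102.0, 103.0, 101.5])
#   >>> B = pd.Series([94.0, 95.0, 96.0, 94.5])
#   >>> result = ttest_independent(A, B, conflevel=0.95)
#   >>> result["p value"] < 0.05  # the group averages clearly differ
#   True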
def ttest_independent_from_df(
    df: pd.DataFrame, grouper_column: str, values_column: str, conflevel: float = 0.995,
) -> pd.DataFrame:
    """
    Calculate the t-test for differences between two or more groups and return a confidence
    interval for the difference. The test is for UNPAIRED differences.

    The dataframe `df` contains a `grouper_column` with 2 or more unique values (e.g. 'A' and
    'B'). All unique values of the `grouper_column` are used, and t-tests are done between
    the values in the `values_column`.

    Args:
        df (pd.DataFrame): Dataframe of the values and grouping variable.
        grouper_column (str): Indicates which column will be grouped on.
        values_column (str): Which column contains the numeric values to calculate the test on.
        conflevel (float, optional): Confidence level for the 2-sided test. Defaults to 0.995.

    Output: Dataframe with columns containing the statistical outputs of the t-test, including:

        1. Group "A" name
        2. Group "B" name
        3. Group "A" mean
        4. Group "B" mean
        5. z-value for the difference of group "B" minus group "A"
        6. p-value for this z-value
        7. Confidence interval low value for the difference of group "B" minus group "A"
        8. Confidence interval high value for the difference of group "B" minus group "A"

    Example:
        `df` has 3 levels in the grouper variable: ['Marco', 'Pete', 'Sam'].
        The output will have 3 rows, one per pairwise comparison:

            Group A name    Group B name
            Marco           Pete
            Marco           Sam
            Pete            Sam
    """
    data_subset = df[[grouper_column, values_column]].copy()
    data_subset = data_subset.dropna()
    # Use the groups remaining after dropping missing values
    groups = list(data_subset[grouper_column].unique())
    output = pd.DataFrame()
    while len(groups) > 0:
        groupA_name = groups.pop(0)
        for groupB_name in groups:
            sample_A = data_subset[data_subset[grouper_column].eq(groupA_name)][values_column]
            sample_B = data_subset[data_subset[grouper_column].eq(groupB_name)][values_column]
            sample_A = sample_A.astype(np.float64)
            sample_B = sample_B.astype(np.float64)
            basic_stats = ttest_independent(sample_A, sample_B, conflevel)
            basic_stats.update(
                {
                    "Group A name": groupA_name,
                    "Group B name": groupB_name,
                }
            )
            output = pd.concat([output, pd.DataFrame(basic_stats, index=[0])])
    return output
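# A minimal usage sketch on a tidy (long-form) dataframe; "Group" and "Value" are
# hypothetical column names:
#
#   >>> data = pd.DataFrame({
#   ...     "Group": ["A"] * 4 + ["B"] * 4,
#   ...     "Value": [101.0, 102.0, 103.0, 101.5, 94.0, 95.0, 96.0, 94.5],
#   ... })
#   >>> ttest_independent_from_df(data, "Group", "Value", conflevel=0.95)  # one row: A vs B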
def ttest_paired(differences: pd.Series, conflevel: float = 0.995) -> dict:
    """Core calculation for a test of paired differences.

    Parameters
    ----------
    differences : pd.Series
        The paired differences (e.g. ``sample_A - sample_B``) for which the test is run.
    conflevel : float, optional
        Value between 0 and 1 (usually close to 1.0) that gives the level of confidence
        required for the 2-sided test. Default is 0.995.

    Returns
    -------
    dict
        Outcomes from the statistical test.
    """
    diff_mean = differences.mean()
    diff_svar = differences.std(ddof=1)
    dof = differences.shape[0] - 1  # n - 1 degrees of freedom
    # By the central limit theorem, the `differences` values should be normally distributed,
    # with average (central value) given by the mean of group A values minus the mean of
    # group B values. The scale factor is the standard deviation of `differences`: it is
    # estimated, therefore the t-distribution applies.
    ct = abs(t_value(p=(1 - conflevel) / 2.0, v=dof))
    with np.errstate(divide="ignore", invalid="ignore"):
        sd_z_variate = diff_svar * np.sqrt(np.divide(1.0, dof + 1))
        z_variate = np.divide(diff_mean, sd_z_variate)
    confint_lo = diff_mean - ct * sd_z_variate
    confint_hi = diff_mean + ct * sd_z_variate
    return {
        "Differences mean": diff_mean,
        "z value": z_variate,
        "ConfInt: Lo": confint_lo,
        "ConfInt: Hi": confint_hi,
        "p value": 2 * t_value_cdf(-np.abs(z_variate), dof),
        "Degrees of freedom": dof,
        "Standard deviation": sd_z_variate,
    }
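# A minimal usage sketch: test whether hypothetical before/after readings shifted.
# Differences are defined here as after - before:
#
#   >>> before = pd.Series([101.0, 98.0, 103.0, 99.0])
#   >>> after = pd.Series([96.0, 94.0, 97.0, 95.0])
#   >>> result = ttest_paired(after - before, conflevel=0.95)
#   >>> result["Differences mean"]
#   -4.75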
def ttest_paired_from_df(
    df: pd.DataFrame, grouper_column: str, values_column: str, conflevel: float = 0.995,
) -> pd.DataFrame:
    """
    Calculate the t-test for paired differences between two or more groups and return a
    confidence interval for the difference. The test is for PAIRED differences.

    The difference is always defined as the A values minus the B values: after - before,
    or A - B.

    The dataframe `df` contains a `grouper_column` with 2 or more unique values (e.g. 'A' and
    'B'). All unique values of the `grouper_column` are used, and t-tests are done between
    the values in the `values_column`. The number of values per group must be the same.

    Args:
        df (pd.DataFrame): Dataframe of the values and grouping variable.
        grouper_column (str): Indicates which column will be grouped on.
        values_column (str): Which column contains the numeric values to calculate the test on.
        conflevel (float, optional): Confidence level for the 2-sided test. Defaults to 0.995.

    Output: Dataframe with columns containing the statistical outputs of the t-test, including:

        1. Group A name
        2. Group B name
        3. Group A mean
        4. Group B mean
        5. Differences mean: average difference between the groups (not the same as the
           difference of the averages from items 3 and 4 above)
        6. z-value for the difference of group "A" minus group "B"
        7. p-value for this z-value
        8. Confidence interval low value for the difference of group "A" minus group "B"
        9. Confidence interval high value for the difference of group "A" minus group "B"
    """
    data_subset = df[[grouper_column, values_column]].copy()
    data_subset = data_subset.dropna()
    # Use the groups remaining after dropping missing values
    groups = list(data_subset[grouper_column].unique())
    output = pd.DataFrame()
    while len(groups) > 0:
        groupA_name = groups.pop(0)
        for groupB_name in groups:
            sample_A = data_subset[data_subset[grouper_column].eq(groupA_name)][values_column]
            sample_B = data_subset[data_subset[grouper_column].eq(groupB_name)][values_column]
            sample_A = sample_A.astype(np.float64)
            sample_B = sample_B.astype(np.float64)
            assert sample_A.shape[0] == sample_B.shape[0]
            differences = sample_A - sample_B.values  # only the .values of one vector are needed!
            basic_stats = ttest_paired(differences, conflevel)
            basic_stats.update(
                {
                    "Group A name": groupA_name,
                    "Group B name": groupB_name,
                    "Group A number": sample_A.shape[0],
                    "Group B number": sample_B.shape[0],
                    "Group A average": sample_A.mean(),
                    "Group B average": sample_B.mean(),
                }
            )
            output = pd.concat([output, pd.DataFrame(basic_stats, index=[0])])
    return output
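# A minimal usage sketch; pairing is by position within each group, so the rows must be in
# the same order for every level of the grouper ("Phase" and "Yield" are hypothetical
# column names):
#
#   >>> data = pd.DataFrame({
#   ...     "Phase": ["before"] * 3 + ["after"] * 3,
#   ...     "Yield": [78.0, 80.0, 79.0, 83.0, 85.0, 84.0],
#   ... })
#   >>> ttest_paired_from_df(data, "Phase", "Yield", conflevel=0.95)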
def confidence_interval(df: pd.DataFrame, column_name: str, conflevel: float = 0.95, style: str = "robust") -> tuple:
    """
    Calculate the confidence interval, returned as a tuple, for the `column_name` (str) in
    the dataframe `df`, for a given confidence level `conflevel` (default: 0.95).

    `style`: ['robust', 'regular']: indicates which style of estimates to use for the center
    and spread. Default: 'robust'.

    Missing values are ignored.
    """
    # TODO: http://www.rips-irsp.com/article/10.5334/irsp.82/
    data = df[column_name]
    n = data.count()
    if style.lower() == "robust":
        center = data.median()
        spread = median_absolute_deviation(data.values, nan_policy="omit")
    else:
        center = data.mean()
        spread = data.std()
    c_t = t_value(1 - (1 - conflevel) / 2, n - 1)
    return (center - c_t * spread / np.sqrt(n), center + c_t * spread / np.sqrt(n))
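# A minimal usage sketch with the "regular" style (the "robust" style additionally needs
# `median_absolute_deviation`, defined elsewhere in this package):
#
#   >>> data = pd.DataFrame({"purity": [99.1, 98.7, 99.4, 99.0, 98.9, 99.2]})
#   >>> lo, hi = confidence_interval(data, "purity", conflevel=0.95, style="regular")
#   >>> lo < data["purity"].mean() < hi
#   True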
def _mad_1d(x: np.ndarray, center: Callable, nan_policy: str) -> float:
    """Taken from `scipy.stats.stats`.

    Median absolute deviation for 1-d array x.

    This is a helper function for `median_abs_deviation`; it assumes its
    arguments have been validated already. In particular, x must be a
    1-d numpy array, center must be callable, and if nan_policy is not
    'propagate', it is assumed to be 'omit', because 'raise' is handled
    in `median_abs_deviation`.

    No warning is generated if x is empty or all nan.
    """
    isnan = np.isnan(x)
    if isnan.any():
        if nan_policy == "propagate":
            return np.nan
        x = x[~isnan]
    if x.size == 0:
        # MAD of an empty array is nan.
        return np.nan
    # Edge cases have been handled, so do the basic MAD calculation.
    med = center(x)
    return np.median(np.abs(x - med))
def summary_stats(x: np.ndarray | pd.Series, method: str = "robust") -> dict:
    """
    Return summary statistics of the numeric values in vector `x`.

    Arguments:
        x (NumPy vector or Pandas series): A vector of univariate values to summarize.
        method (str, optional): 'robust' (default) uses robust estimates for the center and
            spread; any other value uses the classical mean and standard deviation.

    Returns:
        dict: a summary of the univariate vector. The following outputs are the most
        interesting:

            "center": a measure of the center (average). If method is ``robust``, this is
                the median.
            "spread": a measure of the spread. If method is ``robust``, this is the Sn, a
                robust spread estimate.
    """
    if isinstance(x, pd.Series):
        x = x.copy(deep=True).values
    elif isinstance(x, np.ndarray):
        x = x.ravel()
    else:
        raise TypeError("Expecting a NumPy vector or Pandas series.")
    out = {}
    out["mean"] = np.nanmean(x)
    out["std_ddof0"] = np.nanstd(x, ddof=0)
    out["std_ddof1"] = np.nanstd(x, ddof=1)
    out["rsd_classical"] = out["std_ddof1"] / out["mean"]
    (
        out["percentile_05"],
        out["percentile_25"],
        out["median"],
        out["percentile_75"],
        out["percentile_95"],
    ) = np.nanpercentile(x, [5, 25, 50, 75, 95])
    out["iqr"] = out["percentile_75"] - out["percentile_25"]
    out["min"] = np.nanmin(x)
    out["max"] = np.nanmax(x)
    out["N_non_missing"] = np.sum(~np.isnan(x))
    if method.lower() == "robust":
        out["center"], out["spread"] = out["median"], Sn(x)
        if ((out["max"] - out["min"]) > 0) and (out["spread"] == 0) and (out["N_non_missing"] > 0):
            # Don't fully trust Sn() yet: it behaves strangely on quantized data when there
            # is little variation. Fall back to the classically calculated center and spread
            # in this very specific case. An example that shows it:
            # [99, 95, 95, 100, 100, 100, 100, 95, 100, 100, 100, 100, 105, 105, 100, 95,
            #  105, 100, 95, 100]
            out["center"], out["spread"] = out["mean"], out["std_ddof1"]
    else:
        out["center"], out["spread"] = out["mean"], out["std_ddof1"]
    out["rsd"] = out["spread"] / out["center"]
    return out
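# A minimal usage sketch; with the default robust method, "center" is the median, which is
# barely influenced by the outlying 55.0 (hypothetical data):
#
#   >>> stats = summary_stats(np.array([9.9, 10.1, 10.0, 9.8, 10.2, 55.0]))
#   >>> stats["center"] == stats["median"]
#   True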
def detect_outliers_esd(
    x: np.ndarray | pd.Series, algorithm: str = "esd", max_outliers_detected: int = 1, **kwargs
) -> tuple[list[int], defaultdict[Any, Any]]:
    """
    Return a list of indexes of points in the vector `x` which are likely outliers.

    A second output (which can be ignored) contains the details of the values used to make
    the decision.

    Arguments:
        x (list, sequence, or NumPy vector/array): A sequence, list or vector which can be
            unravelled.

    Keyword Arguments:
        algorithm -- Two algorithms are possible to detect outliers (default: "esd"):

            'esd': Generalized ESD Test for Outliers.
                If `max_outliers_detected=1` this is essentially Grubbs' test.
                For more details, please see:
                https://www.itl.nist.gov/div898/handbook/eda/section3/eda35h3.htm

            'cc-robust': Build a robust control chart for the sequence `x`; points which lie
                outside the +/- 3 sigma limits are considered outliers.
                Not implemented yet: left here as an idea for the future.

        max_outliers_detected -- The maximum number of outliers that should be detected, as
            required by the algorithms.

        kwargs -- Algorithm-dependent arguments. Defaults are shown here.

            'esd':
                'robust_variant' = True. Uses the median and MAD for the center and the
                    standard deviation respectively.
                'alpha' = 0.05. The significance level of the testing.
    """
    algorithm = algorithm.strip().lower()
    x = pd.Series(x).reset_index(drop=True)
    max_outliers_detected = int(max_outliers_detected)
    if algorithm == "esd":
        # https://www.itl.nist.gov/div898/handbook/eda/section3/eda35h3.htm
        # Note: the two-sided test is implemented here: p = 1 - alpha / (2 * (n - i + 1))
        # 0. Default settings
        robust_variant = bool(kwargs.get("robust_variant", True))
        alpha = float(kwargs.get("alpha", 0.05))
        assert alpha <= 1.0
        assert max_outliers_detected <= len(x)
        # Idea for the future (not implemented): run a normality check (e.g. K-S test) first.
        x = x.copy(deep=True)
        extra_out = defaultdict(list)
        N = x.shape[0] - pd.isna(x).sum()
        for k in range(max_outliers_detected):
            i = k + 1
            extra_out["i"].append(i)
            if robust_variant:
                variation = median_absolute_deviation(x)
                R = ((x - x.median()) / variation).abs()
            else:
                variation = x.std()
                R = ((x - x.mean()) / variation).abs()
            dof = N - i - 1
            p = 1 - alpha / (2 * (N - i + 1))
            t_s = t_value(p, dof)
            # Critical value from the NIST formula for the generalized ESD test
            lambda_i = (N - i) * t_s / np.sqrt((dof + np.power(t_s, 2)) * (dof + 2))
            extra_out["lambda"].append(lambda_i)
            g = R.max()
            # Formula from the R function for the Grubbs' test calculation
            s = g**2 * N * (2 - N) / (g**2 * N - (N - 1) ** 2)
            p_value = 0 if s <= 0 else min(N * (1 - t_value_cdf(np.sqrt(s), N - 2)), 1)
            R_i_idx = -1 if pd.isna(p_value) else R.idxmax()
            extra_out["R_i_idx"].append(R_i_idx)
            extra_out["R_i"].append(R.max())
            extra_out["p-value"].append(p_value)
            if variation > __eps:
                # Only drop this index when there is meaningful variation; if the variation
                # is zero (or numerically tiny) the test statistic is unreliable.
                x = x.drop(R_i_idx)
        try:
            cutoff_i = np.where(
                np.array(extra_out["R_i"]) - np.array(extra_out["lambda"]) >= 0,
            )[0][0]
        except IndexError:
            cutoff_i = -1
        # The outlier indices are the points from the start of `extra_out['R_i_idx']`,
        # up to, and including, the `cutoff_i` index in the list.
        extra_out["cutoff"] = cutoff_i
        outlier_index = extra_out["R_i_idx"][0 : (cutoff_i + 1)]
        return outlier_index, extra_out
    else:
        return [], defaultdict(dict)
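# A minimal usage sketch: a single gross outlier planted at index 5 should be flagged
# (whether a point is flagged depends on `alpha` and the robust_variant setting):
#
#   >>> values = [10.0, 10.2, 9.9, 10.1, 9.8, 42.0, 10.0, 10.1, 9.9, 10.2]
#   >>> idx, details = detect_outliers_esd(values, max_outliers_detected=2)
#   >>> 5 in idx
#   True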
def variance_decomposition(df: pd.DataFrame, measured: str, repeat: str) -> dict:
    """
    Given a DataFrame `df` of raw data, and an indication of which column is the `measured`
    value column and which is the `repeat` indicator, calculate the within- and
    between-replicate standard deviation.

    Example
    -------
    Two measurements on day 1 ``[101, 102]`` and two measurements on day 2 ``[94, 95]``. The
    between-day variation can already be expected to be much greater than the within-day
    variation.

    >>> df = pd.DataFrame(data={'Result': [101, 102, 94, 95], 'Repeat': [1, 1, 2, 2]})
    >>> df
       Result  Repeat
    0     101       1
    1     102       1
    2      94       2
    3      95       2
    >>> variance_decomposition(df, measured="Result", repeat="Repeat")
    {'total_ms': 16.666667,
     'total_dof': 3,
     'within_ms': 0.5,
     'within_stddev': 0.70711,
     'within_dof': 2,
     'between_ms': 49.0,
     'between_stddev': 7.0,
     'between_dof': 1}

    Note
    ----
    * SSQ = sum of squares
    * DOF = degrees of freedom
    * MS = mean square = (sum of squares) / (degrees of freedom) = SSQ / DOF = variance
    """
    # Overall statistics:
    total_ms = max(0, df[measured].var())
    total_dof = max(0, df[measured].count() - 1)
    # Within a group: calculate the variance of each group. The pooled (dof-weighted)
    # average of those variances is an estimate of the within-group variance.
    within_ms = 0.0
    within_dof = 0
    for _, group in df.groupby(repeat):
        dof_group_i = max(0, group[measured].count() - 1)
        within_dof += dof_group_i
        # Handling missing data makes for messier code
        within_ms = np.nansum([within_ms, group[measured].var() * dof_group_i])
    within_ms = 0 if within_dof == 0 else within_ms / within_dof
    # Between groups: the ANOVA relationship is such that SSQ(total) = SSQ(between) + SSQ(within).
    # Therefore the between-group statistics are found by differencing.
    between_dof = max(0, total_dof - within_dof)
    between_ms = 0 if between_dof == 0 else max(0.0, (total_ms * total_dof - within_ms * within_dof) / between_dof)
    return {
        "total_ms": total_ms,
        "total_dof": total_dof,
        "within_ms": within_ms,
        "within_stddev": np.sqrt(within_ms),
        "within_dof": within_dof,
        "between_ms": between_ms,
        "between_stddev": np.sqrt(between_ms),
        "between_dof": between_dof,
    }
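# A quick numerical check of the ANOVA identity used above, SSQ(total) = SSQ(within) +
# SSQ(between), on the docstring example: 16.667 * 3 == 0.5 * 2 + 49.0 * 1 == 50:
#
#   >>> out = variance_decomposition(
#   ...     pd.DataFrame(data={"Result": [101, 102, 94, 95], "Repeat": [1, 1, 2, 2]}),
#   ...     measured="Result", repeat="Repeat",
#   ... )
#   >>> np.isclose(out["total_ms"] * out["total_dof"],
#   ...            out["within_ms"] * out["within_dof"] + out["between_ms"] * out["between_dof"])
#   True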
# ---------------------------------------------------------------------------
# Migration helpers - old names raise helpful errors
# ---------------------------------------------------------------------------
_RENAMED = {
    "median_abs_deviation": "median_absolute_deviation",
    "normality_check": "test_normality",
    "within_between_standard_deviation": "variance_decomposition",
    "outlier_detection_multiple": "detect_outliers_esd",
    "ttest_difference_calculate": "ttest_independent",
    "ttest_paired_difference_calculate": "ttest_paired",
    "ttest_difference": "ttest_independent_from_df",
    "ttest_paired_difference": "ttest_paired_from_df",
}


def __getattr__(name: str) -> NoReturn:
    if name in _RENAMED:
        new = _RENAMED[name]
        raise AttributeError(
            f"{name!r} has been renamed to {new!r}. "
            f"Use: from process_improve.univariate.metrics import {new}"
        )
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")