Source code for process_improve.univariate.metrics

from __future__ import annotations

import warnings
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Literal, NoReturn

import numpy as np
import pandas as pd
from scipy.stats import shapiro, t
from sklearn.utils import Bunch

if TYPE_CHECKING:
    from collections.abc import Callable

__eps = np.finfo(np.float32).eps


[docs] def t_value(p: float, v: float) -> float: r""" Return the value on the x-axis if you plot the cumulative t-distribution with a fractional area of `p` (p is therefore a fractional value between 0 and 1 on the y-axis) and `v` is the degrees of freedom. Examples -------- Since the cumulative distribution passes symmetrically through the x-axis at 0.0 for any number of degrees of freedom >>> t_value(0.5, v) 0.0 Zero fractional area under the curve is always at :math:`-\infty`: >>> t_value(0.0, v) -Inf 100% fractional area is always at :math:`+\infty`: >>> t_value(1.0, v) +Inf See also -------- t_value_cdf: does the inverse of this function. """ return t.ppf(p, df=v)
[docs] def t_value_cdf(z: float, v: float) -> float: r""" Return the fractional area under the cumulative t-distribution (y-axis value) at the t-value `z` on the x-axis, with `v` degrees of freedom. Examples -------- The cumulative distribution is symmetric through the x-axis at 0.0 for any number of degrees of freedom, so half of the area lies below zero: >>> t_value_cdf(0.0, v) 0.5 Zero fractional area under the curve is at :math:`-\infty`: >>> t_value_cdf(-np.inf, v) 0.0 100% fractional area is at :math:`+\infty`: >>> t_value_cdf(np.inf, v) 1.0 See also -------- t_value: does the inverse of this function. """ return t.cdf(z, df=v)
[docs] def test_normality(x: np.ndarray | pd.Series) -> float: """ Check the p-value of the hypothesis that the data are from a normal distribution. If the p-value is less than the chosen alpha level (e.g. 0.05 or 0.025), then there is evidence that the data tested are NOT normally distributed. On the other hand, if the p-value is greater than the chosen alpha level, then the null hypothesis that the data came from a normally distributed population can not be rejected. NOTE: it does not mean that the data are normally distributed, just that we have nothing better to say about it. See the `Shapiro-Wilk test`_. Implementation: Uses the Shapiro Wilk test directly taken from `scipy.stats.shapiro`_. .. _Shapiro-Wilk test: https://en.wikipedia.org/wiki/Shapiro-Wilk_test .. _scipy.stats.shapiro: https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.shapiro.html """ output = shapiro(x) return output[1]
[docs] def Sn(x: np.ndarray | pd.Series, constant: float = 1.1926) -> np.floating: # noqa: N802 """ Compute a robust scale estimator. The Sn metric is an efficient alternative to MAD. Parameters ---------- x : np.ndarray or pd.Series A vector of values. NaN entries are ignored. constant : float, optional Multiplicative constant that makes the estimator consistent with iid values from a Gaussian distribution with no outliers. Default is 1.1926. Returns ------- np.floating A scalar value, the Sn estimate of spread. Returns NaN if all entries are missing, and 0.0 when only a single non-missing value is supplied. Notes ----- Tested against once of the most reliable open-source packages, written by some of the most respected names in the area of robust methods: [1]_ and [2]_. Disadvantages of MAD: * It does not have particularly high efficiency for data that is in fact normal (37%). In comparison, the median has 64% efficiency for normal data. * The MAD statistic also has an implicit assumption of symmetry. That is, it measures the distance from a measure of central location (the median). References ---------- .. [1] https://cran.r-project.org/web/packages/robustbase/ .. [2] Rousseeuw, Peter J.; Croux, Christophe (December 1993), "Alternatives to the Median Absolute Deviation", Journal of the American Statistical Association, American Statistical Association, 88 (424): 1273-1283, https://dx.doi.org/10.2307/2291267 """ arr = np.asarray(x, dtype=np.float64) n = np.sum(~np.isnan(arr)) if n == 0: return np.float64(np.nan) elif n == 1: return np.float64(0.0) # Remove NaNs and compute all pairwise absolute differences via broadcasting clean = arr[~np.isnan(arr)] diffs = np.abs(clean[:, None] - clean[None, :]) medians = np.median(diffs, axis=1) if n <= 9: # Correction factors for n = 2 to 9: correction = [0.743, 1.851, 0.954, 1.351, 0.993, 1.198, 1.005, 1.131][n - 2] elif n % 2: correction = n / (n - 0.9) # odd number of cases else: correction = 1.0 return constant * np.median(medians) * correction
def _contains_nan(a: np.ndarray, nan_policy: str = "propagate") -> tuple[bool, str]: """From scipy.stats.stats.""" policies = ["propagate", "raise", "omit"] if nan_policy not in policies: raise ValueError(f"nan_policy must be one of {{{', '.join(f'{s!r}' for s in policies)}}}") try: # Calling np.sum to avoid creating a huge array into memory # e.g. np.isnan(a).any() with np.errstate(invalid="ignore"): contains_nan = np.isnan(np.sum(a)) except TypeError: # This can happen when attempting to sum things which are not # numbers (e.g. as in the function `mode`). Try an alternative method: try: contains_nan = any(np.isnan(v) for v in a.ravel()) except TypeError: # pragma: no cover # Don't know what to do. Fall back to omitting nan values and # issue a warning. contains_nan = False nan_policy = "omit" warnings.warn( "The input array could not be properly checked for nan values. nan values will be ignored.", RuntimeWarning, stacklevel=2, ) if contains_nan and nan_policy == "raise": raise ValueError("The input contains nan values") return (contains_nan, nan_policy)
[docs] def ttest_independent(sample_A: pd.Series, sample_B: pd.Series, conflevel: float = 0.995) -> dict: """Core calculation for a test of differences between the average of A and the average of B. No checking of inputs. Parameters ---------- sample_A : iterable Vector of n_a measurements. sample_B : iterable Vector of n_b measurements. conflevel : float Value between 0 and 1 (closer to 1.0), that gives the level of confidence required for the 2-sided test. Returns ------- dict Outcomes from the statistical test. """ axis: Literal[0] = 0 v1, v2 = sample_A.var(axis=axis, ddof=1), sample_B.var(axis=axis, ddof=1) n_A, n_B = sample_A.shape[axis], sample_B.shape[axis] m1, m2 = sample_A.mean(), sample_B.mean() df = n_A + n_B - 2.0 ct = abs(t_value(p=(1 - conflevel) / 2.0, v=df)) d = m2 - m1 svar = ((n_A - 1) * v1 + (n_B - 1) * v2) / df with np.errstate(divide="ignore", invalid="ignore"): sd_z_variate = np.sqrt(svar * (np.divide(1.0, n_A) + np.divide(1.0, n_B))) z_variate = np.divide(d, sd_z_variate) confint_lo = d - ct * sd_z_variate confint_hi = d + ct * sd_z_variate return { "Group A number": int(n_A), "Group B number": int(n_B), "Group A average": sample_A.mean(), "Group B average": sample_B.mean(), "z value": z_variate, "ConfInt: Lo": confint_lo, "ConfInt: Hi": confint_hi, "p value": 2 * t_value_cdf(-np.abs(z_variate), df), "Degrees of freedom": df, "Pooled standard deviation": sd_z_variate, }
[docs] def ttest_independent_from_df( df: pd.DataFrame, grouper_column: str, values_column: str, conflevel: float = 0.995, ) -> pd.DataFrame: """ Calculate the t-test for differences between two or more groups and returns a confidence interval for the difference. The test is for UNPAIRED differences. The dataframe `df` contains a `grouper_column` with 2 or more unique values (e.g. 'A' and 'B'). All unique values of the `grouper_column` are used, and t-tests are done between the values in the `values_column`. Args: df (pd.DataFrame): Dataframe of the values and grouping variable. grouper_column (str): Indicates which column will be grouped on. values_column (str): Which column contains the numeric values to calculate the test on. conflevel (float, optional): [description]. Defaults to 0.995. Output: Dataframe with columns containing the statistical outputs of the t-test, including: 1. Group "A" name 2. Group "B" name 3. Group "A" mean 4. Group "B" mean 5. z-value for the difference between group "B" minus group "A" 6. p-value for this z-value 7. Confidence interval low value for difference between group "B" minus group "A" 8. Confidence interval high value for difference between group "B" minus group "A" Example: df : has 3 levels in the grouper variable; ['Marco', 'Pete', 'Sam'] Output will have 3 rows: Group A name Group B name Marco Pete Marco Sam Pete Sam """ data_subset = df[[grouper_column, values_column]].copy() data_subset = data_subset.dropna() output = pd.DataFrame() groups = list(df[grouper_column].unique()) while len(groups) > 0: groupA_name = groups.pop(0) for groupB_name in groups: sample_A = data_subset[data_subset[grouper_column].eq(groupA_name)][values_column] sample_B = data_subset[data_subset[grouper_column].eq(groupB_name)][values_column] sample_A = sample_A.astype(np.float64) sample_B = sample_B.astype(np.float64) basic_stats = ttest_independent(sample_A, sample_B, conflevel) basic_stats.update( { "Group A name": groupA_name, "Group B name": groupB_name, } ) output = pd.concat([output, pd.DataFrame(basic_stats, index=[0])]) return output
[docs] def ttest_paired(differences: pd.Series, conflevel: float = 0.995) -> dict: """Core calculation for a test of paired differences. Parameters ---------- differences : pd.Series The paired differences (e.g. ``sample_A - sample_B``) for which the test is run. conflevel : float, optional Value between 0 and 1 (closer to 1.0), that gives the level of confidence required for the 2-sided test. Default is 0.995. Returns ------- dict Outcomes from the statistical test, with keys: - ``"Differences mean"``: the mean of the input ``differences``. - ``"z value"``: the test statistic (mean divided by its standard error). - ``"ConfInt: Lo"``, ``"ConfInt: Hi"``: lower and upper bounds of the two-sided confidence interval around the mean difference at the requested ``conflevel``. - ``"p value"``: two-sided p-value from the t-distribution. - ``"Degrees of freedom"``: ``n - 1`` for ``n`` paired observations. - ``"Standard deviation"``: the **standard error of the mean difference**, i.e. ``sample_std / sqrt(n)`` (the scale used to build the confidence interval), NOT the sample standard deviation of ``differences``. """ # A paired t-test needs at least 2 differences (so dof >= 1 and the # 1/n term inside sqrt is finite). SEC-24 (#273). if differences.shape[0] < 2: raise ValueError( f"ttest_paired requires at least 2 paired observations; " f"got {differences.shape[0]}." ) diff_mean = differences.mean() diff_svar = differences.std(ddof=1) dof = differences.shape[0] - 1 # n-1 d # By the central limit theorem, the `differences` values should be normally distributed # with average (central value) given by mean of group A values, minus mean of group B values. # Scale factor is the standard deviations of `differences`: estimated, therefore t-distribution ct = abs(t_value(p=(1 - conflevel) / 2.0, v=dof)) with np.errstate(divide="ignore", invalid="ignore"): sd_z_variate = diff_svar * np.sqrt(np.divide(1.0, dof + 1)) z_variate = np.divide(diff_mean, sd_z_variate) confint_lo = diff_mean - ct * sd_z_variate confint_hi = diff_mean + ct * sd_z_variate return { "Differences mean": diff_mean, "z value": z_variate, "ConfInt: Lo": confint_lo, "ConfInt: Hi": confint_hi, "p value": 2 * t_value_cdf(-np.abs(z_variate), dof), "Degrees of freedom": dof, "Standard deviation": sd_z_variate, }
[docs] def ttest_paired_from_df( df: pd.DataFrame, grouper_column: str, values_column: str, conflevel: float = 0.995, ) -> pd.DataFrame: """ Calculate the t-test for paired differences between two or more groups and returns a confidence interval for the difference. The test is for PAIRED differences. The differences is always defined as the A values minus the B values: after - before, or A - B. The dataframe `df` contains a `grouper_column` with 2 or more unique values (e.g. 'A' and 'B'). All unique values of the `grouper_column` are used, and t-tests are done between the values in the `values_column`. When selecting the columns, the number of values per column must be the same. Args: df (pd.DataFrame): Dataframe of the values and grouping variable. grouper_column (str): Indicates which column will be grouped on. values_column (str): Which column contains the numeric values to calculate the test on. conflevel (float, optional): [description]. Defaults to 0.995. Output: Dataframe with columns containing the statistical outputs of the t-test, including: 1. Group A name 2. Group B name 3. Group A mean 4. Group B mean 5. Differences mean: average difference between the groups (not the same as the difference of the averages from items 3 and 4 above) 6. z-value for the difference between group "B" minus group "A" 7. p-value for this z-value 8. Confidence interval low value for difference between group "B" minus group "A" 9. Confidence interval high value for difference between group "B" minus group "A" """ data_subset = df[[grouper_column, values_column]].copy() data_subset = data_subset.dropna() output = pd.DataFrame() groups = list(df[grouper_column].unique()) while len(groups) > 0: groupA_name = groups.pop(0) for groupB_name in groups: sample_A = data_subset[data_subset[grouper_column].eq(groupA_name)][values_column] sample_B = data_subset[data_subset[grouper_column].eq(groupB_name)][values_column] sample_A = sample_A.astype(np.float64) sample_B = sample_B.astype(np.float64) if sample_A.shape[0] != sample_B.shape[0]: raise ValueError( "Paired t-test requires both groups to have the same number of samples; " f"got {sample_A.shape[0]} and {sample_B.shape[0]}." ) differences = sample_A - sample_B.to_numpy() # only the values of one vector are needed! basic_stats = ttest_paired(differences, conflevel) basic_stats.update( { "Group A name": groupA_name, "Group B name": groupB_name, "Group A number": sample_A.shape[0], "Group B number": sample_B.shape[0], "Group A average": sample_A.mean(), "Group B average": sample_B.mean(), } ) output = pd.concat([output, pd.DataFrame(basic_stats, index=[0])]) return output
[docs] def confidence_interval(df: pd.DataFrame, column_name: str, conflevel: float = 0.95, style: str = "robust") -> tuple: """ Calculate the confidence interval, returned as a tuple, for the `column_name` (str) in the dataframe `df`, for a given confidence level `conflevel` (default: 0.95). `style`: ['robust'; 'regular']: indicates which style of estimates to use for the center and spread. Default: 'robust' Missing values are ignored. """ # TODO : http://www.rips-irsp.com/article/10.5334/irsp.82/ data = df[column_name] n = data.count() # A t-CI needs at least 2 non-missing observations to compute a # spread and (n - 1) > 0 degrees of freedom. ``t.ppf(., -1)`` / # ``spread / sqrt(0)`` silently yielded NaN / inf in earlier # versions; reject up front. SEC-24 (#273). if n < 2: raise ValueError( f"confidence_interval requires at least 2 non-missing values " f"in column {column_name!r}; got {n}." ) if style.lower() == "robust": center = data.median() spread = median_absolute_deviation(data.to_numpy(), nan_policy="omit") else: center = data.mean() spread = data.std() c_t = t_value(1 - (1 - conflevel) / 2, n - 1) return (center - c_t * spread / np.sqrt(n), center + c_t * spread / np.sqrt(n))
def _mad_1d(x: np.ndarray, center: Callable, nan_policy: str) -> float: """Taken from `scipy.stats.stats`. Median absolute deviation for 1-d array x. This is a helper function for `median_abs_deviation`; it assumes its arguments have been validated already. In particular, x must be a 1-d numpy array, center must be callable, and if nan_policy is not 'propagate', it is assumed to be 'omit', because 'raise' is handled in `median_abs_deviation`. No warning is generated if x is empty or all nan. """ isnan = np.isnan(x) if isnan.any(): if nan_policy == "propagate": return np.nan x = x[~isnan] if x.size == 0: # MAD of an empty array is nan. return np.nan # Edge cases have been handled, so do the basic MAD calculation. med = center(x) return np.median(np.abs(x - med))
[docs] def median_absolute_deviation( x: np.ndarray, axis: int = 0, center: Callable = np.median, scale: str | float = "normal", nan_policy: str = "omit", ) -> np.ndarray | float: r""" Taken from `scipy.stats.stats`: we want the same functionality, but with a slightly different default function signature. * `scale='normal'` instead of `scale=1.0`. * `nan_policy='omit'` instead of `nan_policy='propogate'` Compute the median absolute deviation of the data along the given axis. The median absolute deviation (MAD_) computes the median over the absolute deviations from the median. It is a measure of dispersion similar to the standard deviation but more `robust to outliers`_. The MAD of an empty array is ``np.nan``. Parameters ---------- x : array_like Input array or object that can be converted to an array. axis : int or None, optional Axis along which the range is computed. Default is 0. Axis == None will not be accepted anymore. center : callable, optional A function that will return the central value. The default is to use np.median. Any user defined function used will need to have the function signature ``func(arr, axis)``. scale : scalar or str, optional The numerical value of scale will be divided out of the final result. The default is ``"normal"``, which results in `scale` being the inverse of the standard normal quantile function at 0.75, which is approximately 0.67449 (so the returned value is consistent with the standard deviation for normally distributed data). Any positive float may also be passed. Array-like scale is also allowed, as long as it broadcasts correctly to the output such that ``out / scale`` is a valid operation. The output dimensions depend on the input array, `x`, and the `axis` argument. nan_policy : {'propagate', 'raise', 'omit'}, optional Defines how to handle when input contains nan. The following options are available (default is 'omit'): * 'propagate': returns nan * 'raise': throws an error * 'omit': performs the calculations ignoring nan values Returns ------- mad : scalar or ndarray If the input contains integers or floats of smaller precision than ``np.float64``, then the output data-type is ``np.float64``. Otherwise, the output data-type is the same as that of the input. See Also -------- numpy.std, numpy.var, numpy.median, scipy.stats.iqr, scipy.stats.tmean, scipy.stats.tstd, scipy.stats.tvar Notes ----- The `center` argument only affects the calculation of the central value around which the MAD is calculated. That is, passing in ``center=np.mean`` will calculate the MAD around the mean - it will not calculate the *mean* absolute deviation. The input array may contain `inf`, but if `center` returns `inf`, the corresponding MAD for that data will be `nan`. References ---------- .. _MAD: "Median absolute deviation", https://en.wikipedia.org/wiki/Median_absolute_deviation .. _robust to outliers: "Robust measures of scale", https://en.wikipedia.org/wiki/Robust_measures_of_scale Examples -------- When comparing the behavior of `median_abs_deviation` with ``np.std``, the latter is affected when we change a single value of an array to have an outlier value while the MAD hardly changes: >>> from scipy import stats >>> x = stats.norm.rvs(size=100, scale=1, random_state=123456) >>> x.std() 0.9973906394005013 >>> stats.median_abs_deviation(x) 0.82832610097857 >>> x[0] = 345.6 >>> x.std() 34.42304872314415 >>> stats.median_abs_deviation(x) 0.8323442311590675 Axis handling example: >>> x = np.array([[10, 7, 4], [3, 2, 1]]) >>> x array([[10, 7, 4], [ 3, 2, 1]]) >>> stats.median_abs_deviation(x) array([3.5, 2.5, 1.5]) >>> stats.median_abs_deviation(x, axis=None) <-- syntax of `axis=None` will be depricated 2.0 Scale normal example: >>> x = stats.norm.rvs(size=1000000, scale=2, random_state=123456) >>> stats.median_abs_deviation(x) 1.3487398527041636 >>> stats.median_abs_deviation(x, scale='normal') 1.9996446978061115 """ if axis is None: raise ValueError("axis=None is now deprecated. Unravel the array.") if not callable(center): raise TypeError(f"The argument 'center' must be callable. The given value {center!r} is not callable.") # An error may be raised here, so fail-fast, before doing lengthy # computations, even though `scale` is not used until later if isinstance(scale, str): if scale.lower() == "normal": scale = 0.6744897501960817 # special.ndtri(0.75) else: raise ValueError(f"{scale} is not a valid scale value.") x = np.asarray(x) # Consistent with `np.var` and `np.std`. if not x.size: nan_shape = tuple(item for i, item in enumerate(x.shape) if i != axis) if nan_shape == (): # Return nan, not array(nan) return np.nan return np.full(nan_shape, np.nan) # pragma: no cover contains_nan, nan_policy = _contains_nan(x, nan_policy) if contains_nan: mad = np.apply_along_axis(_mad_1d, axis, x, center, nan_policy) else: # Wrap the call to center() in expand_dims() so it acts like # keepdims=True was used. med = np.expand_dims(center(x, axis=axis), axis) mad = np.median(np.abs(x - med), axis=axis) return mad / scale
[docs] def biweight_midvariance(x: np.ndarray | pd.Series, nan_policy: str = "omit") -> float: """Return the Mosteller-Tukey robust scale (biweight midvariance) of ``x``. The biweight midvariance is a robust, highly efficient estimator of the variance: it down-weights observations far from the median and ignores gross outliers entirely. Parameters ---------- x : np.ndarray or pd.Series One-dimensional sample of numeric values. nan_policy : {"omit", "propagate"}, optional ``"omit"`` (default) drops missing values; ``"propagate"`` returns ``nan`` if any value is missing. Returns ------- float The robust variance estimate. Returns ``0.0`` when the MAD is zero (e.g. a constant sample), and ``nan`` for an empty sample. References ---------- Mosteller and Tukey, *Data Analysis and Regression*, pp. 207-208, 1977. """ a = np.asarray(x, dtype=float).ravel() isnan = np.isnan(a) if isnan.any(): if nan_policy == "propagate": return float("nan") a = a[~isnan] n = a.size if n == 0: return float("nan") location = np.median(a) spread_mad = np.median(np.abs(a - location)) if spread_mad == 0: return 0.0 ui = (a - location) / (6.0 * spread_mad) valid = ui**2 <= 1.0 a_valid, ui_valid = a[valid], ui[valid] numerator = (a_valid - location) ** 2 * (1 - ui_valid**2) ** 4 denominator = (1 - ui_valid**2) * (1 - 5 * ui_valid**2) return float(n * numerator.sum() / denominator.sum() ** 2)
[docs] def holm_bonferroni(p_values: np.ndarray | pd.Series | list, alpha: float = 0.05) -> Bunch: """Holm-Bonferroni step-down correction for multiple comparisons. Holm's method controls the family-wise error rate while being uniformly more powerful than the plain Bonferroni correction. It is the recommended post-hoc correction for a family of pairwise comparisons. Parameters ---------- p_values : array-like The raw (uncorrected) p-values of the individual comparisons. alpha : float, optional Family-wise significance level, by default 0.05. Returns ------- sklearn.utils.Bunch A bunch with, in the same order as the input: * ``p_adjusted``: the Holm-adjusted p-values. * ``reject``: boolean array, ``True`` where the null hypothesis is rejected at level ``alpha``. * ``alpha``: the family-wise level used. References ---------- Holm, "A simple sequentially rejective multiple test procedure", Scandinavian Journal of Statistics, 6, 65-70, 1979. """ p = np.asarray(p_values, dtype=float).ravel() m = p.size if m == 0: return Bunch(p_adjusted=np.array([]), reject=np.array([], dtype=bool), alpha=alpha) order = np.argsort(p) p_sorted = p[order] # Step-down weights m, m-1, ..., 1; the running max enforces monotonicity. weights = m - np.arange(m) adjusted_sorted = np.minimum(np.maximum.accumulate(weights * p_sorted), 1.0) p_adjusted = np.empty(m) p_adjusted[order] = adjusted_sorted return Bunch(p_adjusted=p_adjusted, reject=p_adjusted <= alpha, alpha=alpha)
[docs] def summary_stats(x: np.ndarray | pd.Series, method: str = "robust") -> dict: """ Return summary statistics of the numeric values in vector ``x``. Parameters ---------- x : numpy.ndarray or pandas.Series A vector of univariate values to summarize. method : str, optional If ``"robust"`` (the default), the reported center is the median and the spread is the Sn robust estimate; otherwise the mean and the sample standard deviation are used. Returns ------- dict A summary of the univariate vector. The most useful keys are ``"center"`` (a measure of the center, e.g. the median for the robust method) and ``"spread"`` (a measure of the spread, e.g. the Sn robust estimate for the robust method). """ if isinstance(x, pd.Series): values = x.copy(deep=True).to_numpy() elif isinstance(x, np.ndarray): values = x.ravel() else: raise TypeError("Expecting a NumPy vector or Pandas series.") x = values out = {} out["mean"] = np.nanmean(x) out["std_ddof0"] = np.nanstd(x, ddof=0) out["std_ddof1"] = np.nanstd(x, ddof=1) out["rsd_classical"] = out["std_ddof1"] / out["mean"] ( out["percentile_05"], out["percentile_25"], out["median"], out["percentile_75"], out["percentile_95"], ) = np.nanpercentile(x, [5, 25, 50, 75, 95]) out["iqr"] = out["percentile_75"] - out["percentile_25"] out["min"] = np.nanmin(x) out["max"] = np.nanmax(x) out["N_non_missing"] = np.sum(~np.isnan(x)) if method.lower() == "robust": out["center"], out["spread"] = out["median"], Sn(x) if ((out["max"] - out["min"]) > 0) and (out["spread"] == 0) and (out["N_non_missing"] > 0): # Don't fully trust the Sn() yet. It works strangely on quantized data when there is # little variation. Replace the RSD with the classically calculated version in this # very specific case. This example shows it: [99, 95, 95, 100, 100, 100, 100, 95, 100, # 100, 100, 100, 105, 105, 100, 95, 105, 100, 95, 100] out["center"], out["spread"] = out["mean"], out["std_ddof1"] else: out["center"], out["spread"] = out["mean"], out["std_ddof1"] out["rsd"] = out["spread"] / out["center"] return out
[docs] def detect_outliers_esd( x: np.ndarray | pd.Series, algorithm: str = "esd", max_outliers_detected: int = 1, **kwargs ) -> tuple[list[int], defaultdict[Any, Any]]: """ Return a list of indexes of points in the vector `x` which are likely outliers. A second output (can be ignored) contains the details of the values used to make the decision. Arguments: x {list, sequence, NumPy vector/array} -- [A sequence, list or vector which can be unravelled.] Keyword Arguments: algorithm -- Two algorithms are possible to detect outliers: (default: "esd") 'esd': Generalized ESD Test for Outliers. If `max_outliers_detected=1` this is essentially Grubb's test. For more details, please see: https://www.itl.nist.gov/div898/handbook/eda/section3/eda35h3.htm 'cc-robust': Build a robust control-chart for the sequence `x` and points should lie outside the +/- 3 sigma limits are considered outliers. Not Implemented Yet: left here as an idea for the future, but not confirmed yet. max_outliers_detected -- The maximum number of outliers that should be detected, as required by the algorithms. kwargs -- Algorithm dependent arguments. Defaults are shown here. 'esd': 'robust_variant' = True. Uses the median and MAD for the center and the standard deviation respectively. 'alpha' = 0.05. The significance level of the testing. """ algorithm = algorithm.strip().lower() x = pd.Series(x).reset_index(drop=True) max_outliers_detected = int(max_outliers_detected) if algorithm == "esd": """ https://www.itl.nist.gov/div898/handbook/eda/section3/eda35h3.htm Note: the two-sided test is implemented here. p = 1-alpha/(2*(n-i+1)) """ # 0. Default settings robust_variant = bool(kwargs.get("robust_variant", True)) alpha = float(kwargs.get("alpha", 0.05)) if alpha > 1.0: raise ValueError(f"alpha must be <= 1.0, got {alpha}.") if max_outliers_detected > len(x): raise ValueError( f"max_outliers_detected ({max_outliers_detected}) cannot exceed the sample size ({len(x)})." ) # 1. Run K-S test first to check normality # 2. https://www.itl.nist.gov/div898/handbook/eda/section3/eda35h3.htm x = x.copy(deep=True) extra_out: defaultdict[str, list[Any]] = defaultdict(list) N = x.shape[0] - pd.isna(x).sum() for k in range(max_outliers_detected): i = k + 1 extra_out["i"].append(i) if robust_variant: variation = median_absolute_deviation(x.to_numpy()) R = ((x - x.median()) / variation).abs() else: variation = x.std() R = ((x - x.mean()) / variation).abs() dof = N - i - 1 p = 1 - alpha / (2 * (N - i + 1)) t_s = t_value(p, dof) lambda_i = (N - i) * t_s / np.sqrt((dof + np.power(t_s, 2)) * (dof + 2)) extra_out["lambda"].append(lambda_i) g = R.max() s = g**2 * N * (2 - N) / (g**2 * N - (N - 1) ** 2) # Formula from R-function for the Grubb's calculation p_value = 0 if s <= 0 else min(N * (1 - t_value_cdf(np.sqrt(s), N - 2)), 1) R_i_idx = -1 if pd.isna(p_value) else R.idxmax() extra_out["R_i_idx"].append(R_i_idx) extra_out["R_i"].append(R.max()) extra_out["p-value"].append(p_value) if variation > __eps: # The variation, if zero or small, will fail to drop this index x = x.drop(R_i_idx) try: cutoff_i = np.where( np.array(extra_out["R_i"]) - np.array(extra_out["lambda"]) >= 0, )[0][0] except IndexError: cutoff_i = -1 # The outlier indices are the points from the start of # `extra_out['R_i_idx']`, up to, and including, the `cutoff_i` index # in the list. extra_out["cutoff"] = cutoff_i outlier_index = extra_out["R_i_idx"][0 : (cutoff_i + 1)] return outlier_index, extra_out else: return [], defaultdict(dict)
[docs] def tietjen_moore_test( # noqa: PLR0913 x: np.ndarray | pd.Series, n_outliers: int, *, two_sided: bool = True, alpha: float = 0.05, n_simulations: int = 10000, random_state: int | None = None, ) -> Bunch: """Tietjen-Moore test for a *specified* number of outliers. Tests the null hypothesis "there are no outliers" against the alternative "the ``n_outliers`` most extreme observations are outliers". Unlike the generalised ESD test, the number of suspected outliers must be fixed in advance. The test statistic has no closed-form critical value, so it is obtained by simulation under the normal null. Parameters ---------- x : np.ndarray or pd.Series One-dimensional sample. Missing values are dropped. n_outliers : int The number of suspected outliers to test for (1 <= n_outliers < N). two_sided : bool, optional If ``True`` (default) the test looks for outliers on either tail (the observations with the largest absolute deviation from the mean). If ``False`` it tests only the ``n_outliers`` largest observations. alpha : float, optional Significance level, by default 0.05. n_simulations : int, optional Number of Monte-Carlo samples used to estimate the critical value. random_state : int or None, optional Seed for the simulation, for reproducibility. Returns ------- sklearn.utils.Bunch With ``statistic``, ``critical_value``, ``reject`` (``True`` when the outliers are significant), ``outlier_indices`` (positions in the missing-value-removed sample), ``n_outliers`` and ``alpha``. References ---------- Tietjen and Moore, "Some Grubbs-type statistics for the detection of several outliers", Technometrics, 14, 583-597, 1972. See also the NIST handbook, section 3.5.h.3. """ a = np.asarray(x, dtype=float).ravel() a = a[~np.isnan(a)] n = a.size if not (1 <= n_outliers < n): raise ValueError(f"n_outliers must be between 1 and {n - 1}, got {n_outliers}.") def _statistic(sample: np.ndarray) -> float: ranking = np.argsort(np.abs(sample - sample.mean())) if two_sided else np.argsort(sample) kept = sample[ranking[: n - n_outliers]] denominator = np.sum((sample - sample.mean()) ** 2) return float(np.sum((kept - kept.mean()) ** 2) / denominator) observed = _statistic(a) # Simulate the null distribution: all observations i.i.d. standard normal. rng = np.random.default_rng(random_state) simulated = np.array([_statistic(rng.standard_normal(n)) for _ in range(n_simulations)]) critical_value = float(np.quantile(simulated, alpha)) outlier_indices = ( np.argsort(np.abs(a - a.mean()))[n - n_outliers :] if two_sided else np.argsort(a)[n - n_outliers :] ) # A small statistic indicates that the removed points really are outliers. return Bunch( statistic=observed, critical_value=critical_value, reject=observed < critical_value, outlier_indices=np.sort(outlier_indices), n_outliers=n_outliers, alpha=alpha, )
[docs] def distribution_fit( x: np.ndarray | pd.Series, distribution: str = "norm", alpha: float = 0.05, ) -> Bunch: """Check how well a sample fits a named distribution. Fits the parameters of the requested ``scipy.stats`` distribution by maximum likelihood and runs a Kolmogorov-Smirnov goodness-of-fit test (NIST handbook, section 3.5.7). Parameters ---------- x : np.ndarray or pd.Series One-dimensional sample. Missing values are dropped. distribution : str, optional Name of any continuous ``scipy.stats`` distribution, by default ``"norm"``. alpha : float, optional Significance level for the ``fits_well`` verdict, by default 0.05. Returns ------- sklearn.utils.Bunch With ``distribution``, fitted ``parameters``, ``ks_statistic``, ``ks_pvalue``, ``fits_well`` (``True`` when the fit is not rejected at level ``alpha``) and the sample size ``n``. Notes ----- Because the distribution parameters are estimated from the same data, the KS p-value is conservative (the true Type-I error is smaller than ``alpha``); it remains a useful screening check. """ from scipy import stats as scipy_stats # noqa: PLC0415 a = np.asarray(x, dtype=float).ravel() a = a[~np.isnan(a)] dist = getattr(scipy_stats, distribution) parameters = dist.fit(a) # Compare against the frozen, fully-parameterised distribution. Passing the # frozen ``.cdf`` (which applies loc/scale internally) rather than the # distribution name plus ``args`` keeps this working across scipy versions, # where the latter form can forward loc/scale positionally to the raw cdf. ks_statistic, ks_pvalue = scipy_stats.kstest(a, dist(*parameters).cdf) return Bunch( distribution=distribution, parameters=parameters, ks_statistic=float(ks_statistic), ks_pvalue=float(ks_pvalue), fits_well=bool(ks_pvalue > alpha), n=int(a.size), )
[docs] def variance_decomposition(df: pd.DataFrame, measured: str, repeat: str) -> dict: """ Given a DataFrame `df` of raw data, and an indication of which column is the `measured` value column, and which is the `repeat` indicator, it will calculate the within and between replicate standard deviation. Example Two measurements on day 1 ``[101, 102]`` and two measurements on day 2 ``[94, 95]``. The between-day variation can already be expected to be much greater than the within-day variation. >>> df = pd.DataFrame(data={'Result': [101, 102, 94, 95], 'Repeat': [1, 1, 2, 2]}) Result Repeat 0 101 1 1 102 1 2 94 2 3 95 2 >>> output = within_between_standard_deviation(df, measured="Result", repeat="Repeat") {'total_ms': 16.666667, 'total_dof': 3, 'within_ms': 0.5, 'within_stddev': 0.70711, 'within_dof': 2, 'between_ms': 49.0, 'between_stddev': 7.0, 'between_dof': 1} Note * SSQ = sum of squares * DOF= degrees of freedom * MS = mean square = (sum of squares) / (degrees of freedom) = SSQ / DOF = variance """ # Overall statistics: total_ms = max(0, df[measured].var()) total_dof = max(0, df[measured].count() - 1) # Within a group: calculate the standard deviation (of variance) of each group. If you take # the average of those variances, pooled, you get an estimate of the within-group variance. within_ms = 0.0 within_dof = 0 for _, group in df.groupby(repeat): dof_group_i = max(0, group[measured].count() - 1) within_dof += dof_group_i # handling missing data makes for messier code within_ms = np.nansum([within_ms, group[measured].var() * dof_group_i]) within_ms = 0 if within_dof == 0 else within_ms / within_dof # Between groups: the ANOVA relationship is such that SSQ(total) = SSQ(between) + SSQ(within). # Therefore the between-group statistics are found by differencing between_dof = max(0, total_dof - within_dof) between_ms = 0 if between_dof == 0 else max(0.0, (total_ms * total_dof - within_ms * within_dof) / between_dof) return { "total_ms": total_ms, "total_dof": total_dof, "within_ms": within_ms, "within_stddev": np.sqrt(within_ms), "within_dof": within_dof, "between_ms": between_ms, "between_stddev": np.sqrt(between_ms), "between_dof": between_dof, }
# --------------------------------------------------------------------------- # Migration helpers - old names raise helpful errors # --------------------------------------------------------------------------- _RENAMED = { "median_abs_deviation": "median_absolute_deviation", "normality_check": "test_normality", "within_between_standard_deviation": "variance_decomposition", "outlier_detection_multiple": "detect_outliers_esd", "ttest_difference_calculate": "ttest_independent", "ttest_paired_difference_calculate": "ttest_paired", "ttest_difference": "ttest_independent_from_df", "ttest_paired_difference": "ttest_paired_from_df", } def __getattr__(name: str) -> NoReturn: """Raise a helpful error when a renamed module attribute is accessed.""" if name in _RENAMED: new = _RENAMED[name] raise AttributeError( f"{name!r} has been renamed to {new!r}. " f"Use: from process_improve.univariate.metrics import {new}" ) raise AttributeError(f"module {__name__!r} has no attribute {name!r}")