Source code for process_improve.monitoring.control_charts

"""Class for ControlChart: robust control charts with a balance between CUSUM and Shewhart properties."""

import logging
from typing import ClassVar

import numpy as np
import pandas as pd

from ..regression.methods import repeated_median_slope
from ..univariate.metrics import median_absolute_deviation

logger = logging.getLogger(__name__)


[docs] def rho(x: float, k: float = 2.52) -> float: """ Bi-weight rho function. Fixed constant of k=2.52 is from p 289 of the paper https://onlinelibrary.wiley.com/doi/abs/10.1002/for.1125 """ return k if np.abs(x) > k else k * (1 - np.power(1 - np.power(x / k, 2), 3))
[docs] def psi(x: float, k: float = 2.0) -> float: """ Pre-clean based on the Huber y-function. Can be interpreted as replacing unexpected high or low values by a more likely value. From p 288 of the paper https://onlinelibrary.wiley.com/doi/abs/10.1002/for.1125 """ return x if abs(x) < k else k * np.sign(x)
[docs] class ControlChart: """Create control chart instance objects."""
[docs] def __init__(self, style: str = "robust", variant: str = "HW") -> None: """ Create/initialize a control chart. Args: style (str, optional): Which style control chart to calculate. Defaults to "robust". Other choice is 'regular' (i.e. not-robust) calculations. User should then ensure that no outliers are present in the data. variant (str, optional): Many variants of control charts are available. The variant string is compared case-insensitively (it is normalised via ``.strip().lower()`` on assignment), so ``'HW'``, ``'hw'``, and ``'Hw'`` are all equivalent. The default is a Holt-Winters (`'hw'`) chart, with automatic determination of control chart parameters. This chart is a blend of infinite history (CUSUM) charts, and an instantaneous (no history taken into account) Shewhart chart. The exact blend is specified by parameters `ld_1` (lambda 1) and `ld_2` (lambda 2). Other variants are: 'xbar.no.subgroup' [Shewhart chart, with no subgroups]. In other words, each observation is independently plotted on the control chart. 'cusum' (CUmulative SUM) chart, which uses all the history of the chart. """ self.style = style.strip() self.variant = variant.strip().lower() # Will be calculated by the self.calculate_limits() function self.target: float | None = None self._given_target: float | None = None self._given_s: float | None = None self.s: float | None = None # index of elements which are found to be outside +/- 3S self.idx_outside_3S: list[int] = [] self.warm_up: dict[str, float | np.ndarray | pd.Series] = {} self.warm_up_M: int = 0 columns = [ "y", "psi_input", "rho_input", "y_star", "alpha_hat", "beta_hat", "sigma_hat", "error", ] self.df = pd.DataFrame(columns=columns, dtype=np.float64)
[docs] def calculate_limits( # noqa: C901 - branch count is mostly simple input-validation guard clauses self, y: np.ndarray | pd.Series, target: float | None = None, s: float | None = None, **kwargs, ) -> None: """ Find for a given vector `y`, the control chart target and limits. Works for both the Holt-Winters ('hw') and 'xbar.no.subgroup' variants. For the Holt-Winters variant, when there are fewer than min(20, max(10, np.ceil(0.10 * N))) measurements (where N is the length of the input vector), the target and standard deviation are estimated directly from the data and any provided `target` / `s` are ignored for that small-sample case. Otherwise, if `target` and `s` are numeric, those values are used; if not, they are estimated. """ self._given_target = target self._given_s = s logger.debug( "ControlChart.calculate_limits: variant=%s, style=%s, given target=%s, s=%s", self.variant, self.style, target, s, ) if s is not None: self.s = float(s) if not 0.0 < s < 1e300: raise ValueError( f"The given standard deviation must be positive and not excessively large (0 < s < 1e300); got {s}." ) if target is not None: self.target = float(target) self.df["y"] = y.ravel() if isinstance(y, np.ndarray) else pd.Series(y).values.ravel() self.N = self.df.shape[0] # Between M = 10 and 20 samples required to warm-up (calculate summary statistics) self.warm_up["M"] = self.warm_up_M = int(min(20, max(10, np.ceil(0.10 * self.N)))) if (self.warm_up_M > self.N) and self.variant.strip().lower() == "hw": # TO CHECK: Completely handle the case with very few samples. Is everything filled in? # Also check case when some of these samples are NAN, you might have even fewer still. self.target = self._target_calculated_best = self.df["y"].median() self.s = self._tau = self.df["y"].std() self.df["y_star"] = self.df["y"].values self.df["alpha_hat"] = self.target self.df["beta_hat"] = 0 self.df["sigma_hat"] = np.nan self.df["error"] = np.nan return # Check if there are enough training samples: if 2 * self.warm_up_M > self.N: self.train_samples: list[int] = [int(i) for i in np.arange(0, self.N)] else: self.train_samples = [int(i) for i in np.arange(self.warm_up_M, self.N)] self._apply_tuning_kwargs(kwargs) if self.variant.strip().lower() == "hw": if not hasattr(self, "ld_1"): self.ld_1 = None if not hasattr(self, "ld_2"): self.ld_2 = None self._holt_winters_parameter_fit() if self.variant.strip().lower() == "xbar.no.subgroup": self._xbar_no_subgroup_fit() # After whichever fit is completed, check which are outside +/- 3S. # Explicit validation (not assert) so the guard survives `python -O` # and surfaces as a documented ValueError at the tool boundary (SEC-17). if self.target is None or self.s is None: raise ValueError( "Control chart limits could not be estimated; the input is likely " "constant or too short to fit the chosen variant." ) idx_bool = (self.df["y"] - self.target).abs() > 3.0 * self.s self.idx_outside_3S = np.nonzero(idx_bool.to_numpy())[0].tolist()
#: Instance attributes a caller may pin via ``calculate_limits(**kwargs)``: #: the Holt-Winters smoothing lambdas. Everything else is internal state. _TUNING_KWARGS: ClassVar[frozenset[str]] = frozenset({"ld_1", "ld_2"}) def _apply_tuning_kwargs(self, kwargs: dict[str, object]) -> None: """Set caller-pinned tuning parameters, rejecting anything off the allowlist. A blanket ``setattr(self, key, val)`` over ``**kwargs`` would let a caller silently overwrite internal state (``self.s``, ``self.target``, ``self.train_samples``, even a bound method) and would swallow typos. We therefore accept only the documented Holt-Winters smoothing lambdas and raise a clear ``ValueError`` otherwise. """ unknown = set(kwargs) - self._TUNING_KWARGS if unknown: raise ValueError( f"calculate_limits() got unexpected keyword argument(s) {sorted(unknown)}; " f"only {sorted(self._TUNING_KWARGS)} (Holt-Winters smoothing lambdas) are accepted." ) for key, val in kwargs.items(): setattr(self, key, val) def _xbar_no_subgroup_fit(self) -> None: """ Fit the control chart from the data samples, assuming each sample is its own subgroup. The `style` attribute ('regular' | 'robust') switches how the average and standard deviation are calculated. Control chart limits assume the data are normally distributed and independent. In particular, this last assumption can have consequences if not actually met. Limits may be too wide, or too narrow. """ if self.style == "regular": self.target = self.df["y"].mean() self.s = self.df["y"].std() elif self.style == "robust": self.target = self.df["y"].median() self.s = (self.df["y"] - self.target).abs().median() * 1.4826 def _holt_winters_parameter_fit(self) -> None: """ Recommended in the paper: not to fit the lambda_s value, but to use a grid search for the lambda_1 and lambda_2 values. This is done in a 5x5 grid in the code below. """ self.ld_s = ld_s = 0.2 rho_func = np.vectorize(rho) if self.ld_1 and self.ld_2: # User has provided their own lambda_1 and lambda_2 values. for _name, _val in (("Lambda_1", self.ld_1), ("Lambda_2", self.ld_2), ("Lambda_s", self.ld_s)): if _val < 0.0: raise ValueError(f"{_name} must be greater than or equal to zero.") if _val > 1.0: raise ValueError(f"{_name} must be less than or equal to 1.0.") self._holt_winters_warmup_fit(ld_1=self.ld_1, ld_2=self.ld_2, ld_s=self.ld_s) else: # User wants to find an value for ld_1 and ld_2 that best fits the data ld_1_index = np.linspace(0.1, 0.9, num=5, endpoint=True) ld_2_index = np.linspace(0.1, 0.9, num=5, endpoint=True) residuals, _ = np.meshgrid(ld_1_index, ld_2_index) for i, ld_1 in enumerate(ld_1_index): for j, ld_2 in enumerate(ld_2_index): self._holt_winters_warmup_fit(ld_1=ld_1, ld_2=ld_2, ld_s=ld_s) # Apply equation 16 from the paper to the residuals in the 'training' period, # that is the samples after the warm-up period. future_errors = self.df["error"].iloc[np.asarray(self.train_samples, dtype=int)] S_T_median_error = 1.48 * np.median(abs(future_errors)) residuals[i, j] = np.power(S_T_median_error, 2) * np.average( rho_func(future_errors / S_T_median_error) ) # print(f"{i}, {j}, {residuals[i, j]:.5f}") min_idx = np.argmin(residuals) best_ld_1 = ld_1_index[np.unravel_index(min_idx, residuals.shape)[0]] best_ld_2 = ld_2_index[np.unravel_index(min_idx, residuals.shape)[1]] # Store the parameters that were calculated, even if a `target` or `s` were provided. self.ld_1 = best_ld_1 self.ld_2 = best_ld_2 self._residuals_HW = residuals self._holt_winters_warmup_fit(ld_1=best_ld_1, ld_2=best_ld_2, ld_s=ld_s) # Common code for both branches of if-else above future_errors = self.df["error"].iloc[np.asarray(self.train_samples, dtype=int)] S_T_median_error = 1.48 * future_errors.abs().median() # must handle NaNs! resids = np.power(S_T_median_error, 2) * np.nanmean(rho_func(future_errors / S_T_median_error)) # Ensure no negative square root is taken self._tau = np.sqrt(max(0.0, resids)) if self.target is None: # Estimate the target as the median of the y-star (cleaned) y-values self.target = self.df["y_star"].median() else: self._target_calculated_best = self.df["y_star"].median() if self.s is None: self.s = self._tau # or an alternative: self.df["sigma_hat"] is approximately OK # The "delta" emphasizes that it is the deviation from the target. self._delta_UCL_3sigma = +3.0 * self._tau self._delta_LCL_3sigma = -3.0 * self._tau def _holt_winters_warmup_fit(self, ld_1: float = 0.5, ld_2: float = 0.8, ld_s: float = 0.2) -> None: """ See paper: https://onlinelibrary.wiley.com/doi/abs/10.1002/for.1125. Calculates the Holt-Winters fitting and control chart parameters, for given values of the smoothing parameters lambda_1 (how must local history for the level is used, with values approaching 1.0 implying that less history is used), and lambda_2 (history for the trend that is used, with lambda_2 approaching 1.0 implying that historical data is less interesting), and lambda_s, a similar parameter for the moving variance of the sequence. lambda_1 = ld_1 = 0.5 (default): value must be between 0 <= ld_1 <= 1.0 lambda_2 = ld_2 = 0.8 (default): value must be between 0 <= ld_2 <= 1.0 lambda_s = ld_s = 0.2 (default): value must be between 0 <= ld_2 <= 1.0, based on values used in the paper, recommended on page 291. The ideal lambda values (ld_1, ld_2, ld_s) can be found from a grid search. """ df = self.df y_warm_up = df["y"].iloc[0 : self.warm_up_M] self.warm_up["y_zero_robust"] = y_warm_up.median() if isinstance(self.target, float): self.warm_up["alpha_0"] = self.target self.warm_up["beta_0"] = 0.0 else: # p 290 of the paper, https://onlinelibrary.wiley.com/doi/abs/10.1002/for.1125 self.warm_up["beta_0"] = repeated_median_slope(np.arange(self.warm_up_M), y_warm_up.to_numpy()) self.warm_up["alpha_0"] = np.nanmedian(y_warm_up - self.warm_up["beta_0"] * np.arange(self.warm_up_M)) if isinstance(self.s, float): self.warm_up["sigma_0"] = self.s else: # p 290 of the paper, https://onlinelibrary.wiley.com/doi/abs/10.1002/for.1125 warm_up_residuals = y_warm_up - self.warm_up["alpha_0"] - self.warm_up["beta_0"] # Some other method that does not rely on SciPy for 1 function. self.warm_up["sigma_0"] = median_absolute_deviation(np.asarray(warm_up_residuals), nan_policy="omit") self.warm_up["residuals"] = warm_up_residuals # A constant (zero-variance) warm-up window gives sigma_0 = MAD = 0, which # would make rho/psi infinite and silently poison every downstream control # limit with 0/NaN. Fail loudly instead of returning meaningless limits. _residuals_array = warm_up_residuals.dropna().to_numpy() _is_constant = _residuals_array.shape[0] == 0 or (_residuals_array[0] == _residuals_array).all() if not _is_constant and self.warm_up["sigma_0"] == 0: # Corner case: if there are multiple unique values in the warm-up residuals, but the MAD is zero, # then sigma_0 is set to regular standard deviation instead, which is non-zero. # This can happen when the warm-up residuals are symmetrically distributed around the median, # leading to a MAD of zero, but still have variability that can be captured by the standard deviation. self.warm_up["sigma_0"] = warm_up_residuals.std() # A constant (zero-variance) warm-up window gives sigma_0 = MAD = 0, which # would make rho/psi infinite and silently poison every downstream control # limit with 0/NaN. Fail loudly instead of returning meaningless limits. if not np.isfinite(self.warm_up["sigma_0"]) or self.warm_up["sigma_0"] <= 0: raise ValueError( "The Holt-Winters warm-up window has zero (or non-finite) variance " "(sigma_0 = 0), so the control-chart limits would be undefined. " "Supply more representative warm-up data, or pass a positive 's'." ) df.loc[0, "rho_input"] = ( self.warm_up["y_zero_robust"] - self.warm_up["alpha_0"] - self.warm_up["beta_0"] ) / self.warm_up["sigma_0"] df.loc[0, "psi_input"] = df["rho_input"][0] df.loc[0, "y_star"] = self.warm_up["y_zero_robust"] df.loc[0, "alpha_hat"] = self.warm_up["alpha_0"] df.loc[0, "beta_hat"] = self.warm_up["beta_0"] df.loc[0, "sigma_hat"] = self.warm_up["sigma_0"] for i in range(1, self.N): # Cover the warm-up period, and the rest of the data set. We need that for the residual # calculation later anyway. # Error = observed - predicted. Predicted = one-step-ahead prediction error_i = df["y"][i] - (df["alpha_hat"][i - 1] + df["beta_hat"][i - 1]) if np.isnan(error_i): # If there is an error, replace it with the median of the last 10 error estimates # or as many points as available. error_i = df["error"].iloc[max(i - 10, 0) : i].abs().median() rho_i = error_i / df["sigma_hat"][i - 1] prior_variance = np.power(df["sigma_hat"][i - 1], 2) sigma_i = np.sqrt(rho(rho_i) * ld_s * prior_variance + (1.0 - ld_s) * prior_variance) psi_i = error_i / sigma_i y_star_i = psi(psi_i) * sigma_i + df["alpha_hat"][i - 1] + df["beta_hat"][i - 1] alpha_i = ld_1 * y_star_i + (1 - ld_1) * (df["alpha_hat"][i - 1] + df["beta_hat"][i - 1]) beta_i = ld_2 * (alpha_i - df["alpha_hat"][i - 1]) + (1 - ld_2) * df["beta_hat"][i - 1] df.loc[i] = [ df["y"][i], psi_i, rho_i, y_star_i, alpha_i, beta_i, sigma_i, error_i, ]
# Checks: for algorithm debugging: # future_errors = df["error"] # S_T_median_error = 1.48 * future_errors.abs().median() # must handle NaNs! # resids = np.power(S_T_median_error, 2) * \ # np.nanmean((future_errors / S_T_median_error).apply(rho)) # print(np.sqrt(max(0.0, resids)))