Source code for process_improve.multivariate._mbpls

# (c) Kevin Dunn, 2010-2026. MIT License. Based on own private work over the years.
"""Multi-block PLS (MBPLS) estimator and its randomization test (ENG-01).

Holds :class:`MBPLS`, the hierarchical / superblock multi-block PLS regressor,
and :func:`randomization_test_mbpls`, the permutation test for the significance
of each fitted component.
"""

from __future__ import annotations

import logging
import time
import typing
import warnings

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, RegressorMixin, _fit_context
from sklearn.utils import Bunch
from sklearn.utils.validation import check_is_fitted

from ..visualization.themes import REFERENCE_LINE_COLOR
from ._base import _HotellingsT2LimitMixin
from ._common import SpecificationWarning, _nz, epsqrt
from ._limits import spe_calculation
from ._nipals import quick_regress, ssq
from ._preprocessing import MCUVScaler

try:
    import plotly.graph_objects as go
except ImportError:  # pragma: no cover - exercised via env-without-plotly
    from process_improve._extras import _MissingExtra

    go = _MissingExtra("plotly", "plotting")  # type: ignore[assignment]


logger = logging.getLogger(__name__)


[docs] class MBPLS(_HotellingsT2LimitMixin, RegressorMixin, BaseEstimator): r"""Multi-block PLS (hierarchical / superblock formulation). Generic multi-block PLS as described by Westerhuis, Kourti & MacGregor (1998) and Westerhuis & Smilde (2001). Each X-block is preprocessed independently (mean-centred and unit-variance scaled), then divided by ``sqrt(K_b)`` so that blocks of unequal width contribute fairly to the super-score. Parameters ---------- n_components : int Number of latent variables to extract. max_iter : int, default=500 Maximum NIPALS iterations per latent variable. tol : float or None, default=None Convergence tolerance on the change in the Y-block score. If ``None``, ``np.finfo(float).eps ** (6/7)`` is used (matching the legacy multi-block reference implementation). algorithm : str, default="auto" Algorithm to use for fitting the model. - ``"auto"``: dense vectorised hierarchical NIPALS when every block (X and Y) is complete; mask-aware NIPALS when any block contains missing values. - ``"dense"``: dense vectorised hierarchical NIPALS. Raises if any block contains missing values. - ``"nipals"``: mask-aware hierarchical NIPALS. Always uses the NaN-tolerant inner-loop primitives, even when the data is complete (slower than ``"dense"`` but produces equivalent results). missing_data_settings : dict or None, default=None Settings for the iterative ``"nipals"`` path. Keys: ``md_tol`` (convergence tolerance on the score-vector change between iterations), ``md_max_iter`` (maximum NIPALS iterations per component). Defaults to ``{"md_tol": epsqrt, "md_max_iter": 1000}``. Attributes (after fitting) -------------------------- block_names_ : list[str] Ordered list of X-block names (the keys of the input dict). block_widths_ : dict[str, int] Number of variables in each X-block. super_scores_ : pd.DataFrame, shape (n_samples, n_components) Super-block (consensus) X-scores ``T``. super_y_scores_ : pd.DataFrame, shape (n_samples, n_components) Super-block Y-scores ``U``. super_weights_ : pd.DataFrame, shape (n_blocks, n_components) Super-block weights ``w_super``; rows indexed by block name. super_y_loadings_ : pd.DataFrame, shape (n_targets, n_components) Y-block loadings ``c``. block_scores_ : dict[str, pd.DataFrame] Per-block X-scores ``t_b``, each shape ``(n_samples, n_components)``. block_weights_ : dict[str, pd.DataFrame] Per-block X-weights ``w_b``, each shape ``(K_b, n_components)``. Each column has unit norm. block_loadings_ : dict[str, pd.DataFrame] Per-block X-loadings ``p_b`` (used for deflation), each shape ``(K_b, n_components)``. predictions_ : pd.DataFrame, shape (n_samples, n_targets) In-sample Y predictions on the *original* scale. explained_variance_ : np.ndarray, shape (n_components,) Variance of the super-score per component (ddof=1). scaling_factor_for_super_scores_ : pd.Series ``sqrt(explained_variance_)`` per component. fitting_info_ : dict Per-component iteration count and timing. has_missing_data_ : bool Whether any X-block or Y had NaN values. algorithm_ : str The resolved algorithm actually used for the fit. With ``algorithm="auto"``, this is ``"dense"`` for complete data and ``"nipals"`` for NaN-containing data. Notes ----- Block weighting uses the convention :math:`X_b / \sqrt{K_b}` so that every block contributes the same total sum of squares to the super-score, regardless of how many variables it has. Missing data ------------ When any X-block or Y contains NaN entries, the ``"auto"`` algorithm routes to a mask-aware NIPALS variant. The X-block weights, block scores, block loadings used for deflation, Y-block loadings and Y-block scores are each computed as a regression that uses only the observed entries; the masked sum-of-squares is used as the denominator so missing values neither bias the latent direction nor contribute to the score. The mask is preserved across components automatically because deflation propagates NaN through subtraction. This is the standard skip-NaN NIPALS update; see Walczak & Massart (2001) and Arteaga & Ferrer (2002). The fit refuses to run if any X-block or Y has a column with all entries missing, or a row with all entries missing for that block; either case leaves the masked denominator at zero. Drop or impute such rows or columns before fitting. Predict-time score estimation for new observations with NaN (Trimmed Score Regression / Projection to the Model Plane) is a separate follow-up. References ---------- Westerhuis, J. A., Kourti, T. & MacGregor, J. F. *Analysis of multiblock and hierarchical PCA and PLS models.* Journal of Chemometrics, 12 (1998), 301-321. Westerhuis, J. A. & Smilde, A. K. *Deflation in multiblock PLS.* Journal of Chemometrics, 15 (2001), 485-493. Walczak, B. & Massart, D. L. *Dealing with missing data: Part I.* Chemom. Intell. Lab. Syst., 58 (2001), 15-27. Arteaga, F. & Ferrer, A. *Dealing with missing data in MSPC: several methods, different interpretations, some examples.* J. Chemometrics, 16 (2002), 408-418. """ _valid_algorithms: typing.ClassVar[list[str]] = ["auto", "dense", "nipals"] _parameter_constraints: typing.ClassVar = { "n_components": [int], "max_iter": [int], "tol": [float, None], "algorithm": [str], "missing_data_settings": [dict, None], } def __init__( self, n_components: int, *, max_iter: int = 500, tol: float | None = None, algorithm: str = "auto", missing_data_settings: dict | None = None, ): super().__init__() if n_components <= 0: raise ValueError(f"n_components must be positive; got {n_components}.") if max_iter <= 0: raise ValueError(f"max_iter must be positive; got {max_iter}.") self.n_components = n_components self.max_iter = max_iter self.tol = tol self.algorithm = algorithm self.missing_data_settings = missing_data_settings
[docs] @_fit_context(prefer_skip_nested_validation=True) def fit(self, X: dict[str, pd.DataFrame], y: pd.DataFrame) -> MBPLS: # noqa: C901, PLR0912, PLR0915 """Fit the multi-block PLS model. Parameters ---------- X : dict[str, pd.DataFrame] X-blocks. Keys are block names; values are DataFrames sharing the same row index (and row count). Each block is preprocessed independently. y : pd.DataFrame Y-block. Same row index / row count as the X-blocks. """ if not isinstance(X, dict) or len(X) == 0: raise TypeError("X must be a non-empty dict[str, pd.DataFrame].") if not isinstance(y, pd.DataFrame): y = pd.DataFrame(y) for name, block in X.items(): if not isinstance(block, pd.DataFrame): raise TypeError(f"X['{name}'] must be a pandas DataFrame; got {type(block).__name__}.") self.block_names_: list[str] = list(X.keys()) first = X[self.block_names_[0]] n_samples = first.shape[0] for name in self.block_names_: if X[name].shape[0] != n_samples: raise ValueError( f"All X-blocks must have the same row count. Block '{name}' has " f"{X[name].shape[0]} rows; expected {n_samples}." ) if y.shape[0] != n_samples: raise ValueError(f"y has {y.shape[0]} rows; expected {n_samples} to match X-blocks.") self.block_widths_: dict[str, int] = {name: int(X[name].shape[1]) for name in self.block_names_} self._sample_index = first.index self._y_columns = y.columns self._block_columns: dict[str, pd.Index] = {name: X[name].columns for name in self.block_names_} self.n_samples_ = int(n_samples) self.n_targets_ = int(y.shape[1]) self.n_features_in_ = int(sum(self.block_widths_.values())) # feature_names_in_: sklearn convention (#392). Flat concatenation of # all blocks' column names in block-iteration order. Lets # ``Pipeline.get_feature_names_out`` and SHAP / eli5 / model-card # tooling introspect a multiblock fit through the same surface as a # single-block estimator. self.feature_names_in_ = np.concatenate( [self._block_columns[name].to_numpy() for name in self.block_names_] ) n_components = int(self.n_components) n_blocks = len(self.block_names_) self.has_missing_data_ = any(np.any(X[name].isna().values) for name in self.block_names_) or bool( np.any(y.isna().values) ) algo = self.algorithm.lower() if algo not in self._valid_algorithms: raise ValueError( f"Algorithm '{self.algorithm}' is not recognised. Must be one of {self._valid_algorithms}." ) if algo == "auto": algo = "nipals" if self.has_missing_data_ else "dense" if algo == "dense" and self.has_missing_data_: raise ValueError("Algorithm 'dense' cannot handle missing data. Use 'nipals' or 'auto' instead.") self.algorithm_ = algo # Resolve iterative-algorithm settings (used by the 'nipals' path). settings = {"md_tol": epsqrt, "md_max_iter": 1000} if isinstance(self.missing_data_settings, dict): settings.update(self.missing_data_settings) settings["md_max_iter"] = int(settings["md_max_iter"]) if algo == "nipals": if not settings["md_tol"] < 10: raise ValueError("Tolerance should not be too large.") if not settings["md_tol"] > epsqrt**1.95: raise ValueError("Tolerance must exceed machine precision.") # Degeneracy guards: any column or any (block, row) entirely NaN # leaves the masked NIPALS denominator at zero, which would # silently produce a spurious score or loading. Refuse the fit # rather than coerce the user into a misleading result. for name in self.block_names_: values = X[name].values col_all_nan = np.all(np.isnan(values), axis=0) if np.any(col_all_nan): bad = X[name].columns[col_all_nan].tolist() raise ValueError( f"Block '{name}' has columns with all values missing: {bad}. " "Drop these columns before fitting." ) row_all_nan = np.all(np.isnan(values), axis=1) if np.any(row_all_nan): bad_rows = np.where(row_all_nan)[0].tolist() raise ValueError( f"Block '{name}' has rows with all values missing at positions {bad_rows}. " "Drop these observations or impute them before fitting." ) y_values = y.values y_col_all_nan = np.all(np.isnan(y_values), axis=0) if np.any(y_col_all_nan): bad = y.columns[y_col_all_nan].tolist() raise ValueError( f"Y has columns with all values missing: {bad}. Drop these targets before fitting." ) y_row_all_nan = np.all(np.isnan(y_values), axis=1) if np.any(y_row_all_nan): bad_rows = np.where(y_row_all_nan)[0].tolist() raise ValueError( f"Y has rows with all values missing at positions {bad_rows}. " "Drop these observations or impute them before fitting." ) # Preprocess each X-block and Y independently self.preproc_: dict[str, MCUVScaler] = {name: MCUVScaler().fit(X[name]) for name in self.block_names_} self.y_preproc_ = MCUVScaler().fit(y) x_blocks_pp: dict[str, np.ndarray] = { name: self.preproc_[name].transform(X[name]).values.astype(float) for name in self.block_names_ } y_pp = self.y_preproc_.transform(y).values.astype(float) # Algorithmic block weighting: X_b / sqrt(K_b) sqrt_kb = {name: float(np.sqrt(self.block_widths_[name])) for name in self.block_names_} # Storage (numpy arrays during fit; wrapped in pandas at the end) super_scores_np = np.zeros((n_samples, n_components)) super_y_scores_np = np.zeros((n_samples, n_components)) super_weights_np = np.zeros((n_blocks, n_components)) super_y_loadings_np = np.zeros((self.n_targets_, n_components)) block_scores_np: dict[str, np.ndarray] = { name: np.zeros((n_samples, n_components)) for name in self.block_names_ } block_weights_np: dict[str, np.ndarray] = { name: np.zeros((self.block_widths_[name], n_components)) for name in self.block_names_ } block_loadings_np: dict[str, np.ndarray] = { name: np.zeros((self.block_widths_[name], n_components)) for name in self.block_names_ } x_def: dict[str, np.ndarray] = {name: x_blocks_pp[name].copy() for name in self.block_names_} y_def = y_pp.copy() # Initial sums of squares (for R^2 bookkeeping) ssq_x_init = {name: float(np.nansum(x_blocks_pp[name] ** 2)) for name in self.block_names_} ssq_y_init = float(np.nansum(y_pp ** 2)) ssq_x_init_per_var = { name: np.nansum(x_blocks_pp[name] ** 2, axis=0) for name in self.block_names_ } ssq_y_init_per_var = np.nansum(y_pp ** 2, axis=0) # Per-component cumulative R^2 storage (filled inside the loop) r2_x_block_cum = np.zeros((n_blocks, n_components)) r2_x_var_cum: dict[str, np.ndarray] = { name: np.zeros((self.block_widths_[name], n_components)) for name in self.block_names_ } r2_y_cum = np.zeros(n_components) r2_y_var_cum = np.zeros((self.n_targets_, n_components)) block_spe_np: dict[str, np.ndarray] = { name: np.zeros((n_samples, n_components)) for name in self.block_names_ } tol = float(np.finfo(float).eps ** (6 / 7)) if self.tol is None else float(self.tol) timing = np.zeros(n_components) iterations = np.zeros(n_components, dtype=int) rng = np.random.default_rng(0) for a in range(n_components): start = time.time() u_a = rng.standard_normal(n_samples) prev = u_a * 2 local_w: dict[str, np.ndarray] = {} local_t: dict[str, np.ndarray] = {} t_b_summary = np.zeros((n_samples, n_blocks)) t_super = np.zeros(n_samples) w_s = np.zeros(n_blocks) c_a = np.zeros(self.n_targets_) itern = 0 while np.linalg.norm(prev - u_a) > tol and itern < self.max_iter: prev = u_a if algo == "nipals": # Mask-aware NIPALS: each projection is a per-column (or # per-row) regression that uses only the entries that # are not NaN, and divides by the masked sum of squares. # Reuses the same primitives as single-block PCA NIPALS. u_a_col = u_a.reshape(-1, 1) for b_idx, name in enumerate(self.block_names_): w_b = quick_regress(x_def[name], u_a_col).flatten() w_b = w_b / _nz(float(np.sqrt(ssq(w_b.reshape(-1, 1))))) t_b = quick_regress(x_def[name], w_b.reshape(-1, 1)).flatten() / sqrt_kb[name] local_w[name] = w_b local_t[name] = t_b t_b_summary[:, b_idx] = t_b else: for b_idx, name in enumerate(self.block_names_): w_b = x_def[name].T @ u_a / _nz(float(u_a @ u_a)) w_b = w_b / _nz(float(np.linalg.norm(w_b))) t_b = x_def[name] @ w_b / _nz(float(w_b @ w_b)) / sqrt_kb[name] local_w[name] = w_b local_t[name] = t_b t_b_summary[:, b_idx] = t_b w_s = t_b_summary.T @ u_a / _nz(float(u_a @ u_a)) w_s = w_s / _nz(float(np.linalg.norm(w_s))) t_super = t_b_summary @ w_s / _nz(float(w_s @ w_s)) if algo == "nipals": t_super_col = t_super.reshape(-1, 1) c_a = quick_regress(y_def, t_super_col).flatten() u_a = quick_regress(y_def, c_a.reshape(-1, 1)).flatten() else: c_a = y_def.T @ t_super / _nz(float(t_super @ t_super)) u_a = y_def @ c_a / _nz(float(c_a @ c_a)) itern += 1 # Sign convention: largest |w_super| element positive flip_idx = int(np.argmax(np.abs(w_s))) if w_s[flip_idx] < 0: w_s = -w_s t_super = -t_super u_a = -u_a c_a = -c_a for name in self.block_names_: local_w[name] = -local_w[name] local_t[name] = -local_t[name] # Deflate using the super-score t_super_col = t_super.reshape(-1, 1) for name in self.block_names_: if algo == "nipals": p_b = quick_regress(x_def[name], t_super_col).flatten() else: p_b = x_def[name].T @ t_super / _nz(float(t_super @ t_super)) x_def[name] = x_def[name] - np.outer(t_super, p_b) block_loadings_np[name][:, a] = p_b block_weights_np[name][:, a] = local_w[name] block_scores_np[name][:, a] = local_t[name] y_def = y_def - np.outer(t_super, c_a) super_scores_np[:, a] = t_super super_y_scores_np[:, a] = u_a super_weights_np[:, a] = w_s super_y_loadings_np[:, a] = c_a # Track per-block cumulative R^2_X and per-Y-variable cumulative R^2_Y for b_idx, name in enumerate(self.block_names_): ssq_remain_per_var = np.nansum(x_def[name] ** 2, axis=0) # R^2 is undefined for a zero-variance block/column; report NaN # rather than dividing by zero (inf/nan + warning) or returning a # misleading 1.0. r2_x_block_cum[b_idx, a] = ( 1 - np.sum(ssq_remain_per_var) / ssq_x_init[name] if ssq_x_init[name] > 0 else np.nan ) r2_x_var_cum[name][:, a] = np.where( ssq_x_init_per_var[name] > 0, 1 - ssq_remain_per_var / np.where(ssq_x_init_per_var[name] > 0, ssq_x_init_per_var[name], 1.0), np.nan, ) block_spe_np[name][:, a] = np.sqrt(np.nansum(x_def[name] ** 2, axis=1)) ssq_y_remain_per_var = np.nansum(y_def ** 2, axis=0) r2_y_cum[a] = 1 - np.sum(ssq_y_remain_per_var) / ssq_y_init if ssq_y_init > 0 else np.nan r2_y_var_cum[:, a] = np.where( ssq_y_init_per_var > 0, 1 - ssq_y_remain_per_var / np.where(ssq_y_init_per_var > 0, ssq_y_init_per_var, 1.0), np.nan, ) timing[a] = time.time() - start iterations[a] = itern component_names = list(range(1, n_components + 1)) self.super_scores_ = pd.DataFrame(super_scores_np, index=self._sample_index, columns=component_names) self.super_y_scores_ = pd.DataFrame(super_y_scores_np, index=self._sample_index, columns=component_names) self.super_weights_ = pd.DataFrame(super_weights_np, index=self.block_names_, columns=component_names) self.super_y_loadings_ = pd.DataFrame(super_y_loadings_np, index=self._y_columns, columns=component_names) self.block_scores_ = { name: pd.DataFrame(block_scores_np[name], index=self._sample_index, columns=component_names) for name in self.block_names_ } self.block_weights_ = { name: pd.DataFrame( block_weights_np[name], index=self._block_columns[name], columns=component_names ) for name in self.block_names_ } self.block_loadings_ = { name: pd.DataFrame( block_loadings_np[name], index=self._block_columns[name], columns=component_names ) for name in self.block_names_ } # In-sample predictions on the original Y scale y_hat_pp = super_scores_np @ super_y_loadings_np.T y_hat = self.y_preproc_.inverse_transform(pd.DataFrame(y_hat_pp, columns=self._y_columns)) y_hat.index = self._sample_index self.predictions_ = y_hat self.explained_variance_ = np.diag(super_scores_np.T @ super_scores_np) / max(1, n_samples - 1) self.scaling_factor_for_super_scores_ = pd.Series( np.sqrt(self.explained_variance_), index=component_names, name="Standard deviation per super-score" ) converged = iterations < self.max_iter self.fitting_info_ = {"timing": timing, "iterations": iterations, "converged": converged} logger.debug("MBPLS (%s): iterations per component = %s", self.algorithm_, list(iterations)) if not np.all(converged): failed = [int(i + 1) for i, ok in enumerate(converged) if not ok] warnings.warn( f"MBPLS NIPALS did not converge within max_iter={self.max_iter} for " f"component(s) {failed}; results for those components may be unreliable.", SpecificationWarning, stacklevel=2, ) # --- Per-component (incremental) R^2 from cumulative --- r2_x_block_per_a = np.zeros_like(r2_x_block_cum) r2_x_block_per_a[:, 0] = r2_x_block_cum[:, 0] if n_components > 1: r2_x_block_per_a[:, 1:] = np.diff(r2_x_block_cum, axis=1) r2_y_per_a = np.empty_like(r2_y_cum) r2_y_per_a[0] = r2_y_cum[0] if n_components > 1: r2_y_per_a[1:] = np.diff(r2_y_cum) self.r2_x_per_block_cumulative_ = pd.DataFrame( r2_x_block_cum, index=self.block_names_, columns=component_names ) self.r2_x_per_block_per_component_ = pd.DataFrame( r2_x_block_per_a, index=self.block_names_, columns=component_names ) self.r2_x_per_variable_ = { name: pd.DataFrame(r2_x_var_cum[name], index=self._block_columns[name], columns=component_names) for name in self.block_names_ } self.r2_y_cumulative_ = pd.Series(r2_y_cum, index=component_names, name="Cumulative R²Y") self.r2_y_per_component_ = pd.Series(r2_y_per_a, index=component_names, name="R²Y per component") self.r2_y_per_variable_ = pd.DataFrame(r2_y_var_cum, index=self._y_columns, columns=component_names) # --- Per-block VIP and super VIP --- # Per-block VIP_jb = sqrt(K_b * sum_a(r2_x_block_a * w_b[j,a]^2) / sum_a r2_x_block_a) self.block_vip_: dict[str, pd.Series] = {} for b_idx, name in enumerate(self.block_names_): r2 = r2_x_block_per_a[b_idx, :] if np.sum(r2) > 0: w = self.block_weights_[name].values # (K_b, A) vip_b = np.sqrt(self.block_widths_[name] * np.sum(r2 * w**2, axis=1) / np.sum(r2)) else: vip_b = np.zeros(self.block_widths_[name]) self.block_vip_[name] = pd.Series(vip_b, index=self._block_columns[name], name=f"VIP[{name}]") # Super VIP_b = sqrt(B * sum_a(r2_y_a * w_super[b,a]^2) / sum_a r2_y_a) if np.sum(r2_y_per_a) > 0: ws = self.super_weights_.values # (B, A) super_vip = np.sqrt(n_blocks * np.sum(r2_y_per_a * ws**2, axis=1) / np.sum(r2_y_per_a)) else: super_vip = np.zeros(n_blocks) self.super_vip_ = pd.Series(super_vip, index=self.block_names_, name="Super VIP") # --- Per-block SPE (already accumulated) and per-block / super Hotelling's T^2 --- self.block_spe_ = { name: pd.DataFrame(block_spe_np[name], index=self._sample_index, columns=component_names) for name in self.block_names_ } # Cumulative T^2 from block scores and super scores (using per-component score variance) block_t2: dict[str, np.ndarray] = {} for name in self.block_names_: scores_np = self.block_scores_[name].values # (N, A) score_var = np.var(scores_np, axis=0, ddof=1) score_var = np.where(score_var > 0, score_var, 1.0) t2 = np.cumsum((scores_np**2) / score_var, axis=1) block_t2[name] = t2 self.block_hotellings_t2_ = { name: pd.DataFrame(block_t2[name], index=self._sample_index, columns=component_names) for name in self.block_names_ } super_score_var = np.where(self.explained_variance_ > 0, self.explained_variance_, 1.0) super_t2 = np.cumsum((super_scores_np**2) / super_score_var, axis=1) self.super_hotellings_t2_ = pd.DataFrame(super_t2, index=self._sample_index, columns=component_names) return self
[docs] def block_spe_limit(self, block: str, conf_level: float = 0.95) -> float: """SPE limit for one X-block using the Nomikos & MacGregor chi-square approximation. Operates on the same scale as ``block_spe_[block]`` (sqrt of row sum of squares), so the value can be drawn directly on a SPE plot. """ check_is_fitted(self, "block_spe_") if block not in self.block_spe_: raise KeyError(f"Unknown block '{block}'. Known blocks: {list(self.block_spe_)}.") return spe_calculation(self.block_spe_[block].iloc[:, -1].to_numpy(), conf_level=conf_level)
[docs] def super_spe_limit(self, conf_level: float = 0.95) -> float: """SPE limit for the merged super-block (sum of per-block SPE squared).""" check_is_fitted(self, "block_spe_") merged_spe_squared = np.zeros(self.n_samples_) for name in self.block_names_: merged_spe_squared += self.block_spe_[name].iloc[:, -1].to_numpy() ** 2 return spe_calculation(np.sqrt(merged_spe_squared), conf_level=conf_level)
[docs] def spe_contributions(self, X: dict[str, pd.DataFrame]) -> dict[str, pd.DataFrame]: """Per-variable squared residuals for each X-block (SPE contributions). For each new observation and each X-block, reconstruct the block as ``T_super @ P_b^T`` (matching the deflation step used during fit) and return the squared per-variable residuals. Useful for fault diagnosis: the variable with the largest contribution is the most likely culprit for a high SPE. Returns ------- dict[str, pd.DataFrame] One DataFrame per block, shape ``(n_samples, K_b)``. Values are preprocessed-scale squared residuals (centred and scaled inside the model). Sum across columns equals ``block_spe_[b].iloc[:, -1] ** 2``. """ check_is_fitted(self, "block_loadings_") if not isinstance(X, dict): raise TypeError("X must be a dict[str, pd.DataFrame].") missing = set(self.block_names_) - set(X) if missing: raise ValueError(f"Missing X-blocks: {sorted(missing)}.") result = self._project(X) super_scores = result.super_scores.values # (N, A) out: dict[str, pd.DataFrame] = {} sample_index = next(iter(result.block_scores.values())).index for name in self.block_names_: block = X[name] if not isinstance(block, pd.DataFrame): block = pd.DataFrame(block, columns=self._block_columns[name]) x_pp = self.preproc_[name].transform(block).values.astype(float) x_hat = super_scores @ self.block_loadings_[name].values.T residuals_sq = (x_pp - x_hat) ** 2 out[name] = pd.DataFrame(residuals_sq, index=sample_index, columns=self._block_columns[name]) return out
[docs] def score_contributions( self, t_super_start: np.ndarray | pd.Series, t_super_end: np.ndarray | pd.Series | None = None, components: list[int] | None = None, *, weighted: bool = False, ) -> dict[str, pd.Series]: r"""Per-block per-variable contributions to a super-score movement. The multi-block analogue of :meth:`PCA.score_contributions`. Decomposes a super-score-space delta back into preprocessed-scale variable-space deltas, one per X-block. For MBPLS, the super-score at component *a* is built from the per-block scores (which themselves are weighted regressions of the block on ``w_b``), so the back-projection through ``w_super[b,a] * w_b[:,a] / sqrt(K_b)`` gives the variable contribution. Parameters ---------- t_super_start : array-like, shape (n_components,) Super-score row of the observation of interest. Typically a row from ``self.super_scores_`` or from ``predict(X_new).super_scores``. t_super_end : array-like, optional Reference point in super-score space. Defaults to the model centre (zeros). components : list of int, optional **1-based** component indices to decompose over. ``None`` (default) uses all components - appropriate for Hotelling's T² contributions. weighted : bool, default=False If ``True``, divide the super-score delta by ``sqrt(explained_variance_)`` per component before back-projecting, giving contributions to the T² statistic instead of the Euclidean super-score distance. Returns ------- dict[str, pd.Series] One Series per X-block (length ``K_b``), indexed by variable (column) name. """ check_is_fitted(self, "block_weights_") t_start = np.asarray(t_super_start, dtype=float) t_end = np.zeros(self.n_components) if t_super_end is None else np.asarray(t_super_end, dtype=float) idx = np.arange(self.n_components) if components is None else np.array(components) - 1 dt = t_end[idx] - t_start[idx] # (len(idx),) if weighted: # ``explained_variance_[a] == 0`` for a degenerate component # would silently produce inf/NaN weighted contributions. # Clamp the divisor so weighting is a no-op on such components. # SEC-21 (#270) sub-item 5. ev = np.asarray(self.explained_variance_)[idx] dt = dt / np.sqrt(np.where(ev > epsqrt, ev, 1.0)) out: dict[str, pd.Series] = {} for b_idx, name in enumerate(self.block_names_): sqrt_kb = float(np.sqrt(self.block_widths_[name])) ws = self.super_weights_.values[b_idx, idx] # (len(idx),) wb = self.block_weights_[name].values[:, idx] # (K_b, len(idx)) # Effective per-component contribution per variable: w_super[b] * w_b[:,a] / sqrt(K_b) effective = wb * (ws / sqrt_kb) # (K_b, len(idx)) contrib = effective @ dt # (K_b,) out[name] = pd.Series(contrib, index=self._block_columns[name], name=f"score_contributions[{name}]") return out
[docs] def super_score_plot(self, pc_horiz: int = 1, pc_vert: int = 2) -> go.Figure: """Scatter plot of super-scores for two components.""" check_is_fitted(self, "super_scores_") a_max = int(self.n_components) if not (1 <= pc_horiz <= a_max and 1 <= pc_vert <= a_max): raise ValueError(f"pc_horiz and pc_vert must be in 1..{a_max}.") x = self.super_scores_[pc_horiz].to_numpy() y = self.super_scores_[pc_vert].to_numpy() labels = [str(i) for i in self.super_scores_.index] fig = go.Figure( data=[ go.Scatter( x=x, y=y, mode="markers+text", text=labels, textposition="top center", name="Super-scores", ) ] ) fig.update_layout( xaxis_title=f"t_super[{pc_horiz}]", yaxis_title=f"t_super[{pc_vert}]", title=f"MBPLS super-score plot: PC{pc_horiz} vs PC{pc_vert}", ) return fig
[docs] def super_weights_bar_plot(self, component: int = 1) -> go.Figure: """Bar plot of super-weights ``w_super`` for a single component.""" check_is_fitted(self, "super_weights_") a_max = int(self.n_components) if not (1 <= component <= a_max): raise ValueError(f"component must be in 1..{a_max}.") weights = self.super_weights_[component] fig = go.Figure(data=[go.Bar(x=list(weights.index), y=weights.to_numpy(), name=f"w_super[{component}]")]) fig.update_layout( xaxis_title="Block", yaxis_title=f"w_super[{component}]", title=f"MBPLS super-weights, component {component}", ) return fig
[docs] def predictions_vs_observed_plot(self, y_observed: pd.DataFrame, variable: str | None = None) -> go.Figure: """Scatter plot of predicted vs observed Y, with y=x reference and RMSEE annotation. Parameters ---------- y_observed : pd.DataFrame The observed Y on the original scale, same columns as the training Y. variable : str or None, default=None If given, plot only that Y-variable. If ``None``, plot the first one. """ check_is_fitted(self, "predictions_") if variable is None: variable = str(self.predictions_.columns[0]) if variable not in self.predictions_.columns: raise ValueError(f"Unknown Y-variable '{variable}'. Known: {list(self.predictions_.columns)}.") observed = pd.Series(y_observed[variable].values, name="observed").reset_index(drop=True) predicted = pd.Series(self.predictions_[variable].values, name="predicted").reset_index(drop=True) rmsee = float(np.sqrt(np.mean((observed.to_numpy() - predicted.to_numpy()) ** 2))) lo = float(min(observed.min(), predicted.min())) hi = float(max(observed.max(), predicted.max())) pad = 0.05 * (hi - lo) if hi > lo else 1.0 fig = go.Figure( data=[ go.Scatter(x=observed, y=predicted, mode="markers", name="Predicted vs observed"), go.Scatter( x=[lo - pad, hi + pad], y=[lo - pad, hi + pad], mode="lines", line={"color": REFERENCE_LINE_COLOR, "dash": "dash"}, name="y = x", ), ] ) fig.add_annotation( x=lo + 0.05 * (hi - lo), y=hi - 0.05 * (hi - lo), text=f"RMSEE = {rmsee:.4g}", showarrow=False, ) fig.update_layout( xaxis_title=f"Observed: {variable}", yaxis_title=f"Predicted: {variable}", title=f"Predicted vs observed for {variable}", ) return fig
[docs] def display_results(self, show_cumulative: bool = True) -> str: """Format a short text summary of per-block R²X, overall R²Y, iterations and timing.""" check_is_fitted(self, "super_scores_") rows: list[str] = [] rows.append(f"MBPLS model: {self.n_components} component(s), {len(self.block_names_)} X-block(s)") header = " PC | " + " | ".join(f"R²X[{name}]" for name in self.block_names_) + " | R²Y" rows.append(header) rows.append("-" * len(header)) for a in range(self.n_components): cells = [f"{i:>3d}" for i in [a + 1]] for name in self.block_names_: src = self.r2_x_per_block_cumulative_ if show_cumulative else self.r2_x_per_block_per_component_ cells.append(f"{src.loc[name].iloc[a]:>9.4f}") r2y_src = self.r2_y_cumulative_ if show_cumulative else self.r2_y_per_component_ cells.append(f"{r2y_src.iloc[a]:>9.4f}") rows.append(" | ".join(cells)) rows.append("") rows.append(f" Iterations per PC: {list(self.fitting_info_['iterations'])}") rows.append(f" Time per PC (ms): {[round(float(t * 1000), 1) for t in self.fitting_info_['timing']]}") return "\n".join(rows)
[docs] def transform(self, X: dict[str, pd.DataFrame]) -> pd.DataFrame: """Project new data to super-scores using the fitted model.""" check_is_fitted(self, "super_weights_") return self._project(X).super_scores
[docs] def diagnose(self, X: dict[str, pd.DataFrame]) -> Bunch: """Project new data and return the full diagnostics Bunch. Returns a :class:`sklearn.utils.Bunch` with fields ``super_scores`` (DataFrame, n_samples x n_components), ``block_scores`` (dict[str, DataFrame]), ``predictions`` (DataFrame on original Y scale), ``block_spe`` (dict[str, Series], per-block SPE of the new observations) and ``hotellings_t2`` (Series of cumulative Hotelling's T² over all components, per new observation). The rename (since 1.38.4, #395) matches :meth:`PLS.diagnose` and PCA.diagnose; :meth:`predict` is kept as a deprecation shim. """ check_is_fitted(self, "super_weights_") return self._project(X)
[docs] def predict(self, X: dict[str, pd.DataFrame]) -> Bunch: """Forward to :meth:`diagnose`; emits a :class:`DeprecationWarning`. .. deprecated:: 1.38.4 Use :meth:`MBPLS.diagnose` instead. The sklearn-convention ``predict`` name suggests a regression-style ndarray return, but the historical return is the rich diagnostics Bunch. The rename aligns with :meth:`PLS.diagnose` and :meth:`PCA.diagnose` and frees the name for a future contract that returns just the ``predictions`` field. Will be removed in 2.0.0. """ warnings.warn( "MBPLS.predict is deprecated and will be removed in 2.0.0; use MBPLS.diagnose instead.", DeprecationWarning, stacklevel=2, ) return self.diagnose(X)
def _project(self, X: dict[str, pd.DataFrame]) -> Bunch: if not isinstance(X, dict): raise TypeError("X must be a dict[str, pd.DataFrame].") missing = set(self.block_names_) - set(X) if missing: raise ValueError(f"Missing X-blocks for prediction: {sorted(missing)}.") # Preprocess each block x_pp: dict[str, np.ndarray] = {} sample_index: pd.Index | None = None for name in self.block_names_: block = X[name] if not isinstance(block, pd.DataFrame): block = pd.DataFrame(block, columns=self._block_columns[name]) if block.shape[1] != self.block_widths_[name]: raise ValueError( f"Block '{name}' must have {self.block_widths_[name]} columns; got {block.shape[1]}." ) x_pp[name] = self.preproc_[name].transform(block).values.astype(float) if sample_index is None: sample_index = block.index n_components = int(self.n_components) n_new = next(iter(x_pp.values())).shape[0] sqrt_kb = {name: float(np.sqrt(self.block_widths_[name])) for name in self.block_names_} super_scores = np.zeros((n_new, n_components)) block_scores: dict[str, np.ndarray] = { name: np.zeros((n_new, n_components)) for name in self.block_names_ } x_def = {name: x_pp[name].copy() for name in self.block_names_} for a in range(n_components): t_b_row = np.zeros((n_new, len(self.block_names_))) for b_idx, name in enumerate(self.block_names_): w_b = self.block_weights_[name].values[:, a] t_b = x_def[name] @ w_b / sqrt_kb[name] block_scores[name][:, a] = t_b t_b_row[:, b_idx] = t_b w_s = self.super_weights_.values[:, a] t_super = t_b_row @ w_s super_scores[:, a] = t_super for name in self.block_names_: p_b = self.block_loadings_[name].values[:, a] x_def[name] = x_def[name] - np.outer(t_super, p_b) component_names = list(range(1, n_components + 1)) super_scores_df = pd.DataFrame(super_scores, index=sample_index, columns=component_names) block_scores_df = { name: pd.DataFrame(block_scores[name], index=sample_index, columns=component_names) for name in self.block_names_ } y_hat_pp = super_scores @ self.super_y_loadings_.values.T predictions = self.y_preproc_.inverse_transform(pd.DataFrame(y_hat_pp, columns=self._y_columns)) assert sample_index is not None # block_names_ is non-empty, so it was set in the loop predictions.index = sample_index # Per-block SPE for new observations (residual after final deflation) block_spe = { name: pd.Series( np.sqrt(np.nansum(x_def[name] ** 2, axis=1)), index=sample_index, name=f"SPE[{name}]" ) for name in self.block_names_ } super_score_var = np.where(self.explained_variance_ > 0, self.explained_variance_, 1.0) hotellings_t2 = pd.Series( np.sum((super_scores**2) / super_score_var, axis=1), index=sample_index, name="Hotelling's T²" ) return Bunch( super_scores=super_scores_df, block_scores=block_scores_df, predictions=predictions, block_spe=block_spe, hotellings_t2=hotellings_t2, )
[docs] def randomization_test_mbpls( model: MBPLS, X: dict[str, pd.DataFrame], y: pd.DataFrame, n_permutations: int = 200, *, seed: int | None = None, ) -> pd.DataFrame: r"""Randomization (permutation) test for component significance in MBPLS. For each component ``a``, the null hypothesis is "there is no real relationship between X and Y at this component"; the test permutes the rows of ``y``, refits a fresh MBPLS with the same number of components, and recomputes the test statistic. The risk is the fraction of permutations whose statistic equals or exceeds the original model's. Statistic: per-component absolute correlation between the super X-score and the super Y-score, ``|t_super(:,a)' u_super(:,a)| / (||t|| * ||u||)``, matching the legacy ConnectMV randomization-objective for PLS. Parameters ---------- model : MBPLS A fitted MBPLS model. X, y : dict[str, DataFrame], DataFrame The same training data used to fit ``model``. n_permutations : int, default=200 Number of Y-row permutations to evaluate. seed : int or None, default=None Seed for the permutation RNG (``None`` uses non-reproducible randomness). Returns ------- pd.DataFrame Indexed by component ``1..A`` with columns: - ``observed`` : the actual model's per-component statistic. - ``risk_pct`` : fraction (in %) of permutations with statistic ``>= observed``. Low values (e.g. < 5%) suggest the component is significant; values near 50% suggest the component is no better than chance. References ---------- Wiklund, S., Nilsson, D., Eriksson, L., Sjöström, M., Wold, S. & Faber, K. *A randomization test for PLS component selection.* J. Chemometrics, 21 (2007), 427-439. """ check_is_fitted(model, "super_scores_") rng = np.random.default_rng(seed) a_components = int(model.n_components) def _objective(mod: MBPLS) -> np.ndarray: t = mod.super_scores_.values u = mod.super_y_scores_.values out = np.zeros(t.shape[1]) for a in range(t.shape[1]): num = float(np.abs(t[:, a] @ u[:, a])) denom = float(np.linalg.norm(t[:, a]) * np.linalg.norm(u[:, a])) # SEC-33 (#282): float ``==`` zero only catches the exact-zero # case; a sub-eps near-zero denom produced a meaningless ratio # that the permutation test treated as an observed statistic. out[a] = 0.0 if denom <= epsqrt else num / denom return out observed = _objective(model) n_exceed = np.zeros(a_components, dtype=int) n_samples = y.shape[0] for _ in range(int(n_permutations)): perm_idx = rng.permutation(n_samples) y_perm = y.iloc[perm_idx].reset_index(drop=True) # Reset X indices to align row positions (otherwise pandas will # join on index and silently misalign). x_reset = {name: X[name].reset_index(drop=True) for name in X} permuted_model = MBPLS(n_components=a_components).fit(x_reset, y_perm) stat = _objective(permuted_model) n_exceed += (stat >= observed).astype(int) component_names = list(range(1, a_components + 1)) risk_pct = 100.0 * n_exceed / n_permutations return pd.DataFrame( {"observed": observed, "risk_pct": risk_pct}, index=pd.Index(component_names, name="component"), )