# (c) Kevin Dunn, 2010-2026. MIT License. Based on own private work over the years.
"""Projection to Latent Structures (PLS) regression estimator (ENG-01).
The sklearn-compatible :class:`PLS` regressor (NIPALS, missing-data aware), with
cross-validation, prediction intervals, diagnostics, confidence limits and
plotting bound as convenience methods after ``fit()``.
"""
from __future__ import annotations
import logging
import time
import typing
import warnings
import numpy as np
import pandas as pd
from scipy.stats import t as t_dist
from sklearn.base import BaseEstimator, RegressorMixin, TransformerMixin, clone
from sklearn.metrics import r2_score
from sklearn.model_selection import BaseCrossValidator, KFold, RepeatedKFold, check_cv
from sklearn.utils import Bunch
from sklearn.utils.validation import check_is_fitted, validate_data
from tqdm import tqdm
from .._linalg import safe_inverse
from ..univariate.metrics import detect_outliers_esd
from ._base import _LatentVariableModel, _LazyFrame
from ._common import (
Q2_MIN_INCREMENT,
DataMatrix,
NotEnoughVarianceError,
SelectionRule,
SpecificationWarning,
_align_to_fit_features,
_model_method,
_select_n_components,
epsqrt,
)
from ._nipals import quick_regress, ssq, terminate_check
from ._preprocessing import MCUVScaler
from .plots import (
coefficient_plot as _coefficient_plot,
)
from .plots import (
predictions_vs_observed_plot as _predictions_vs_observed_plot,
)
logger = logging.getLogger(__name__)
def _vandervoet_randomization(
per_obs_sse: np.ndarray,
*,
total_rmsecv: np.ndarray,
n_permutations: int = 999,
alpha: float = 0.01,
random_state: int | None = None,
) -> tuple[int, np.ndarray]:
"""Van der Voet (1994) randomization test for PLS component selection.
Compares every candidate model against the reference (argmin-RMSECV)
model under the null that the two have the same predictive ability.
For each observation the paired difference of squared residuals
``D_i = sse[a, i] - sse[a*, i]`` is computed; under the null its sign
is random, so the permutation distribution of ``T = sum_i D_i`` is
obtained by flipping each ``D_i``'s sign with probability 1/2 over
``n_permutations`` draws. The *p*-value is the right-tail probability
of seeing a sum as large as the observed one (``T_obs >= T_perm``);
the recommendation is the smallest ``a`` whose ``p > alpha`` -
statistically indistinguishable from the reference, but more
parsimonious.
Parameters
----------
per_obs_sse : np.ndarray of shape (n_components, n_samples)
Out-of-fold per-observation squared total residual at every
component count, summed across Y columns. Rows that are NaN
(observation never held out) are dropped.
total_rmsecv : np.ndarray of shape (n_components,)
Pooled total RMSECV per component count; used to pick the
reference model ``a*`` = ``nanargmin(total_rmsecv) + 1``.
n_permutations : int, default 999
Number of sign-flip permutations.
alpha : float, default 0.01
Significance level. Smaller values are more parsimonious.
random_state : int, optional
Seed for reproducible permutations.
Returns
-------
recommended : int
Smallest 1-based component count with ``p > alpha``.
p_values : np.ndarray of shape (n_components,)
Right-tail *p*-value per candidate; the reference model gets
``1.0`` by construction (paired differences are all zero).
References
----------
Van der Voet, H. (1994). Comparing the predictive accuracy of
models using a simple randomization test. *Chemom. Intell. Lab.
Syst.*, 25(2), 313-323.
"""
a_count = per_obs_sse.shape[0]
a_ref = int(np.nanargmin(total_rmsecv))
rng = np.random.default_rng(random_state)
p_values = np.zeros(a_count)
p_values[a_ref] = 1.0
sse_ref = per_obs_sse[a_ref]
for a in range(a_count):
if a == a_ref:
continue
d = per_obs_sse[a] - sse_ref
# Drop observations with NaN (a custom splitter may have left some
# rows unheld), since the paired difference is undefined there.
d = d[np.isfinite(d)]
if d.size == 0:
p_values[a] = 1.0
continue
t_obs = float(d.sum())
signs = rng.choice([-1.0, 1.0], size=(n_permutations, d.size))
t_perm = (signs * d).sum(axis=1)
# Right-tail probability under the null. Add 1 to both numerator
# and denominator (the "permutation test +1" correction) so the
# p-value is strictly positive even at the extreme.
p_values[a] = float((np.sum(t_perm >= t_obs) + 1) / (n_permutations + 1))
recommended = a_ref + 1 # fall back to the reference if nothing qualifies
for a in range(a_count):
if p_values[a] > alpha:
recommended = a + 1
break
return recommended, p_values
[docs]
class PLS(_LatentVariableModel, RegressorMixin, TransformerMixin, BaseEstimator):
"""Projection to Latent Structures (PLS) regression with diagnostics.
Implements PLS via the NIPALS algorithm with production diagnostics: SPE,
Hotelling's T², score contributions, and outlier detection. The API mirrors
:class:`PCA` so that ``model.scores_``, ``model.spe_``, and
``model.detect_outliers()`` work identically for both model types.
Parameters
----------
n_components : int
Number of latent components to extract.
scale : bool, default=True
Whether to scale X and Y to unit variance (sklearn internal scaling).
When using ``MCUVScaler`` externally, set ``scale=False`` to avoid
double scaling.
max_iter : int, default=1000
Maximum number of iterations for the NIPALS algorithm.
tol : float, default=sqrt(machine epsilon)
Convergence tolerance for the NIPALS algorithm.
copy : bool, default=True
Whether to copy X and Y before fitting.
missing_data_settings : dict or None, default=None
Settings for missing data algorithms (NIPALS/TSR for PLS).
Keys: ``md_method`` (``"tsr"``, ``"scp"``, ``"nipals"``),
``md_tol``, ``md_max_iter``.
Attributes (after fitting)
--------------------------
scores_ : pd.DataFrame of shape (n_samples, n_components)
X-block score matrix (T). This is the primary score matrix; equivalent
to ``x_scores`` in older versions.
y_scores_ : pd.DataFrame of shape (n_samples, n_components)
Y-block score matrix (U).
x_loadings_ : pd.DataFrame of shape (n_features, n_components)
X-block loading matrix (P).
y_loadings_ : pd.DataFrame of shape (n_targets, n_components)
Y-block loading matrix (C).
x_weights_ : pd.DataFrame of shape (n_features, n_components)
X-block weight matrix (W).
y_weights_ : pd.DataFrame of shape (n_targets, n_components)
Y-block weight matrix.
direct_weights_ : pd.DataFrame of shape (n_features, n_components)
Direct (W*) weights: ``W (P'W)^{-1}``. Used for direct projection
``T = X @ W*``.
beta_coefficients_ : pd.DataFrame of shape (n_features, n_targets)
Regression coefficients linking X directly to Y.
predictions_ : pd.DataFrame of shape (n_samples, n_targets)
Y predictions from the training data.
spe_ : pd.DataFrame of shape (n_samples, n_components)
Per-row SPE diagnostic; stored as the square root of the row
sum-of-squared X-residuals (so it is on the residual scale, not the
squared scale).
hotellings_t2_ : pd.DataFrame of shape (n_samples, n_components)
Cumulative Hotelling's T² statistic.
r2_per_component_ : pd.Series of length n_components
Fractional R² (on Y) explained by each component.
r2_cumulative_ : pd.Series of length n_components
Cumulative R² (on Y) after each component.
r2_per_variable_ : pd.DataFrame of shape (n_features, n_components)
Per-variable cumulative R² for X after each component.
r2y_per_variable_ : pd.DataFrame of shape (n_targets, n_components)
Per-variable R² for Y after each component.
rmse_ : pd.DataFrame of shape (n_targets, n_components)
Root mean squared error of Y predictions per component.
explained_variance_ : np.ndarray of shape (n_components,)
Variance explained by each component in X.
scaling_factor_for_scores_ : pd.Series of length n_components
Standard deviation per score (sqrt of explained variance).
has_missing_data_ : bool
Whether the training data contained missing values.
fitting_info_ : dict
Timing and iteration info from the fitting algorithm.
See Also
--------
PCA : Principal Component Analysis.
MCUVScaler : Mean-center unit-variance scaler.
References
----------
Abdi, "Partial least squares regression and projection on latent structure
regression (PLS Regression)", 2010, DOI: 10.1002/wics.51
Examples
--------
>>> import pandas as pd
>>> from process_improve.multivariate.methods import PLS, MCUVScaler
>>> X = pd.DataFrame({"A": [1, 2, 3, 4], "B": [4, 3, 2, 1]})
>>> Y = pd.DataFrame({"y": [2.1, 3.9, 6.2, 7.8]})
>>> pls = PLS(n_components=1)
>>> pls = pls.fit(MCUVScaler().fit_transform(X), MCUVScaler().fit_transform(Y))
>>> pls.scores_.shape
(4, 1)
"""
def __init__( # noqa: PLR0913
self,
n_components: int,
*,
scale: bool = True,
max_iter: int = 1000,
tol: float = epsqrt,
copy: bool = True,
# Own extra inputs, for the case when there is missing data
missing_data_settings: dict | None = None,
):
self.n_components: int = n_components
self.scale = scale
self.max_iter = max_iter
self.tol = tol
self.copy = copy
self.missing_data_settings = missing_data_settings
def __sklearn_tags__(self):
"""Declare sklearn capability tags (sklearn 1.6+).
- ``input_tags.allow_nan=True`` because the NIPALS fit threads
missing data through; ``fit`` and ``predict`` both pass
``ensure_all_finite="allow-nan"`` to ``validate_data``.
- ``target_tags.multi_output=True`` because the X / Y blocks
can both be multi-column (multi-target Y is the default
chemometric PLS case).
"""
tags = super().__sklearn_tags__()
tags.input_tags.allow_nan = True
tags.target_tags.multi_output = True
return tags
[docs]
def get_feature_names_out(self, input_features=None) -> np.ndarray: # noqa: ANN001, ARG002
"""Return the output column names of :meth:`transform`.
:class:`PLS`'s ``transform`` returns the X scores (T matrix),
labelled ``["T1", "T2", ..., "T{n_components}"]``. The
``input_features`` argument is accepted (Pipeline introspection
passes it through) but unused: the output column count is the
fitted ``n_components``, not the input feature count.
Used by :meth:`set_output` (sklearn 1.2+) to label the
:class:`~pandas.DataFrame` view of the scores when
``set_output(transform="pandas")`` is on, and by Pipeline
introspection.
"""
check_is_fitted(self, "direct_weights_")
return np.asarray([f"T{a}" for a in self._component_names])
# ENG-17: the 13 shared convenience methods, hotellings_t2_limit,
# ellipse_coordinates and the rename __getattr__ are inherited from
# _LatentVariableModel. PLS keeps only its two PLS-specific plot methods and
# supplies its own rename map.
predictions_vs_observed_plot = _model_method(_predictions_vs_observed_plot)
coefficient_plot = _model_method(_coefficient_plot)
_ATTRIBUTE_RENAMES: typing.ClassVar[dict[str, str]] = {
"x_scores": "scores_",
"y_scores": "y_scores_",
"x_weights": "x_weights_",
"y_weights": "y_weights_",
"x_loadings": "x_loadings_",
"y_loadings": "y_loadings_",
"direct_weights": "direct_weights_",
"beta_coefficients": "beta_coefficients_",
"predictions": "predictions_",
"squared_prediction_error": "spe_",
"hotellings_t2": "hotellings_t2_",
"R2": "r2_per_component_",
"R2cum": "r2_cumulative_",
"R2X_cum": "r2_per_variable_",
"R2Y_cum": "r2y_per_variable_",
"RMSE": "rmse_",
"explained_variance": "explained_variance_",
"scaling_factor_for_scores": "scaling_factor_for_scores_",
"extra_info": "fitting_info_",
"has_missing_data": "has_missing_data_",
"N": "n_samples_",
"K": "n_features_in_",
"M": "n_targets_",
"A": "n_components",
}
_RENAME_CONTEXT: typing.ClassVar[str] = "PLS"
# Y-side fitted attributes: ndarrays while NIPALS fills them in, then wrapped
# into the documented public DataFrames at the end of fit().
y_scores_: np.ndarray | pd.DataFrame
y_weights_: np.ndarray | pd.DataFrame
y_loadings_: np.ndarray | pd.DataFrame
# Fitted diagnostics: per-component arrays or scalar totals.
fitting_info_: dict[str, np.ndarray | int | float]
# ENG-18: public DataFrame views built lazily from the private ndarrays.
scores_ = _LazyFrame("_scores", index="_sample_index", columns="_component_names")
spe_ = _LazyFrame("_spe", index="_sample_index", columns="_component_names")
x_loadings_ = _LazyFrame("_x_loadings", index="_feature_names", columns="_component_names")
x_weights_ = _LazyFrame("_x_weights", index="_feature_names", columns="_component_names")
def _fit_nipals( # noqa: PLR0915, C901
self,
X: DataMatrix,
Y: DataMatrix,
A: int,
settings: dict,
sample_weight: np.ndarray | None = None,
) -> None:
"""Fit PLS via the NIPALS algorithm, handling missing data transparently.
Parameters
----------
X : DataMatrix
Training X data (N x K).
Y : DataMatrix
Training Y data (N x M).
A : int
Number of components to extract.
settings : dict
Algorithm settings with keys ``md_method``, ``md_tol``, ``md_max_iter``.
sample_weight : np.ndarray of shape (N,), optional
Row weights, non-negative finite floats. When provided, X and Y
are ``sqrt(w)``-rescaled at NIPALS entry so the cross-products
``X' u`` and ``Y' t`` become weighted (see #394). Loadings,
weights, and beta are invariant under the rescale; scores are
restored to the original sample scale at the end via
``X @ direct_weights`` (cleaner than dividing by ``sqrt(w)``,
which would NaN at zero-weight rows).
"""
N = self.n_samples_
K = self.n_features_in_
M = self.n_targets_
md_method = settings.get("md_method", "nipals").lower()
if md_method == "tsr":
raise NotImplementedError("TSR for PLS not implemented yet")
if md_method == "pmp":
raise NotImplementedError("PMP for PLS not implemented yet")
Xd = np.asarray(X, dtype=float).copy()
Yd = np.asarray(Y, dtype=float).copy()
# Weighted fit (#394): sqrt(w)-rescale X and Y up front. The NIPALS
# cross-products X' u, Y' t, X' X etc. all become weighted sums; the
# inner loop is otherwise unchanged. We keep X_orig / Y_orig around
# so we can rebuild original-scale scores after the loop (the
# alternative -- dividing the NIPALS-recovered scores by sqrt(w) --
# NaNs out at zero-weight rows, which the test plan explicitly
# requires to behave like "fit on the remaining half").
X_orig: np.ndarray | None = None
Y_orig: np.ndarray | None = None
if sample_weight is not None:
sqrt_w = np.sqrt(sample_weight).reshape(-1, 1)
X_orig = Xd.copy()
Y_orig = Yd.copy()
Xd = Xd * sqrt_w
Yd = Yd * sqrt_w
self._scores = np.zeros((N, A))
self.y_scores_ = np.zeros((N, A))
self._x_weights = np.zeros((K, A))
self.y_weights_ = np.zeros((M, A))
self._x_loadings = np.zeros((K, A))
self.y_loadings_ = np.zeros((M, A))
self.fitting_info_ = {
"timing": np.zeros(A) * np.nan,
"iterations": np.zeros(A) * np.nan,
}
for a in range(A):
start_time = time.time()
itern = 0
start_SSX_col = ssq(Xd, axis=0)
start_SSY_col = ssq(Yd, axis=0)
if np.sum(start_SSX_col) < epsqrt:
emsg = (
"There is no variance left in the data array for X: cannot "
f"compute any more components beyond component {a}."
)
raise NotEnoughVarianceError(emsg)
if np.sum(start_SSY_col) < epsqrt:
emsg = (
"There is no variance left in the data array for Y: cannot "
f"compute any more components beyond component {a}."
)
raise NotEnoughVarianceError(emsg)
# Seed u_a from the column of Y with the greatest sum-of-squares
# (variance, for mean-centred data) rather than the arbitrary first
# column (#195): NIPALS converges to the same component for any
# non-degenerate seed, but the highest-variance column needs fewer
# iterations and is more robust. The deterministic sign convention
# applied below makes the fitted sign independent of this seed.
# (Replace NaN with 0 for the missing-data path.)
start_col = int(np.argmax(start_SSY_col))
u_a_guess = Yd[:, [start_col]].copy()
u_a_guess[np.isnan(u_a_guess)] = 0
u_a = u_a_guess + 1.0
while not terminate_check(u_a_guess, u_a, iterations=itern, settings=settings):
u_a_guess = u_a.copy()
# 1: w_a = X'u_a / (u_a'u_a)
w_a = quick_regress(Xd, u_a)
# 2: Normalize w_a to unit length
w_a = w_a / np.sqrt(ssq(w_a))
# 3: t_a = X w_a / (w_a'w_a)
t_a = quick_regress(Xd, w_a)
# 4: c_a = Y't_a / (t_a't_a)
c_a = quick_regress(Yd, t_a)
# 5: u_a = Y c_a / (c_a'c_a)
u_a = quick_regress(Yd, c_a)
itern += 1
timing_arr = typing.cast("np.ndarray", self.fitting_info_["timing"])
iterations_arr = typing.cast("np.ndarray", self.fitting_info_["iterations"])
timing_arr[a] = time.time() - start_time
iterations_arr[a] = itern
logger.debug(
"PLS NIPALS: component %d converged in %d iterations (md_tol=%g)",
a + 1,
itern,
settings["md_tol"],
)
if itern > settings["md_max_iter"]:
warnings.warn(
"PLS NIPALS: maximum number of iterations reached!",
SpecificationWarning,
stacklevel=2,
)
# 6: Compute loadings and deflate
p_a = quick_regress(Xd, t_a)
Xd = Xd - np.dot(t_a, p_a.T)
Yd = Yd - np.dot(t_a, c_a.T)
# Flip signs so largest-magnitude loading element is positive
max_el_idx = np.argmax(np.abs(p_a))
if np.sign(p_a[max_el_idx]) < 1:
t_a *= -1.0
u_a *= -1.0
w_a *= -1.0
p_a *= -1.0
c_a *= -1.0
self._scores[:, a] = t_a.flatten()
self.y_scores_[:, a] = u_a.flatten()
self._x_weights[:, a] = w_a.flatten()
self._x_loadings[:, a] = p_a.flatten()
self.y_loadings_[:, a] = c_a.flatten()
# In PLS mode A (PLSRegression), y_weights == y_loadings
self.y_weights_[:, a] = c_a.flatten()
# Weighted fit post-fix (#394): the NIPALS scores stored above are on
# the sqrt(w)-rescaled rows (T_w = sqrt(W) @ T_orig). Recover
# original-scale scores by re-projecting the unweighted X / Y via the
# converged loadings: T = X @ W (P'W)^{-1}, the same identity that
# makes direct_weights_ meaningful. This rebuild correctly handles
# zero-weight rows (which NIPALS leaves as identically zero, but
# whose natural projected score is X[i] @ direct_weights).
if sample_weight is not None:
assert X_orig is not None
assert Y_orig is not None
direct_weights = self._x_weights @ safe_inverse(
self._x_loadings.T @ self._x_weights, what="(x_loadings' @ x_weights)"
)
self._scores = X_orig @ direct_weights
# y_scores are diagnostic only; the deflation identity
# u_a = Y_a c_a / (c_a' c_a) gives the natural y-side scores
# using the unweighted (deflated) Y.
Y_deflated = Y_orig.copy()
for a in range(A):
c_a = self.y_loadings_[:, [a]]
denom = float((c_a.T @ c_a).item())
if denom > 0:
self.y_scores_[:, a] = (Y_deflated @ c_a / denom).flatten()
Y_deflated = Y_deflated - (self._scores[:, [a]] @ c_a.T)
[docs]
def fit( # noqa: PLR0912, PLR0915, C901
self, X: DataMatrix, Y: DataMatrix, sample_weight: np.ndarray | None = None,
) -> PLS:
"""
Fit a projection to latent structures (PLS) model to the data.
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training data, where `n_samples` is the number of samples (rows)
and `n_features` is the number of features (columns).
Y : array-like, shape (n_samples, n_targets)
Training data, where `n_samples` is the number of samples (rows)
and `n_targets` is the number of target outputs (columns).
sample_weight : array-like of shape (n_samples,), optional
Non-negative row weights for a weighted PLS fit (#394). NIPALS
is run on ``sqrt(w)``-rescaled X and Y, which is equivalent to
weighting the cross-products ``X' W u`` and ``Y' W t``. Loadings,
weights and beta are computed correctly; scores are returned on
the original sample scale. Zero weights effectively exclude the
corresponding rows (``sample_weight=[1,1,0,0,1]`` reproduces the
unweighted fit on rows ``[0,1,4]``). Forwarded to ``score()`` /
``r2_score`` for any caller that also threads it through.
Returns
-------
PLS
Model object.
References
----------
Abdi, "Partial least squares regression and projection on latent structure
regression (PLS Regression)", 2010, DOI: 10.1002/wics.51
"""
# Accept 1-D Y (the shape sklearn Pipelines pass for single-target
# regression) by promoting to (N, 1) so the rest of fit can rely on
# the 2-D shape.
if hasattr(Y, "ndim") and Y.ndim == 1:
Y = Y.to_frame() if isinstance(Y, pd.Series) else pd.DataFrame(np.asarray(Y).reshape(-1, 1))
# Validate sample_weight up front so the error path is the same
# regardless of whether the caller passes ndarray or list / scalar.
# We defer the row-count check until after validate_data has set
# n_samples_.
if sample_weight is not None:
sample_weight = np.asarray(sample_weight, dtype=float).ravel()
if not np.all(np.isfinite(sample_weight)):
raise ValueError("sample_weight must be finite (no NaN / inf).")
if np.any(sample_weight < 0):
raise ValueError("sample_weight must be non-negative.")
# Capture DataFrame metadata before validate_data converts X to ndarray
# so the downstream DataFrame view keeps its row/column labels.
sample_index: pd.Index | None = X.index if isinstance(X, pd.DataFrame) else None
feature_columns: pd.Index | None = X.columns if isinstance(X, pd.DataFrame) else None
X_arr = validate_data(
self,
X,
reset=True,
accept_sparse=False,
ensure_min_samples=2,
ensure_min_features=1,
dtype="numeric",
ensure_all_finite="allow-nan",
)
if feature_columns is None:
feature_columns = pd.RangeIndex(X_arr.shape[1]) # type: ignore[assignment]
if sample_index is None:
sample_index = pd.RangeIndex(X_arr.shape[0]) # type: ignore[assignment]
X = pd.DataFrame(X_arr, index=sample_index, columns=feature_columns)
if not isinstance(Y, pd.DataFrame):
Y = pd.DataFrame(Y, index=sample_index)
self.n_samples_: int = X.shape[0]
# n_features_in_ is set by validate_data; reassert for clarity.
self.n_features_in_: int = X.shape[1]
# Fitted flag, defaulted here (not in __init__) so __init__ sets only the
# constructor parameters, per sklearn convention (ENG-07). _fit_nipals
# flips it to True if missing data is detected.
self.has_missing_data_ = False
Ny: int = Y.shape[0]
self.n_targets_: int = Y.shape[1]
if Ny != self.n_samples_:
raise ValueError(
f"The X and Y arrays must have the same number of rows: X has {self.n_samples_} and Y has {Ny}."
)
if sample_weight is not None and sample_weight.shape[0] != self.n_samples_:
raise ValueError(
f"sample_weight has {sample_weight.shape[0]} entries; expected {self.n_samples_} to match X / Y."
)
N = self.n_samples_
K = self.n_features_in_
M = self.n_targets_
# The remainder of fit() uses the pandas DataFrame API (.isna / .index /
# .columns); the NIPALS core in _fit_nipals already np.asarray-copies the
# numeric values. Narrow the static type accordingly (the DataFrame path
# is unchanged at runtime).
assert isinstance(X, pd.DataFrame)
assert isinstance(Y, pd.DataFrame)
# Check if number of components is supported against maximum requested
min_dim = min(N, K)
A = min_dim if self.n_components is None else int(self.n_components)
if min_dim < A:
warn = (
"The requested number of components is more than can be "
"computed from data. The maximum number of components is "
f"the minimum of either the number of rows ({N}) or "
f"the number of columns ({K})."
)
warnings.warn(warn, SpecificationWarning, stacklevel=2)
A = self.n_components = min_dim
if np.any(Y.isna()) or np.any(X.isna()):
self.has_missing_data_ = True
# Default to the NIPALS path because TSR / PMP for PLS are still
# NotImplementedError in _fit_nipals; NIPALS handles per-cell NaN
# directly via skipna sums inside the NIPALS iterations.
default_mds = dict(md_method="nipals", md_tol=epsqrt, md_max_iter=self.max_iter)
if isinstance(self.missing_data_settings, dict):
default_mds.update(self.missing_data_settings)
self.missing_data_settings = default_mds
settings = self.missing_data_settings or {
"md_method": "nipals",
"md_tol": self.tol,
"md_max_iter": self.max_iter,
}
self._fit_nipals(X, Y, A, settings, sample_weight=sample_weight)
# --- Common post-fit path: wrap numpy arrays into pandas ---
# R = W(P'W)^{-1} [KxA]; useful since T = XR
direct_weights = self._x_weights @ safe_inverse(
self._x_loadings.T @ self._x_weights, what="(x_loadings' @ x_weights)"
)
# beta = RC' [KxM]: direct link from k-th X variable to m-th Y variable
beta_coefficients = direct_weights @ self.y_loadings_.T
component_names = list(range(1, A + 1))
# ENG-18: scores_ / x_weights_ / x_loadings_ / spe_ are stored as private
# ndarrays (the source of truth); their public DataFrame views are built
# lazily by the _LazyFrame descriptors from the metadata attrs below.
self._sample_index = X.index
self._feature_names = X.columns
self._component_names = component_names
self.y_scores_ = pd.DataFrame(self.y_scores_, index=Y.index, columns=component_names)
self.y_weights_ = pd.DataFrame(self.y_weights_, index=Y.columns, columns=component_names)
self.y_loadings_ = pd.DataFrame(self.y_loadings_, index=Y.columns, columns=component_names)
self.predictions_ = pd.DataFrame(self._scores @ self.y_loadings_.values.T, index=Y.index, columns=Y.columns)
self.direct_weights_ = pd.DataFrame(direct_weights, index=X.columns, columns=component_names)
self.beta_coefficients_ = pd.DataFrame(beta_coefficients, index=X.columns, columns=Y.columns)
# ``max(1, N-1)`` -- see SEC-21 (#270) sub-item 6.
self.explained_variance_ = np.diag(self._scores.T @ self._scores) / max(1, N - 1)
self.scaling_factor_for_scores_ = pd.Series(
np.sqrt(self.explained_variance_),
index=component_names,
name="Standard deviation per score",
)
self.hotellings_t2_ = pd.DataFrame(
np.zeros(shape=(N, A)),
columns=component_names,
index=X.index.copy(),
)
self._spe = np.zeros((N, A))
self.r2_per_component_ = pd.Series(
np.zeros(shape=(A)),
index=component_names,
name="Output R² per component",
)
self.r2_cumulative_ = pd.Series(
np.zeros(shape=(A)),
index=component_names,
name="Output cumulative R²",
)
self.r2_per_variable_ = pd.DataFrame(
np.zeros(shape=(K, A)),
index=X.columns.copy(),
columns=component_names,
)
self.r2y_per_variable_ = pd.DataFrame(
np.zeros(shape=(M, A)),
index=Y.columns.copy(),
columns=component_names,
)
self.rmse_ = pd.DataFrame(
np.zeros(shape=(M, A)),
index=Y.columns.copy(),
columns=component_names,
)
Xd = X.copy()
Yd = Y.copy()
prior_SSX_col = ssq(Xd.values, axis=0)
prior_SSY_col = ssq(Yd.values, axis=0)
base_variance_Y = np.sum(prior_SSY_col)
for a in range(A):
self.hotellings_t2_.iloc[:, a] = (
self.hotellings_t2_.iloc[:, max(0, a - 1)]
+ (self.scores_.iloc[:, a] / self.scaling_factor_for_scores_.iloc[a]) ** 2
)
Xd = Xd - self.scores_.iloc[:, [a]] @ self.x_loadings_.iloc[:, [a]].T
y_hat = self.scores_.iloc[:, 0 : (a + 1)] @ self.y_loadings_.iloc[:, 0 : (a + 1)].T
row_SSX = ssq(Xd.values, axis=1)
col_SSX = ssq(Xd.values, axis=0)
row_SSY = ssq(y_hat.values, axis=1)
col_SSY = ssq(y_hat.values, axis=0)
self.r2_cumulative_.iloc[a] = np.sum(row_SSY) / base_variance_Y
if a > 0:
self.r2_per_component_.iloc[a] = self.r2_cumulative_.iloc[a] - self.r2_cumulative_.iloc[a - 1]
else:
self.r2_per_component_.iloc[a] = self.r2_cumulative_.iloc[a]
self._spe[:, a] = np.sqrt(row_SSX)
# Per-variable R^2 is undefined for a column with no variance to
# explain; emit NaN there. SEC-21 (#270) sub-item 4.
self.r2_per_variable_.iloc[:, a] = np.where(
prior_SSX_col > 0, 1 - col_SSX / np.where(prior_SSX_col > 0, prior_SSX_col, 1.0), np.nan
)
self.r2y_per_variable_.iloc[:, a] = np.where(
prior_SSY_col > 0, col_SSY / np.where(prior_SSY_col > 0, prior_SSY_col, 1.0), np.nan
)
residuals_y = Yd.to_numpy() - y_hat.to_numpy()
self.rmse_.iloc[:, a] = np.sqrt(np.mean(residuals_y**2, axis=0))
return self
[docs]
def score(self, X: DataMatrix, Y: DataMatrix, sample_weight: np.ndarray | None = None) -> float:
"""Return the R² score for the prediction.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Y : array-like of shape (n_samples, n_targets)
True target values.
sample_weight : array-like of shape (n_samples,), optional
Returns
-------
score : float
R² of ``self.predict(X)`` w.r.t. *Y*.
"""
y_pred = self.predict(X)
return float(r2_score(Y, y_pred, sample_weight=sample_weight))
[docs]
def predict(self, X: DataMatrix) -> pd.DataFrame:
"""Predict Y for new observations.
Returns just the predicted ``y_hat`` so the call satisfies the
scikit-learn :class:`~sklearn.base.RegressorMixin` contract (and
therefore composes inside :class:`~sklearn.pipeline.Pipeline`,
:func:`~sklearn.model_selection.cross_val_score`, and
:class:`~sklearn.model_selection.GridSearchCV`). For the rich
diagnostic view (scores, Hotelling's T², SPE, plus ``y_hat``),
see :meth:`diagnose`.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Returns
-------
y_hat : pd.DataFrame of shape (n_samples, n_targets)
Predicted target values, indexed by ``X``'s rows and labelled
with the target column names captured during ``fit``.
See Also
--------
diagnose : richer per-prediction diagnostics.
Examples
--------
>>> y_pred = pls.predict(X_new)
>>> diag = pls.diagnose(X_new) # for scores / T² / SPE
"""
return self.diagnose(X).y_hat
[docs]
def diagnose(self, X: DataMatrix) -> Bunch:
"""Project new data and compute predictions plus diagnostics.
This is the rich view that :meth:`predict` used to return before
1.35.0: alongside ``y_hat`` it reports the X scores, cumulative
Hotelling's T², and SPE for every row of ``X`` so the user can
flag out-of-model observations *and* read their predicted Y from
one call.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Returns
-------
result : sklearn.utils.Bunch
With keys ``scores``, ``hotellings_t2``, ``spe``, ``y_hat``.
See Also
--------
predict : sklearn-compatible call returning just ``y_hat``.
Examples
--------
>>> result = pls.diagnose(scaler_x.transform(X_new))
>>> result.y_hat # Predicted Y values
>>> result.spe # SPE for each new observation
>>> result.hotellings_t2 # T² for each new observation
"""
check_is_fitted(self, "scores_")
# Realign reordered DataFrame columns before validate_data.
if isinstance(X, pd.DataFrame):
X = _align_to_fit_features(X, self._feature_names)
sample_index: pd.Index | None = X.index if isinstance(X, pd.DataFrame) else None
feature_columns: pd.Index | None = X.columns if isinstance(X, pd.DataFrame) else None
X_arr = validate_data(
self,
X,
reset=False,
accept_sparse=False,
dtype="numeric",
ensure_all_finite="allow-nan",
)
if feature_columns is None:
feature_columns = self._feature_names
if sample_index is None:
sample_index = pd.RangeIndex(X_arr.shape[0]) # type: ignore[assignment]
X = pd.DataFrame(X_arr, index=sample_index, columns=feature_columns)
scores = X @ self.direct_weights_
# Hotelling's T² (cumulative over all components)
t2_values = np.sum(np.power((scores / self.scaling_factor_for_scores_.to_numpy()), 2), axis=1)
t2 = pd.Series(t2_values, index=X.index, name="Hotelling's T²")
# SPE: residual after X reconstruction
X_hat = scores @ self._x_loadings.T
residuals = X - X_hat
spe_values = pd.Series(np.sqrt(np.power(residuals, 2).sum(axis=1)), index=X.index, name="SPE")
# Y predictions
y_hat = scores @ self.y_loadings_.T
return Bunch(scores=scores, hotellings_t2=t2, spe=spe_values, y_hat=y_hat)
[docs]
@classmethod
def select_n_components( # noqa: C901, PLR0912, PLR0913, PLR0915
cls,
X: DataMatrix,
Y: DataMatrix,
*,
max_components: int | None = None,
cv: int | BaseCrossValidator = 5,
n_repeats: int | None = None,
random_state: int | None = None,
selection_rule: SelectionRule = "1se",
scale_inside_folds: bool = True,
min_q2_increase: float = Q2_MIN_INCREMENT,
n_permutations: int = 999,
alpha: float = 0.01,
stability_threshold: float = 0.6,
**pls_kwargs,
) -> Bunch:
"""Select the number of PLS components via cross-validation.
Fits PLS models on cross-validation training folds and evaluates the
out-of-fold prediction error for every component count
``1, 2, ..., max_components``. Reports per-fold and pooled RMSECV plus
the validated cumulative R² curves, and recommends a component count
from one of three rules (see ``selection_rule`` below).
The defaults are the research-backed combination: the
one-standard-error rule on top of repeated, shuffled K-fold CV, with
:class:`MCUVScaler` re-fit inside every training fold so test data
never leaks into the centring/scaling estimates.
Unlike the calibration statistics stored on a fitted model
(``rmse_``, ``r2_cumulative_``), the metrics returned here estimate
performance on unseen data and are therefore suitable for choosing
``n_components``.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training X. With the default ``scale_inside_folds=True`` the raw,
unscaled X may be passed; scaling is fit inside every training
fold.
Y : array-like of shape (n_samples, n_targets)
Training Y. Same treatment as ``X`` under ``scale_inside_folds``.
max_components : int, optional
Maximum number of components to evaluate. Default is the largest
value supported by every cross-validation training fold,
``min(min_fold_size, n_features)``.
cv : int or sklearn CV splitter, default 5
If an integer, used as the ``n_splits`` of a shuffled
:class:`~sklearn.model_selection.KFold` (or
:class:`~sklearn.model_selection.RepeatedKFold` when
``n_repeats > 1``). Any sklearn splitter object (e.g.
``KFold(10, shuffle=True)`` or ``LeaveOneOut()``) is also accepted
and is used as-is (``n_repeats`` is then ignored).
n_repeats : int, optional
Number of times the K-fold split is repeated with a fresh shuffle,
used only when ``cv`` is an integer. Default ``10`` (giving a
``cv * 10`` per-fold sample for the 1-SE rule); pass ``1`` to
disable repeats. Repeated K-fold's standard errors are slightly
optimistic because test folds overlap across repeats; that is fine
for the 1-SE *selection* rule but should not be reported as an
unbiased generalisation variance.
random_state : int, optional
Seed forwarded to ``KFold`` / ``RepeatedKFold`` for reproducible
shuffling. Ignored when ``cv`` is a pre-built splitter.
selection_rule : {"1se", "min", "q2_increment", "randomization"}, default "1se"
How the recommended component count is chosen.
See :data:`~process_improve.multivariate._common.SelectionRule`
for the rule semantics. ``"1se"`` is the default; ``"min"`` is the
argmin RMSECV (the pre-1.28 default, prone to running to the
maximum component count); ``"q2_increment"`` is the Wold's-R-style
cumulative-Q² threshold; ``"randomization"`` is Van der Voet's
(1994) permutation test (uses ``n_permutations`` and ``alpha``)
that picks the smallest model whose predictive ability is
statistically indistinguishable from the reference (argmin
RMSECV) one.
scale_inside_folds : bool, default True
When True (the default), fit a fresh :class:`MCUVScaler` on each
training fold's X and Y, apply it to the held-out rows, fit PLS in
scaled space, then inverse-transform the predictions so RMSECV is
reported on the original Y scale. This removes the centring /
scaling leakage of the prior default. Set to False to keep the
pre-1.28 behaviour, in which case ``X`` and ``Y`` should already
be scaled; a :class:`SpecificationWarning` is emitted.
min_q2_increase : float, default 0.01
Threshold used only when ``selection_rule="q2_increment"``: the
smallest increase in cumulative validated :math:`Q^2_Y` that
justifies keeping an extra component.
n_permutations : int, default 999
Used only when ``selection_rule="randomization"``: number of
sign-flip permutations driving the Van der Voet test.
alpha : float, default 0.01
Used only when ``selection_rule="randomization"``: significance
level. The smallest component count whose Van der Voet *p*-value
exceeds ``alpha`` is recommended. R's ``pls::selectNcomp`` uses
the same default; smaller values pick more parsimonious models.
stability_threshold : float, default 0.6
For the per-repeat stability-selection diagnostic (``"1se"`` /
``"min"`` rules with ``n_repeats > 1`` only): the recommendation
is judged ``selection_is_stable=True`` iff the modal vote share
in ``selection_distribution`` is at least this fraction.
Meinshausen & Bühlmann (2010, *JRSS-B*) suggest 0.6-0.9 for
their variable-selection analogue; we default to the
permissive end.
**pls_kwargs
Additional keyword arguments passed to the ``PLS()`` constructor
(e.g. ``missing_data_settings``).
Returns
-------
result : sklearn.utils.Bunch
With keys:
- ``n_components`` - recommended number of components (int).
- ``rmsecv`` - pooled RMSECV per component count
(pd.DataFrame, indexed ``1..A``; columns are the Y-variable
names plus ``"total"``).
- ``per_fold_rmsecv`` - per-fold total RMSECV (pd.DataFrame,
indexed ``1..A``; one column per fold across all repeats).
Drives the 1-SE rule.
- ``se_rmsecv`` - standard error of the per-fold RMSECV per
component count (pd.Series, indexed ``1..A``).
- ``q2_se`` - standard error on the Q2 scale (the per-fold total
PRESS standard error divided by the total Y sum-of-squares),
i.e. the half-width of a +/-1 SE band around
``r2y_validated["total"]`` (pd.Series, indexed ``1..A``).
- ``r2y_validated`` - validated cumulative :math:`R^2_Y`
(pd.DataFrame, same shape as ``rmsecv``).
- ``r2x_validated`` - validated cumulative :math:`R^2_X`
(pd.DataFrame, indexed ``1..A``; columns are the X-variable
names plus ``"total"``).
- ``press`` - pooled Y prediction error sum of squares per
component count (pd.Series, indexed ``1..A``).
- ``cv_predictions`` - out-of-fold predictions of Y at the
recommended component count, on the original Y scale
(pd.DataFrame). For repeated K-fold, the *first* repeat's
held-out predictions are reported so each row appears exactly
once.
- ``selection_rule`` - the rule used to pick ``n_components``.
- ``randomization_pvalues`` - per-component Van der Voet
right-tail *p*-values when ``selection_rule="randomization"``;
``None`` otherwise.
- ``selection_distribution`` - per-repeat *vote share* over
candidate component counts (pd.Series indexed ``1..A``).
Populated only for ``selection_rule in {"1se", "min"}`` and
``n_repeats > 1``; ``None`` otherwise. A concentrated
distribution signals a confident recommendation; a flat or
multi-modal one flags it for review.
- ``selection_mode`` - the most-voted component count, or
``None`` when ``selection_distribution`` is.
- ``selection_is_stable`` - ``True`` iff the modal vote share
meets ``stability_threshold``; ``None`` when no distribution
was computed.
Notes
-----
The pooled RMSECV in ``rmsecv["total"]`` is the square root of the
total PRESS over all fold-test rows divided by ``(N_eff * M)`` where
``N_eff = N * n_repeats`` under repeated CV; the ``per_fold_rmsecv``
column for fold *f* is the square root of fold-*f*'s sum-of-squared
residuals over its own test rows.
References
----------
Breiman, Friedman, Olshen & Stone (1984), *CART*, sec.3.4.3 (1-SE rule).
Hastie, Tibshirani & Friedman, *ESL*, sec.7.10. Kohavi (1995, IJCAI)
recommends 10-fold stratified CV for model selection.
Examples
--------
>>> from sklearn.model_selection import KFold
>>> # Default: 1-SE on 10 x 5-fold repeated CV with in-fold scaling.
>>> result = PLS.select_n_components(X, Y, max_components=6, random_state=0)
>>> result.n_components, result.selection_rule
>>> # Opt-in to the older argmin-RMSECV rule:
>>> PLS.select_n_components(X, Y, max_components=6, selection_rule="min")
>>> # Caller-supplied splitter (n_repeats is ignored here):
>>> PLS.select_n_components(X, Y, cv=KFold(10, shuffle=True, random_state=0))
"""
if not isinstance(X, pd.DataFrame):
X = pd.DataFrame(X)
if isinstance(Y, pd.Series):
Y = Y.to_frame()
elif not isinstance(Y, pd.DataFrame):
Y = pd.DataFrame(Y)
if not scale_inside_folds:
warnings.warn(
"scale_inside_folds=False leaks centring/scaling estimated on the "
"full dataset into every CV fold, making the reported RMSECV "
"optimistic. The default scale_inside_folds=True is preferred.",
SpecificationWarning,
stacklevel=2,
)
N, K = X.shape
M = Y.shape[1]
if isinstance(cv, int):
if cv < 2:
raise ValueError(f"cv must be >= 2 when given as an int; got {cv}.")
repeats = 10 if n_repeats is None else int(n_repeats)
if repeats < 1:
raise ValueError(f"n_repeats must be >= 1; got {repeats}.")
splitter: BaseCrossValidator
if repeats == 1:
splitter = KFold(n_splits=cv, shuffle=True, random_state=random_state)
else:
splitter = RepeatedKFold(n_splits=cv, n_repeats=repeats, random_state=random_state)
splits = list(splitter.split(X, Y))
first_repeat_fold_count = cv
else:
splits = list(check_cv(cv).split(X, Y))
first_repeat_fold_count = len(splits)
if not splits:
raise ValueError("The cross-validation splitter produced no folds.")
n_folds_total = len(splits)
min_train_size = min(len(train_idx) for train_idx, _ in splits)
# Centring inside a fold removes one DoF, so a globally-centred matrix
# restricted to (min_train_size) rows and re-centred has rank at most
# min_train_size - 1. Cap A accordingly when in-fold scaling is on.
upper = min(min_train_size - (1 if scale_inside_folds else 0), K)
if max_components is None:
max_components = upper
A = min(int(max_components), upper)
if A < 1:
raise ValueError("No components can be evaluated; the data or folds are too small.")
component_index = pd.Index(range(1, A + 1), name="n_components")
press_y = np.zeros((A, M))
press_x = np.zeros((A, K))
# Per-fold total-RMSECV across every fold-fit (n_folds_total columns).
# Drives the 1-SE rule's standard error.
per_fold_rmse = np.full((A, n_folds_total), np.nan)
# Per-fold total PRESS (sum of squared Y residuals over the fold), on the
# same linear scale as the Q2 normalisation; used to put a +/-1 SE band
# on the Q2 curve, mirroring PCA's se_press -> q2_se rescaling.
per_fold_press_total = np.full((A, n_folds_total), np.nan)
# Out-of-fold predictions: with repeated K-fold each row appears in
# multiple test folds, so populate only from the *first* repeat (which
# covers every row exactly once) to preserve the existing semantic that
# cv_predictions has one row per observation.
oof = np.full((A, N, M), np.nan)
x_columns = list(X.columns)
y_columns = list(Y.columns)
x_values = X.to_numpy()
y_values = Y.to_numpy()
for fold_idx, (train_idx, test_idx) in enumerate(splits):
X_train_raw = X.iloc[train_idx]
Y_train_raw = Y.iloc[train_idx]
X_test_raw = X.iloc[test_idx]
if scale_inside_folds:
scaler_x = MCUVScaler().fit(X_train_raw)
scaler_y = MCUVScaler().fit(Y_train_raw)
X_train = scaler_x.transform(X_train_raw)
Y_train = scaler_y.transform(Y_train_raw)
X_test_scaled = scaler_x.transform(X_test_raw).to_numpy()
y_centre = scaler_y.center_.to_numpy()
y_scale = scaler_y.scale_.to_numpy()
x_centre = scaler_x.center_.to_numpy()
x_scale = scaler_x.scale_.to_numpy()
else:
X_train = X_train_raw
Y_train = Y_train_raw
X_test_scaled = X_test_raw.to_numpy()
y_centre = np.zeros(M)
y_scale = np.ones(M)
x_centre = np.zeros(K)
x_scale = np.ones(K)
model = cls(n_components=A, **pls_kwargs).fit(X_train, Y_train)
scores_test = X_test_scaled @ model.direct_weights_.to_numpy()
# ``y_values`` and ``x_values`` are on the ORIGINAL (un-scaled)
# input data: RMSECV / r2y_validated / r2x_validated are all
# reported on the input scale.
y_test = y_values[test_idx]
x_test = x_values[test_idx]
n_test = len(test_idx)
y_loadings = typing.cast("pd.DataFrame", model.y_loadings_).to_numpy() # shape (M, A)
x_loadings = model.x_loadings_.to_numpy() # shape (K, A)
for a in range(1, A + 1):
y_hat_scaled = scores_test[:, :a] @ y_loadings[:, :a].T
x_hat_scaled = scores_test[:, :a] @ x_loadings[:, :a].T
# inverse-transform: x_scale[None, :] broadcasts (n_test, K).
y_hat = y_hat_scaled * y_scale + y_centre
x_hat = x_hat_scaled * x_scale + x_centre
residuals_y = y_test - y_hat
fold_press = float(np.nansum(residuals_y ** 2))
press_y[a - 1] += np.nansum(residuals_y ** 2, axis=0)
press_x[a - 1] += np.nansum((x_test - x_hat) ** 2, axis=0)
per_fold_rmse[a - 1, fold_idx] = np.sqrt(fold_press / max(1, n_test * M))
per_fold_press_total[a - 1, fold_idx] = fold_press
if fold_idx < first_repeat_fold_count:
oof[a - 1, test_idx, :] = y_hat
# With repeated CV the total PRESS sums residuals across n_repeats
# passes over every observation; divide by N_eff = N * n_repeats to
# keep the RMSECV scale comparable to a single-pass CV.
n_eff = N * max(1, n_folds_total // max(1, first_repeat_fold_count))
tss_y = np.nansum((y_values - np.nanmean(y_values, axis=0)) ** 2, axis=0)
tss_x = np.nansum((x_values - np.nanmean(x_values, axis=0)) ** 2, axis=0)
rmsecv = pd.DataFrame(
np.column_stack([np.sqrt(press_y / n_eff), np.sqrt(press_y.sum(axis=1) / (n_eff * M))]),
index=component_index,
columns=[*y_columns, "total"],
)
def _validated_r2(press: np.ndarray, tss: np.ndarray) -> np.ndarray:
per_var = np.where(tss > 0, 1.0 - press / (tss * max(1, n_folds_total // first_repeat_fold_count)), np.nan)
total = np.where(
tss.sum() > 0,
1.0 - press.sum(axis=1) / (tss.sum() * max(1, n_folds_total // first_repeat_fold_count)),
np.nan,
)
return np.column_stack([per_var, total])
r2y_validated = pd.DataFrame(
_validated_r2(press_y, tss_y),
index=component_index,
columns=[*y_columns, "total"],
)
r2x_validated = pd.DataFrame(
_validated_r2(press_x, tss_x),
index=component_index,
columns=[*x_columns, "total"],
)
press = pd.Series(press_y.sum(axis=1), index=component_index, name="PRESS")
per_fold_rmsecv = pd.DataFrame(
per_fold_rmse,
index=component_index,
columns=[f"fold_{i + 1}" for i in range(n_folds_total)],
)
# Standard error across folds (and repeats). ``ddof=1`` for the
# sample SE; with a single fold (n_folds_total == 1) the SE is NaN
# and the 1-SE rule degenerates to argmin gracefully.
with warnings.catch_warnings():
warnings.simplefilter("ignore", RuntimeWarning)
se_values = np.nanstd(per_fold_rmse, axis=1, ddof=1) / np.sqrt(
np.maximum(1, np.sum(~np.isnan(per_fold_rmse), axis=1))
)
se_rmsecv = pd.Series(se_values, index=component_index, name="SE(RMSECV)")
# Q2 standard error: the standard error of the per-fold total PRESS,
# rescaled by the same total Y sum-of-squares that normalises the
# validated Q2 (Q2_Y_total = 1 - PRESS / tss_y.sum()). This is the
# half-width of a +/-1 SE band around the ``r2y_validated["total"]``
# curve, computed the same way as PCA's ``q2_se``.
ss_y_total = float(tss_y.sum())
with warnings.catch_warnings():
warnings.simplefilter("ignore", RuntimeWarning)
se_press_total = np.nanstd(per_fold_press_total, axis=1, ddof=1) / np.sqrt(
np.maximum(1, np.sum(~np.isnan(per_fold_press_total), axis=1))
)
q2_se_values = se_press_total / ss_y_total if ss_y_total > 0 else se_press_total * np.nan
q2_se = pd.Series(q2_se_values, index=component_index, name="SE(Q2)")
# If every CV fold produced NaN (e.g. zero-variance Y per fold) the
# cross-validation never converged to anything we can judge. Raise so
# the caller knows, rather than silently returning ``1``.
# SEC-21 (#270) sub-item 8.
total_rmsecv = rmsecv["total"].to_numpy()
if np.all(np.isnan(total_rmsecv)):
raise RuntimeError(
"Cross-validation produced NaN total-RMSECV for every component count; "
"no recommendation can be made. Likely cause: a per-fold zero-variance Y "
"column, or every fold trivially degenerate."
)
# Per-observation squared total residual at every component count.
# Drives Van der Voet's randomization test; cheap to compute and
# otherwise diagnostic (rows of oof[a] that are NaN - typically
# because some custom splitter never held them out - are skipped via
# nansum).
per_obs_sse = np.nansum((y_values[None, :, :] - oof) ** 2, axis=2) # (A, N)
randomization_pvalues: pd.Series | None = None
if selection_rule == "randomization":
recommended, p_values = _vandervoet_randomization(
per_obs_sse,
total_rmsecv=total_rmsecv,
n_permutations=n_permutations,
alpha=alpha,
random_state=random_state,
)
randomization_pvalues = pd.Series(
p_values, index=component_index, name="p-value (Van der Voet)"
)
else:
recommended = _select_n_components(
selection_rule,
mean_error=total_rmsecv,
se_error=se_values,
q2_cumulative=r2y_validated["total"].to_numpy(),
min_q2_increase=min_q2_increase,
)
cv_predictions = pd.DataFrame(oof[recommended - 1], index=Y.index, columns=Y.columns)
# Stability selection: re-apply the chosen rule per repeat and
# tabulate how often each component count wins. A multi-modal or
# flat distribution flags the recommendation as low-confidence.
# Only meaningful when the rule operates on the per-fold RMSE
# curve (1se/min) and we ran more than one repeat; q2_increment
# and randomization don't decompose per repeat without more
# bookkeeping than they're worth at this stage.
n_repeats_effective = max(1, n_folds_total // max(1, first_repeat_fold_count))
selection_distribution: pd.Series | None = None
selection_mode: int | None = None
selection_is_stable: bool | None = None
if (
selection_rule in ("1se", "min")
and n_repeats_effective > 1
and not np.all(np.isnan(per_fold_rmse))
):
votes: list[int] = []
for r in range(n_repeats_effective):
cols = slice(r * first_repeat_fold_count, (r + 1) * first_repeat_fold_count)
fold_subset = per_fold_rmse[:, cols]
if np.all(np.isnan(fold_subset)):
continue
with warnings.catch_warnings():
warnings.simplefilter("ignore", RuntimeWarning)
mean_r = np.nanmean(fold_subset, axis=1)
se_r = np.nanstd(fold_subset, axis=1, ddof=1) / np.sqrt(
np.maximum(1, np.sum(~np.isnan(fold_subset), axis=1))
)
# The dispatcher needs q2_cumulative even for non-q2 rules;
# pass a dummy zeros array since 1se/min don't read it.
pick = _select_n_components(
selection_rule,
mean_error=mean_r,
se_error=se_r,
q2_cumulative=np.zeros_like(mean_r),
min_q2_increase=min_q2_increase,
)
votes.append(int(pick))
if votes:
counts = pd.Series(votes).value_counts().sort_index()
dist = (counts / counts.sum()).reindex(component_index, fill_value=0.0)
dist.name = "vote_share"
dist.index.name = "n_components"
selection_distribution = dist
selection_mode = int(dist.idxmax())
selection_is_stable = bool(dist.max() >= stability_threshold)
return Bunch(
n_components=recommended,
rmsecv=rmsecv,
per_fold_rmsecv=per_fold_rmsecv,
se_rmsecv=se_rmsecv,
q2_se=q2_se,
r2y_validated=r2y_validated,
r2x_validated=r2x_validated,
press=press,
cv_predictions=cv_predictions,
selection_rule=selection_rule,
randomization_pvalues=randomization_pvalues,
selection_distribution=selection_distribution,
selection_mode=selection_mode,
selection_is_stable=selection_is_stable,
)
[docs]
@classmethod
def nested_cv( # noqa: PLR0913, PLR0915
cls,
X: DataMatrix,
Y: DataMatrix,
*,
max_components: int | None = None,
outer_cv: int | BaseCrossValidator = 5,
inner_cv: int = 5,
n_inner_repeats: int = 10,
selection_rule: SelectionRule = "1se",
scale_inside_folds: bool = True,
min_q2_increase: float = Q2_MIN_INCREMENT,
n_permutations: int = 999,
alpha: float = 0.01,
random_state: int | None = None,
**pls_kwargs,
) -> Bunch:
"""Nested cross-validation for an honest PLS performance estimate.
Outer loop splits the data into outer-train / outer-test; the inner
loop runs :meth:`select_n_components` on the outer-train (with the
configured ``selection_rule`` over ``inner_cv * n_inner_repeats``
folds) to pick the component count; a final PLS is fit on the
outer-train at that count and used to predict the outer-test. The
accumulated out-of-fold predictions give RMSEP that is *not*
optimism-biased by the selection decision - the headline number to
report when a clean test set is not available.
Parameters
----------
X, Y : array-like
Training data. Treated as in :meth:`select_n_components` (raw
if ``scale_inside_folds=True``, pre-scaled otherwise).
max_components : int, optional
Forwarded to the inner :meth:`select_n_components`.
outer_cv : int or sklearn splitter, default 5
Number of outer folds (or a custom splitter).
inner_cv : int, default 5
Number of inner folds passed to :meth:`select_n_components`.
n_inner_repeats : int, default 10
Number of inner-CV repeats per outer fold; the inner
``random_state`` is offset by the outer-fold index so each
outer fold sees a fresh inner shuffle.
selection_rule : str, default "1se"
Selection rule applied inside the inner loop. See
:data:`~process_improve.multivariate._common.SelectionRule`.
scale_inside_folds : bool, default True
Mirrors :meth:`select_n_components`. Also applied to the final
outer-train fit, with the test-fold predictions inverse-
transformed to the original Y scale before RMSEP accumulates.
min_q2_increase, n_permutations, alpha
Forwarded to the inner :meth:`select_n_components` per rule.
random_state : int, optional
Seed for the outer-fold shuffle and the inner CV. The inner
seed is offset per outer fold so each outer split sees a fresh
shuffled inner CV.
**pls_kwargs
Forwarded to :class:`PLS` for both the inner CV and the final
outer-train fits.
Returns
-------
result : sklearn.utils.Bunch
With keys:
- ``rmsep`` - honest held-out RMSEP per Y column plus a
``"total"`` entry (pd.Series).
- ``q2y`` - validated :math:`Q^2_Y` per Y column plus
``"total"`` (pd.Series).
- ``cv_predictions`` - out-of-fold predictions of Y at the
per-outer-fold selected component counts (pd.DataFrame on
the original Y scale).
- ``selected_components_per_fold`` - list of inner
recommendations, one per outer fold.
- ``selected_components_distribution`` - vote share over
candidate counts (pd.Series).
Notes
-----
Runtime is roughly ``outer_cv * inner_cv * n_inner_repeats *
max_components`` PLS fits. With the defaults that is 5 * 5 * 10 *
max_components fits per call; for ``max_components=10`` and a
moderate dataset that completes in seconds. Drop ``n_inner_repeats``
if you need to bring it down further.
Examples
--------
>>> from process_improve.multivariate import PLS
>>> result = PLS.nested_cv(X, Y, max_components=8, random_state=0)
>>> result.rmsep["total"]
"""
if not isinstance(X, pd.DataFrame):
X = pd.DataFrame(X)
if isinstance(Y, pd.Series):
Y = Y.to_frame()
elif not isinstance(Y, pd.DataFrame):
Y = pd.DataFrame(Y)
N = X.shape[0]
M = Y.shape[1]
if isinstance(outer_cv, int):
if outer_cv < 2:
raise ValueError(f"outer_cv must be >= 2 when given as an int; got {outer_cv}.")
outer_splitter: BaseCrossValidator = KFold(
n_splits=outer_cv, shuffle=True, random_state=random_state
)
else:
outer_splitter = outer_cv
outer_splits = list(outer_splitter.split(X, Y))
if not outer_splits:
raise ValueError("The outer cross-validation splitter produced no folds.")
y_columns = list(Y.columns)
y_values = Y.to_numpy()
oof_predictions = np.full((N, M), np.nan)
selected_components_per_fold: list[int] = []
for outer_idx, (train_idx, test_idx) in enumerate(outer_splits):
X_outer_train = X.iloc[train_idx]
Y_outer_train = Y.iloc[train_idx]
X_outer_test = X.iloc[test_idx]
inner_seed = None if random_state is None else int(random_state) + outer_idx
inner_result = cls.select_n_components(
X_outer_train,
Y_outer_train,
max_components=max_components,
cv=inner_cv,
n_repeats=n_inner_repeats,
selection_rule=selection_rule,
scale_inside_folds=scale_inside_folds,
min_q2_increase=min_q2_increase,
n_permutations=n_permutations,
alpha=alpha,
random_state=inner_seed,
**pls_kwargs,
)
n_comp_inner = int(inner_result.n_components)
selected_components_per_fold.append(n_comp_inner)
# Final outer-train fit at the inner-selected component count.
if scale_inside_folds:
scaler_x = MCUVScaler().fit(X_outer_train)
scaler_y = MCUVScaler().fit(Y_outer_train)
X_train_s = scaler_x.transform(X_outer_train)
Y_train_s = scaler_y.transform(Y_outer_train)
X_test_s = scaler_x.transform(X_outer_test).to_numpy()
y_centre = scaler_y.center_.to_numpy()
y_scale = scaler_y.scale_.to_numpy()
else:
X_train_s = X_outer_train
Y_train_s = Y_outer_train
X_test_s = X_outer_test.to_numpy()
y_centre = np.zeros(M)
y_scale = np.ones(M)
model = cls(n_components=n_comp_inner, **pls_kwargs).fit(X_train_s, Y_train_s)
scores_test = X_test_s @ model.direct_weights_.to_numpy()
y_loadings = typing.cast("pd.DataFrame", model.y_loadings_).to_numpy()
y_hat_scaled = scores_test @ y_loadings.T
oof_predictions[test_idx, :] = y_hat_scaled * y_scale + y_centre
# RMSEP from the out-of-fold predictions (each row is predicted by
# exactly one outer fold).
residuals = y_values - oof_predictions
mask = ~np.isnan(oof_predictions).any(axis=1)
n_valid = int(mask.sum())
if n_valid == 0:
raise RuntimeError(
"Nested CV produced no covered observations; check the outer splitter."
)
per_y_press = np.nansum(residuals[mask] ** 2, axis=0)
rmsep_per_y = np.sqrt(per_y_press / n_valid)
rmsep_total = np.sqrt(np.nansum(residuals[mask] ** 2) / (n_valid * M))
rmsep = pd.Series(
np.concatenate([rmsep_per_y, [rmsep_total]]),
index=[*y_columns, "total"],
name="RMSEP",
)
# Validated Q^2_Y per column and total, against the column mean.
col_means = np.nanmean(y_values, axis=0)
tss_per_y = np.nansum((y_values - col_means) ** 2, axis=0)
with warnings.catch_warnings():
warnings.simplefilter("ignore", RuntimeWarning)
q2y_per_y = np.where(tss_per_y > 0, 1.0 - per_y_press / tss_per_y, np.nan)
q2y_total = (
1.0 - residuals[mask].astype(float).__pow__(2).sum() / tss_per_y.sum()
if tss_per_y.sum() > 0
else np.nan
)
q2y = pd.Series(
np.concatenate([q2y_per_y, [q2y_total]]),
index=[*y_columns, "total"],
name="Q2Y",
)
cv_predictions = pd.DataFrame(oof_predictions, index=Y.index, columns=Y.columns)
counts = pd.Series(selected_components_per_fold).value_counts().sort_index()
distribution = counts / counts.sum()
distribution.name = "vote_share"
distribution.index.name = "n_components"
return Bunch(
rmsep=rmsep,
q2y=q2y,
cv_predictions=cv_predictions,
selected_components_per_fold=selected_components_per_fold,
selected_components_distribution=distribution,
)
[docs]
def score_contributions(
self,
t_start: np.ndarray | pd.Series,
t_end: np.ndarray | pd.Series | None = None,
components: list[int] | None = None,
*,
weighted: bool = False,
) -> pd.Series:
"""Contribution of each X variable to a score-space movement.
Identical to ``PCA.score_contributions`` but uses the PLS X-loadings
(P matrix) for back-projection.
Parameters
----------
t_start : array-like of shape (n_components,)
Score vector of the observation of interest.
t_end : array-like of shape (n_components,), optional
Reference point in score space. Default is the model center (zeros).
components : list of int, optional
**1-based** component indices. Default: all components.
weighted : bool, default False
If True, scale by 1/sqrt(explained_variance) for T² contributions.
Returns
-------
contributions : pd.Series of shape (n_features,)
Examples
--------
>>> pls = PLS(n_components=3).fit(X_scaled, Y_scaled)
>>> contrib = pls.score_contributions(pls.scores_.iloc[0].values)
>>> contrib.abs().sort_values(ascending=False).head() # top contributors
See Also
--------
observation_contributions : The per-observation counterpart. Despite
the similar name, the two answer different questions and are not
interchangeable. ``score_contributions`` is *per-variable*: it
decomposes a single observation's movement in score space back
onto the original X variables, answering "which **variables**
explain why this observation sits where it does?". It returns one
signed value per variable. ``observation_contributions`` is
*per-observation*: it reports each observation's share of a
component's total inertia, answering "which **observations** most
strongly shape this component?". It returns a non-negative
sample-by-component table whose columns each sum to 1. The two are
orthogonal views of the same score matrix - one decomposes across
variables, the other across observations.
"""
check_is_fitted(self, "x_loadings_")
t_start = np.asarray(t_start, dtype=float)
t_end = np.zeros(self.n_components) if t_end is None else np.asarray(t_end, dtype=float)
idx = np.arange(self.n_components) if components is None else np.array(components) - 1
dt = t_end[idx] - t_start[idx]
if weighted:
# ``explained_variance_[a] == 0`` for a degenerate component
# would silently produce inf/NaN weighted contributions.
# Clamp the divisor so weighting is a no-op on such components.
# SEC-21 (#270) sub-item 5.
ev = np.asarray(self.explained_variance_)[idx]
dt = dt / np.sqrt(np.where(ev > epsqrt, ev, 1.0))
P = self._x_loadings[:, idx].T
contributions = dt @ P
return pd.Series(contributions, index=self._feature_names, name="score_contributions")
[docs]
def detect_outliers(self, conf_level: float = 0.95) -> list[dict]:
"""Detect outlier observations using SPE and Hotelling's T² diagnostics.
Same approach as ``PCA.detect_outliers``: combines statistical limits
with the robust generalized ESD test.
Parameters
----------
conf_level : float, default 0.95
Confidence level in [0.8, 0.999].
Returns
-------
outliers : list of dict
Sorted from most severe to least. Each dict contains
``observation``, ``outlier_types``, ``spe``, ``hotellings_t2``,
``spe_limit``, ``hotellings_t2_limit``, ``severity``.
Examples
--------
>>> pls = PLS(n_components=3).fit(X_scaled, Y_scaled)
>>> outliers = pls.detect_outliers(conf_level=0.95)
>>> for o in outliers:
... print(f"{o['observation']}: {o['outlier_types']}")
"""
check_is_fitted(self, "spe_")
if not (0.8 <= conf_level <= 0.999):
raise ValueError(f"conf_level must be between 0.8 and 0.999, got {conf_level}.")
N = self.n_samples_
spe_values = self.spe_.iloc[:, -1]
t2_values = self.hotellings_t2_.iloc[:, -1]
spe_lim = self.spe_limit(conf_level=conf_level)
t2_lim = self.hotellings_t2_limit(conf_level=conf_level)
max_outliers = max(1, N // 5)
alpha = 1 - conf_level
spe_outlier_idx, _ = detect_outliers_esd(
spe_values.to_numpy(), algorithm="esd", max_outliers_detected=max_outliers, alpha=alpha
)
t2_outlier_idx, _ = detect_outliers_esd(
t2_values.to_numpy(), algorithm="esd", max_outliers_detected=max_outliers, alpha=alpha
)
spe_flagged = set(spe_outlier_idx)
t2_flagged = set(t2_outlier_idx)
for i in range(N):
if spe_values.iloc[i] > spe_lim:
spe_flagged.add(i)
if t2_values.iloc[i] > t2_lim:
t2_flagged.add(i)
all_flagged = spe_flagged | t2_flagged
results = []
for i in all_flagged:
types = []
if i in spe_flagged:
types.append("spe")
if i in t2_flagged:
types.append("hotellings_t2")
spe_val = float(spe_values.iloc[i])
t2_val = float(t2_values.iloc[i])
severity = max(spe_val / spe_lim, t2_val / t2_lim)
results.append(
{
"observation": spe_values.index[i],
"outlier_types": types,
"spe": spe_val,
"hotellings_t2": t2_val,
"spe_limit": spe_lim,
"hotellings_t2_limit": t2_lim,
"severity": round(severity, 4),
}
)
results.sort(key=lambda d: d["severity"], reverse=True)
return results
[docs]
def cross_validate( # noqa: PLR0912, PLR0913, PLR0915, C901
self,
X: DataMatrix,
Y: DataMatrix,
*,
cv: int | str = "loo",
n_bootstrap: int = 0,
conf_level: float = 0.95,
random_state: int | None = None,
show_progress: bool = True,
sample_weight: np.ndarray | None = None,
) -> Bunch:
"""Cross-validate the PLS model and compute error bars for beta coefficients.
Refits the model on data subsets (jackknife, K-fold, or bootstrap),
collects ``beta_coefficients_`` from each refit, and computes
confidence intervals. Also returns cross-validated predictions
and prediction-error metrics (RMSE, Q²).
Parameters
----------
X : array-like of shape (n_samples, n_features)
Predictor matrix (same data used for ``fit``).
Y : array-like of shape (n_samples, n_targets)
Response matrix (same data used for ``fit``).
cv : int or ``"loo"``, default ``"loo"``
Cross-validation strategy:
* ``"loo"`` - leave-one-out (jackknife). Produces N resamples.
* ``int`` - number of folds for K-fold CV.
n_bootstrap : int, default 0
If > 0, use bootstrap resampling instead of CV folds.
The value specifies the number of bootstrap rounds.
Overrides the ``cv`` parameter when set.
conf_level : float, default 0.95
Confidence level for the beta-coefficient intervals, in (0, 1).
random_state : int or None, default None
Random seed for reproducibility (K-fold shuffle and bootstrap).
show_progress : bool, default True
Whether to display a ``tqdm`` progress bar.
Returns
-------
result : :class:`~sklearn.utils.Bunch`
Dictionary-like object with the following keys:
**Beta-coefficient uncertainty**
beta_samples : np.ndarray of shape (n_resamples, n_features, n_targets)
Raw beta coefficients from every resample.
beta_mean : pd.DataFrame of shape (n_features, n_targets)
Mean beta across resamples.
beta_std : pd.DataFrame of shape (n_features, n_targets)
Standard error of the beta coefficients.
beta_ci_lower : pd.DataFrame of shape (n_features, n_targets)
Lower bound of the confidence interval.
beta_ci_upper : pd.DataFrame of shape (n_features, n_targets)
Upper bound of the confidence interval.
significant : pd.DataFrame of shape (n_features, n_targets)
``True`` where the confidence interval excludes zero.
**Prediction metrics**
y_hat_cv : pd.DataFrame of shape (n_samples, n_targets)
Cross-validated predictions (out-of-fold). Only available for
jackknife and K-fold; ``None`` for bootstrap.
press : float
Prediction Error Sum of Squares (sum over all Y elements).
Only for jackknife / K-fold.
rmse_cv : pd.Series of length n_targets
Root-mean-square error per Y variable (cross-validated).
Only for jackknife / K-fold.
q_squared : pd.Series of length n_targets
Cross-validated R² (Q²) per Y variable.
Only for jackknife / K-fold.
**Metadata**
n_resamples : int
Number of resamples performed.
method : str
``"jackknife"``, ``"kfold"``, or ``"bootstrap"``.
conf_level : float
The confidence level used.
Examples
--------
>>> from process_improve.multivariate import PLS, MCUVScaler
>>> scaler_x = MCUVScaler().fit(X)
>>> scaler_y = MCUVScaler().fit(Y)
>>> X_s, Y_s = scaler_x.transform(X), scaler_y.transform(Y)
>>> pls = PLS(n_components=2).fit(X_s, Y_s)
>>> cv_results = pls.cross_validate(X_s, Y_s, cv="loo")
>>> cv_results.beta_mean # mean beta across LOO resamples
>>> cv_results.significant # which betas are significantly != 0
>>> cv_results.q_squared # cross-validated R²
"""
check_is_fitted(self, "beta_coefficients_")
X = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X
Y = pd.DataFrame(Y) if not isinstance(Y, pd.DataFrame) else Y
N, _K = X.shape
M = Y.shape[1]
if X.shape[0] != Y.shape[0]:
raise ValueError(f"X and Y must have the same number of rows, got {X.shape[0]} and {Y.shape[0]}.")
if not (0.5 < conf_level < 1.0):
raise ValueError(f"conf_level must be between 0.5 and 1.0, got {conf_level}.")
# --- Determine resampling strategy ---
use_bootstrap = n_bootstrap > 0
if use_bootstrap:
method = "bootstrap"
elif cv == "loo":
method = "jackknife"
else:
method = "kfold"
# Validate sample_weight up front (#394). The per-fold fits below
# subset it by training index, so the user's weights stay
# row-aligned to X / Y throughout.
if sample_weight is not None:
sample_weight = np.asarray(sample_weight, dtype=float).ravel()
if sample_weight.shape[0] != N:
raise ValueError(
f"sample_weight has {sample_weight.shape[0]} entries; expected {N} to match X / Y."
)
def _fold_weights(idx: np.ndarray) -> np.ndarray | None:
return None if sample_weight is None else sample_weight[idx]
# --- Collect beta coefficients (and out-of-fold predictions for CV) ---
beta_collection: list[np.ndarray] = []
y_hat_cv = np.full((N, M), np.nan) if not use_bootstrap else None
rng = np.random.default_rng(random_state)
if use_bootstrap:
iterator = tqdm(range(n_bootstrap), desc="Bootstrap", disable=not show_progress)
for _ in iterator:
train_idx = rng.choice(N, size=N, replace=True)
sub_model = clone(self).fit(
X.iloc[train_idx], Y.iloc[train_idx], sample_weight=_fold_weights(train_idx),
)
beta_collection.append(sub_model.beta_coefficients_.values)
elif method == "jackknife":
assert y_hat_cv is not None # only None when use_bootstrap is True
iterator = tqdm(range(N), desc="Jackknife (LOO)", disable=not show_progress)
for i in iterator:
train_idx = np.concatenate([np.arange(i), np.arange(i + 1, N)])
sub_model = clone(self).fit(
X.iloc[train_idx], Y.iloc[train_idx], sample_weight=_fold_weights(train_idx),
)
beta_collection.append(sub_model.beta_coefficients_.values)
pred = sub_model.predict(X.iloc[[i]])
y_hat_cv[i, :] = pred.values.ravel()
else: # K-fold
assert y_hat_cv is not None # only None when use_bootstrap is True
n_resamples = int(cv)
kf = KFold(n_splits=n_resamples, shuffle=True, random_state=random_state)
desc = f"{n_resamples}-Fold CV"
for train_idx, test_idx in tqdm(kf.split(X), total=n_resamples, desc=desc, disable=not show_progress):
sub_model = clone(self).fit(
X.iloc[train_idx], Y.iloc[train_idx], sample_weight=_fold_weights(train_idx),
)
beta_collection.append(sub_model.beta_coefficients_.values)
pred = sub_model.predict(X.iloc[test_idx])
y_hat_cv[test_idx, :] = pred.values
beta_samples = np.array(beta_collection) # (n_resamples, K, M)
actual_n_resamples = beta_samples.shape[0]
# --- Beta-coefficient statistics ---
beta_mean_arr = beta_samples.mean(axis=0)
if method == "jackknife":
# Jackknife variance: var = (N-1)/N * sum_i (beta_i - beta_mean)^2
jackknife_var = (N - 1) / N * np.sum((beta_samples - beta_mean_arr) ** 2, axis=0)
beta_std_arr = np.sqrt(jackknife_var)
# CI via t-distribution
alpha = 1 - conf_level
t_crit = t_dist.ppf(1 - alpha / 2, df=N - 1)
beta_ci_lower_arr = beta_mean_arr - t_crit * beta_std_arr
beta_ci_upper_arr = beta_mean_arr + t_crit * beta_std_arr
elif method == "kfold":
# For K-fold, use sample std of the K beta estimates
beta_std_arr = beta_samples.std(axis=0, ddof=1)
alpha = 1 - conf_level
t_crit = t_dist.ppf(1 - alpha / 2, df=actual_n_resamples - 1)
beta_ci_lower_arr = beta_mean_arr - t_crit * beta_std_arr
beta_ci_upper_arr = beta_mean_arr + t_crit * beta_std_arr
else: # bootstrap percentile CI
beta_std_arr = beta_samples.std(axis=0, ddof=1)
alpha = 1 - conf_level
beta_ci_lower_arr = np.percentile(beta_samples, 100 * alpha / 2, axis=0)
beta_ci_upper_arr = np.percentile(beta_samples, 100 * (1 - alpha / 2), axis=0)
# Significance: CI does not contain zero
significant_arr = (beta_ci_lower_arr > 0) | (beta_ci_upper_arr < 0)
x_cols = self.beta_coefficients_.index
y_cols = self.beta_coefficients_.columns
beta_mean_df = pd.DataFrame(beta_mean_arr, index=x_cols, columns=y_cols)
beta_std_df = pd.DataFrame(beta_std_arr, index=x_cols, columns=y_cols)
beta_ci_lower_df = pd.DataFrame(beta_ci_lower_arr, index=x_cols, columns=y_cols)
beta_ci_upper_df = pd.DataFrame(beta_ci_upper_arr, index=x_cols, columns=y_cols)
significant_df = pd.DataFrame(significant_arr, index=x_cols, columns=y_cols)
# --- Cross-validated prediction metrics ---
press_val = None
rmse_cv_series = None
q_squared_series = None
y_hat_cv_df = None
if y_hat_cv is not None:
y_hat_cv_df = pd.DataFrame(y_hat_cv, index=Y.index, columns=Y.columns)
residuals = Y.values - y_hat_cv
press_val = float(np.nansum(residuals**2))
ss_total = np.nansum((Y.values - Y.values.mean(axis=0)) ** 2, axis=0)
ss_res = np.nansum(residuals**2, axis=0)
rmse_vals = np.sqrt(np.nanmean(residuals**2, axis=0))
rmse_cv_series = pd.Series(rmse_vals, index=Y.columns, name="RMSE_CV")
q2_vals = 1.0 - ss_res / ss_total
q_squared_series = pd.Series(q2_vals, index=Y.columns, name="Q_squared")
return Bunch(
beta_samples=beta_samples,
beta_mean=beta_mean_df,
beta_std=beta_std_df,
beta_ci_lower=beta_ci_lower_df,
beta_ci_upper=beta_ci_upper_df,
significant=significant_df,
y_hat_cv=y_hat_cv_df,
press=press_val,
rmse_cv=rmse_cv_series,
q_squared=q_squared_series,
n_resamples=actual_n_resamples,
method=method,
conf_level=conf_level,
)
[docs]
def prediction_interval(
self,
X: DataMatrix,
*,
conf_level: float = 0.95,
cv_result: Bunch | None = None,
) -> Bunch:
"""Prediction interval for the Y predictions of new observations.
The interval combines the residual error variance with the leverage of
each new observation in the latent-variable space. For a new
observation the prediction-interval half-width on target ``m`` is
``t * s_E[m] * sqrt(1 + 1/N + T2_new / (N - 1))``
where ``s_E`` is the residual error standard deviation, ``T2_new`` is
the Hotelling's T² of the new observation, ``N`` is the number of
calibration samples, and ``t`` is the Student-t quantile.
Parameters
----------
X : array-like of shape (n_new, n_features)
New observations, pre-processed the same way as the training data.
conf_level : float, default=0.95
Confidence level for the interval, in (0.5, 1.0).
cv_result : sklearn.utils.Bunch or None, default=None
The result of :meth:`cross_validate`. When supplied, its
cross-validated RMSE (``rmse_cv``) is used for the error variance,
which is preferable to the optimistic calibration RMSE used
otherwise.
Returns
-------
sklearn.utils.Bunch
With keys ``y_hat`` (point predictions), ``lower`` and ``upper``
(prediction-interval bounds) - each a DataFrame of shape
(n_new, n_targets) - and ``conf_level``.
"""
check_is_fitted(self, "beta_coefficients_")
if not (0.5 < conf_level < 1.0):
raise ValueError(f"conf_level must be between 0.5 and 1.0, got {conf_level}.")
diagnostics = self.diagnose(X)
y_hat = diagnostics.y_hat
t2_new = np.asarray(diagnostics.hotellings_t2, dtype=float)
n_samples = self.n_samples_
n_components = int(self.n_components)
# Residual error std per Y variable: prefer the cross-validated RMSE
# when a cross_validate() result is supplied (calibration RMSE is
# optimistic for genuinely new observations).
if cv_result is not None:
error_std = np.asarray(cv_result.rmse_cv, dtype=float)
else:
error_std = np.asarray(self.rmse_.iloc[:, -1], dtype=float)
# Leverage of a new observation in the latent space.
leverage = 1.0 / n_samples + t2_new / (n_samples - 1)
df = max(n_samples - n_components - 1, 1)
t_crit = t_dist.ppf(1 - (1 - conf_level) / 2, df)
half_width = t_crit * np.sqrt(1.0 + leverage)[:, None] * error_std[None, :]
lower = pd.DataFrame(y_hat.values - half_width, index=y_hat.index, columns=y_hat.columns)
upper = pd.DataFrame(y_hat.values + half_width, index=y_hat.index, columns=y_hat.columns)
return Bunch(y_hat=y_hat, lower=lower, upper=upper, conf_level=conf_level)