Source code for process_improve.experiments.evaluate

# (c) Kevin Dunn, 2010-2026. MIT License.

"""Design evaluation: quality metrics for experimental designs.

Provides :func:`evaluate_design`, which computes properties and quality metrics
of an existing design matrix.  Supported metrics include efficiency values
(D/I/G), prediction variance, VIF, condition number, power analysis, alias
structure, confounding, resolution, defining relation, clear effects, minimum
aberration, and degrees of freedom.

Example
-------
>>> from process_improve.experiments import evaluate_design, generate_design, Factor
>>> factors = [Factor(name="A", low=0, high=10), Factor(name="B", low=0, high=10)]
>>> result = generate_design(factors, design_type="full_factorial", center_points=0)
>>> metrics = evaluate_design(result, model="interactions", metric=["d_efficiency", "vif"])
"""

from __future__ import annotations

import itertools
import logging
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any

import numpy as np
import pandas as pd
from patsy import build_design_matrices, dmatrix
from patsy.design_info import DesignInfo
from scipy import stats
from statsmodels.stats.outliers_influence import variance_inflation_factor

from process_improve.experiments.factor import DesignResult
from process_improve.experiments.models import validate_formula_is_safe, validate_identifier_is_safe

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Internal context shared across metric computations
# ---------------------------------------------------------------------------


@dataclass
class _EvalRequest:
    """Caller-supplied evaluation request.

    Bundles every piece of metric-independent input so
    :func:`_build_context` can accept a single dataclass instead of
    nine positional / keyword arguments (ENG-25 / #307: removes one
    ``noqa: PLR0913``).
    """

    design_df: pd.DataFrame
    factor_names: list[str]
    model: str | None
    generators: list[str] | None
    defining_relation: list[str] | None
    resolution: int | None
    effect_size: float | None
    alpha: float
    sigma: float | None
    region: str = "cuboidal"
    n_samples: int = 100_000
    include_vertices: bool = True
    random_seed: int = 42
    fds_resolution: int | None = None


@dataclass
class _EvalContext:
    """Shared evaluation context, computed once and reused by all metrics."""

    X: np.ndarray
    column_names: list[str]
    factor_names: list[str]
    design_df: pd.DataFrame
    design_info: DesignInfo  # patsy DesignInfo of the fitted model matrix
    N: int
    p: int
    XtX: np.ndarray
    XtX_inv: np.ndarray | None  # None when X'X is singular
    is_singular: bool
    generators: list[str] | None
    defining_relation: list[str] | None
    resolution: int | None
    effect_size: float | None
    alpha: float
    sigma: float | None
    region: str = "cuboidal"
    n_samples: int = 100_000
    include_vertices: bool = True
    random_seed: int = 42
    fds_resolution: int | None = None


# ---------------------------------------------------------------------------
# Model matrix construction
# ---------------------------------------------------------------------------

_ROMAN = {3: "III", 4: "IV", 5: "V", 6: "VI", 7: "VII", 8: "VIII"}


def _build_model_matrix(
    design_df: pd.DataFrame,
    model: str | None,
    factor_names: list[str],
) -> tuple[np.ndarray, list[str], DesignInfo]:
    """Build the expanded model matrix *X* using patsy.

    Parameters
    ----------
    design_df : DataFrame
        Design matrix with one column per factor (coded units).
    model : str or None
        ``"main_effects"``, ``"interactions"``, ``"quadratic"``, an explicit
        patsy formula, or *None* (defaults to ``"interactions"``).
    factor_names : list[str]
        Ordered factor names.

    Returns
    -------
    X : ndarray of shape (N, p)
        The model matrix including the intercept column.
    column_names : list[str]
        Human-readable names for each column of *X*.
    design_info : patsy.DesignInfo
        The patsy design info describing the expansion.  Pass it to
        :func:`patsy.build_design_matrices` to expand *new* factor-space points
        through the identical model (see :func:`_expand_points`).
    """
    if model is None:
        model = "interactions"

    # Factor names are interpolated into the formula; reject non-identifiers.
    for name in factor_names:
        validate_identifier_is_safe(name)

    # Map shorthand names to patsy right-hand-side formulas
    joined = " + ".join(factor_names)
    if model == "main_effects":
        rhs = joined
    elif model == "interactions":
        rhs = f"({joined}) ** 2"
    elif model == "quadratic":
        squared = " + ".join(f"I({f} ** 2)" for f in factor_names)
        rhs = f"({joined}) ** 2 + {squared}"
    elif "~" in model:
        # Explicit formula with response side - strip LHS
        rhs = model.split("~", 1)[1].strip()
    else:
        # Assume it is already a valid RHS formula
        rhs = model

    # Patsy evaluates each term as Python, so a custom ``model`` is a code-
    # execution vector. Permit only safe column arithmetic, with I()/Q() (SEC-14).
    validate_formula_is_safe(rhs, design_df.columns, allow_transforms=True)
    dm = dmatrix(rhs, design_df, return_type="dataframe")
    X = np.asarray(dm, dtype=float)
    column_names = list(dm.columns)
    return X, column_names, dm.design_info


def _build_context(req: _EvalRequest) -> _EvalContext:
    """Build the shared evaluation context."""
    X, column_names, design_info = _build_model_matrix(req.design_df, req.model, req.factor_names)
    N, p = X.shape
    XtX = X.T @ X

    rank = np.linalg.matrix_rank(XtX)
    is_singular = rank < p
    XtX_inv: np.ndarray | None = None
    if not is_singular:
        XtX_inv = np.linalg.inv(XtX)

    return _EvalContext(
        X=X,
        column_names=column_names,
        factor_names=req.factor_names,
        design_df=req.design_df,
        design_info=design_info,
        N=N,
        p=p,
        XtX=XtX,
        XtX_inv=XtX_inv,
        is_singular=is_singular,
        generators=req.generators,
        defining_relation=req.defining_relation,
        resolution=req.resolution,
        effect_size=req.effect_size,
        alpha=req.alpha,
        sigma=req.sigma,
        region=req.region,
        n_samples=req.n_samples,
        include_vertices=req.include_vertices,
        random_seed=req.random_seed,
        fds_resolution=req.fds_resolution,
    )


# ---------------------------------------------------------------------------
# Metric implementations
# ---------------------------------------------------------------------------


def _compute_d_efficiency(ctx: _EvalContext) -> dict[str, Any]:
    """D-efficiency: 100 * det(X'X)^(1/p) / N."""
    if ctx.is_singular:
        return {"d_efficiency": None, "note": "Design is rank-deficient for the specified model."}
    sign, logdet = np.linalg.slogdet(ctx.XtX)
    if sign <= 0:
        return {"d_efficiency": None, "note": "X'X has non-positive determinant."}
    d_eff = 100.0 * np.exp(logdet / ctx.p) / ctx.N
    return {"d_efficiency": float(d_eff)}


def _prediction_variance_at_points(X_points: np.ndarray, XtX_inv: np.ndarray) -> np.ndarray:
    """Compute d(x) = x' (X'X)^-1 x for each row of X_points."""
    # (X_points @ XtX_inv) element-wise * X_points, summed per row
    return np.sum((X_points @ XtX_inv) * X_points, axis=1)


def _expand_points(ctx: _EvalContext, points: np.ndarray) -> np.ndarray:
    """Expand raw factor-space points through the *fitted* model matrix.

    Uses the stored patsy :class:`~patsy.design_info.DesignInfo` so the columns
    of the returned matrix match :attr:`_EvalContext.X` exactly (same terms,
    same order).  This is what keeps region / grid evaluation consistent with
    the fit; rebuilding from an inferred shorthand model is what previously
    produced a column-count mismatch for explicit reduced formulas.

    Parameters
    ----------
    ctx : _EvalContext
        The shared evaluation context (carries the fitted ``design_info``).
    points : ndarray of shape (M, k)
        Raw factor-space points, one column per factor in ``ctx.factor_names``.

    Returns
    -------
    ndarray of shape (M, p)
        The model matrix for *points*, with the same columns as ``ctx.X``.
    """
    df_points = pd.DataFrame(np.asarray(points, dtype=float), columns=ctx.factor_names)
    (expanded,) = build_design_matrices([ctx.design_info], df_points, return_type="matrix")
    return np.asarray(expanded, dtype=float)


def _cube_vertices(k: int) -> np.ndarray:
    """Return all ``2**k`` cube vertices (corners) of ``[-1, 1]^k``."""
    return np.array(list(itertools.product([-1.0, 1.0], repeat=k)), dtype=float)


def _region_points(
    factor_names: list[str],
    region: str,
    n_samples: int,
    include_vertices: bool,
    random_seed: int,
) -> np.ndarray:
    """Sample raw factor-space points over the design region.

    Parameters
    ----------
    factor_names : list[str]
        Ordered factor names (only the count ``k`` matters here).
    region : {"cuboidal", "spherical"}
        ``"cuboidal"`` samples uniformly in ``[-1, 1]^k``; ``"spherical"``
        samples uniformly inside the ball of radius ``sqrt(k)`` (the sphere
        that circumscribes the unit cube).
    n_samples : int
        Number of random interior samples to draw.
    include_vertices : bool
        When *True*, append all ``2**k`` cube vertices to the sample set.  The
        worst-case prediction variance for second-order models very often sits
        at (or near) a corner, so the corners are always represented in the
        G / FDS statistics.
    random_seed : int
        Seed for the NumPy random generator (full reproducibility).

    Returns
    -------
    ndarray of shape (M, k)
        The sampled points, with the cube vertices appended last when
        *include_vertices* is set.
    """
    k = len(factor_names)
    rng = np.random.default_rng(random_seed)
    if region == "cuboidal":
        pts = rng.uniform(-1.0, 1.0, size=(n_samples, k))
    elif region == "spherical":
        # Uniform in the radius-sqrt(k) ball: direction * radius, radius ~ U^(1/k).
        directions = rng.normal(size=(n_samples, k))
        directions /= np.linalg.norm(directions, axis=1, keepdims=True)
        radii = np.sqrt(k) * rng.uniform(0.0, 1.0, size=(n_samples, 1)) ** (1.0 / k)
        pts = directions * radii
    else:
        raise ValueError(f"Unknown region={region!r}.  Choose 'cuboidal' or 'spherical'.")

    if include_vertices and k > 0:
        pts = np.vstack([pts, _cube_vertices(k)])
    return pts


def _region_prediction_variance(ctx: _EvalContext) -> np.ndarray:
    """Prediction variance ``d(x) = x' (X'X)^-1 x`` over the design region.

    Single source of truth for the region-based metrics (I / G efficiency and
    the FDS curve): all of them read from this one sorted array, sampled with
    the region settings carried on *ctx*.
    """
    assert ctx.XtX_inv is not None  # callers guard on ``ctx.is_singular``
    points = _region_points(
        ctx.factor_names,
        region=ctx.region,
        n_samples=ctx.n_samples,
        include_vertices=ctx.include_vertices,
        random_seed=ctx.random_seed,
    )
    X_region = _expand_points(ctx, points)
    return _prediction_variance_at_points(X_region, ctx.XtX_inv)


def _compute_g_efficiency(ctx: _EvalContext) -> dict[str, Any]:
    """G-efficiency: 100 * p / (N * max prediction variance over design region)."""
    if ctx.is_singular:
        return {"g_efficiency": None, "note": "Design is rank-deficient for the specified model."}

    pv = _region_prediction_variance(ctx)
    max_pv = float(np.max(pv))

    g_eff = 100.0 * ctx.p / (ctx.N * max_pv) if max_pv > 0 else None
    return {
        "g_efficiency": float(g_eff) if g_eff is not None else None,
        "max_prediction_variance": max_pv,
    }


def _compute_i_efficiency(ctx: _EvalContext) -> dict[str, Any]:
    """I-efficiency: 100 * p / (N * average prediction variance over design region)."""
    if ctx.is_singular:
        return {"i_efficiency": None, "note": "Design is rank-deficient for the specified model."}

    pv = _region_prediction_variance(ctx)
    avg_pv = float(np.mean(pv))

    i_eff = 100.0 * ctx.p / (ctx.N * avg_pv) if avg_pv > 0 else None
    return {
        "i_efficiency": float(i_eff) if i_eff is not None else None,
        "average_prediction_variance": avg_pv,
    }


def _term_order(name: str) -> int:
    """Classify a model-matrix column by its polynomial order.

    Returns ``0`` for the intercept, ``1`` for a main effect, and ``2`` for a
    second-order term (a pure quadratic ``I(x ** 2)`` or a two-factor
    interaction ``x:y``).  Sufficient for the response-surface models this
    module evaluates; higher powers are reported as their interaction depth.
    """
    if name.lower() == "intercept" or name == "1":
        return 0
    if ":" in name:
        return name.count(":") + 1
    if "**" in name or name.startswith("I("):
        return 2
    return 1


def _compute_a_optimality(ctx: _EvalContext) -> dict[str, Any]:
    """A-optimality: ``trace((X'X)^-1)`` (lower is better).

    The trace of the inverse information matrix is the sum (so, up to a
    constant, the average) of the coefficient variances.  ``a_efficiency`` is a
    normalised score in the same spirit as ``d_efficiency`` (higher is better).
    """
    if ctx.is_singular:
        return {"a_optimality": None, "note": "Design is rank-deficient for the specified model."}

    assert ctx.XtX_inv is not None
    trace = float(np.trace(ctx.XtX_inv))
    a_eff = 100.0 * ctx.p / (ctx.N * trace) if trace > 0 else None
    return {
        "a_optimality": trace,
        "a_efficiency": float(a_eff) if a_eff is not None else None,
    }


def _compute_e_optimality(ctx: _EvalContext) -> dict[str, Any]:
    """E-optimality: smallest eigenvalue of ``X'X`` (higher is better).

    The minimum eigenvalue measures how well the worst-estimated direction in
    parameter space is supported by the design.  ``e_efficiency`` scales it by
    the run count for comparison across designs of different size.
    """
    min_eig = float(np.linalg.eigvalsh(ctx.XtX).min())
    return {
        "e_optimality": min_eig,
        "e_efficiency": 100.0 * min_eig / ctx.N if ctx.N > 0 else None,
    }


def _compute_correlation(ctx: _EvalContext) -> dict[str, Any]:
    """Pairwise correlation summary among the model's second-order terms.

    The pure-quadratic columns ``x_i^2`` have a non-zero mean, so a raw Pearson
    correlation between them is inflated by that shared offset and depends on
    the coding.  To get a coding-invariant measure, each second-order column is
    first residualised against the intercept-and-main-effect block (the
    content those columns share by construction) and the correlations are taken
    on the residuals.

    Returns ``max_abs_r``, ``mean_abs_r``, the full correlation ``matrix`` (as
    nested lists), and the ordered ``terms``.
    """
    second_idx = [i for i, c in enumerate(ctx.column_names) if _term_order(c) == 2]
    base_idx = [i for i, c in enumerate(ctx.column_names) if _term_order(c) in (0, 1)]
    if len(second_idx) < 2:
        return {
            "correlation": {
                "max_abs_r": 0.0,
                "mean_abs_r": 0.0,
                "matrix": [[1.0]] if second_idx else [],
                "terms": [ctx.column_names[i] for i in second_idx],
                "note": "Fewer than two second-order terms; no pairwise correlation.",
            }
        }

    second = ctx.X[:, second_idx]
    base = ctx.X[:, base_idx]
    # Residualise the second-order columns against [intercept, main effects].
    resid = second - base @ (np.linalg.pinv(base) @ second)

    with np.errstate(invalid="ignore", divide="ignore"):
        corr = np.corrcoef(resid, rowvar=False)
    corr = np.nan_to_num(corr, nan=0.0)
    corr = np.atleast_2d(corr)

    m = corr.shape[0]
    iu = np.triu_indices(m, k=1)
    off_diag = np.abs(corr[iu])
    max_abs = float(off_diag.max()) if off_diag.size else 0.0
    mean_abs = float(off_diag.mean()) if off_diag.size else 0.0
    return {
        "correlation": {
            "max_abs_r": max_abs,
            "mean_abs_r": mean_abs,
            "matrix": corr.tolist(),
            "terms": [ctx.column_names[i] for i in second_idx],
        }
    }


def _omitted_two_factor_interactions(ctx: _EvalContext) -> tuple[np.ndarray, list[str]]:
    """Build the two-factor-interaction columns *not* already in the model.

    Returns the ``(N, q)`` matrix of raw ``x_i * x_j`` products and the matching
    ``"A:B"`` term names, for every factor pair whose interaction is absent from
    the fitted model matrix.
    """
    present = {c.replace(" ", "") for c in ctx.column_names}
    cols: list[np.ndarray] = []
    names: list[str] = []
    factors = ctx.factor_names
    values = {f: ctx.design_df[f].to_numpy(dtype=float) for f in factors}
    for a, b in itertools.combinations(factors, 2):
        if f"{a}:{b}" in present or f"{b}:{a}" in present:
            continue
        cols.append(values[a] * values[b])
        names.append(f"{a}:{b}")
    if not cols:
        return np.empty((ctx.N, 0)), []
    return np.column_stack(cols), names


def _compute_alias_matrix(ctx: _EvalContext) -> dict[str, Any]:
    """General alias (bias) matrix ``A = (X1'X1)^-1 X1' X2``.

    With the fitted model ``X1`` and a set of potential extra terms ``X2``
    (default: the two-factor interactions not already in the model), the least
    squares estimate of the fitted coefficients is biased by
    ``E[b1] = beta1 + A @ beta2``.  This generalises :func:`alias_structure`
    (which only handles two-level fractional factorials) to any design / model.

    Returns the ``matrix`` (nested lists), the ``model_terms`` (rows) and
    ``alias_terms`` (columns), the worst single bias ``max_abs``, the maximum
    over the main-effect rows ``max_abs_main_effect_rows``, and the Frobenius
    norm ``frobenius_norm``.
    """
    if ctx.is_singular:
        return {"alias_matrix": None, "note": "Design is rank-deficient for the specified model."}

    assert ctx.XtX_inv is not None
    X2, alias_terms = _omitted_two_factor_interactions(ctx)
    if X2.shape[1] == 0:
        return {
            "alias_matrix": {
                "matrix": [],
                "model_terms": list(ctx.column_names),
                "alias_terms": [],
                "max_abs": 0.0,
                "max_abs_main_effect_rows": 0.0,
                "frobenius_norm": 0.0,
                "note": "No two-factor interactions outside the model to alias against.",
            }
        }

    alias = ctx.XtX_inv @ ctx.X.T @ X2
    main_rows = [i for i, c in enumerate(ctx.column_names) if _term_order(c) == 1]
    max_abs = float(np.max(np.abs(alias)))
    max_main = float(np.max(np.abs(alias[main_rows, :]))) if main_rows else 0.0
    return {
        "alias_matrix": {
            "matrix": alias.tolist(),
            "model_terms": list(ctx.column_names),
            "alias_terms": alias_terms,
            "max_abs": max_abs,
            "max_abs_main_effect_rows": max_main,
            "frobenius_norm": float(np.linalg.norm(alias)),
        }
    }


_FDS_QUANTILES = (0.0, 0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 1.0)


def _compute_fds(ctx: _EvalContext) -> dict[str, Any]:
    """Fraction-of-design-space (FDS) distribution of the prediction variance.

    Samples the scaled prediction variance ``d(x) = x' (X'X)^-1 x`` over the
    whole design region (not just the design points), returning quantiles of
    the curve together with the region average (I / V-optimality) and maximum
    (G-optimality) in ``sigma^2`` units, plus the run-count-scaled SPV variants
    (multiplied by ``N``).  The region settings are echoed back for
    reproducibility.

    When ``ctx.fds_resolution`` is set, a dense ``curve`` sub-dict is added with
    ``fraction``, ``prediction_variance``, and ``scaled_prediction_variance``
    arrays of that length, evaluated on evenly spaced fractions in ``[0, 1]``
    (the endpoints are the minimum and maximum prediction variance) - suitable
    for drawing a smooth FDS plot.  The coarse 11-point ``quantiles`` summary is
    always present for backward compatibility.
    """
    if ctx.is_singular:
        return {"fds": None, "note": "Design is rank-deficient for the specified model."}

    pv = np.sort(_region_prediction_variance(ctx))
    avg = float(pv.mean())
    mx = float(pv.max())
    quantiles = {f"{q:g}": float(v) for q, v in zip(_FDS_QUANTILES, np.quantile(pv, _FDS_QUANTILES), strict=True)}
    payload: dict[str, Any] = {
        "region": ctx.region,
        "n_samples": ctx.n_samples,
        "include_vertices": ctx.include_vertices,
        "random_seed": ctx.random_seed,
        "fds_resolution": ctx.fds_resolution,
        "quantiles": quantiles,
        "average_prediction_variance": avg,
        "max_prediction_variance": mx,
        "scaled_average_prediction_variance": avg * ctx.N,
        "scaled_max_prediction_variance": mx * ctx.N,
    }
    if ctx.fds_resolution is not None:
        if ctx.fds_resolution < 2:
            raise ValueError(f"fds_resolution must be at least 2, got {ctx.fds_resolution}.")
        fractions = np.linspace(0.0, 1.0, ctx.fds_resolution)
        curve = np.quantile(pv, fractions)  # non-decreasing; endpoints are min and max
        payload["curve"] = {
            "fraction": fractions.tolist(),
            "prediction_variance": curve.tolist(),
            "scaled_prediction_variance": (curve * ctx.N).tolist(),
        }
    return {"fds": payload}


def _compute_prediction_variance(ctx: _EvalContext) -> dict[str, Any]:
    """Prediction variance d(x_i) = x_i' (X'X)^-1 x_i at each design point."""
    if ctx.is_singular:
        return {"prediction_variance": None, "note": "Design is rank-deficient for the specified model."}

    # ``XtX_inv`` is non-None whenever the design is not singular (guarded above).
    assert ctx.XtX_inv is not None
    pv = _prediction_variance_at_points(ctx.X, ctx.XtX_inv)
    pv_list = [float(v) for v in pv]
    return {
        "prediction_variance": pv_list,
        "mean": float(np.mean(pv)),
        "max": float(np.max(pv)),
        "min": float(np.min(pv)),
    }


def _compute_vif(ctx: _EvalContext) -> dict[str, Any]:
    """Variance Inflation Factor for each model term (excluding intercept)."""
    if ctx.is_singular:
        return {"vif": None, "note": "Design is rank-deficient for the specified model."}

    vif_dict: dict[str, float] = {}
    for i, name in enumerate(ctx.column_names):
        if name.lower() == "intercept" or name == "1":
            continue
        vif_val = variance_inflation_factor(ctx.X, i)
        vif_dict[name] = float(vif_val)
    return {"vif": vif_dict}


def _compute_condition_number(ctx: _EvalContext) -> dict[str, float]:
    """Condition number of the model matrix X."""
    cn = float(np.linalg.cond(ctx.X))
    return {"condition_number": cn}


def _compute_power(ctx: _EvalContext) -> dict[str, Any]:
    """Statistical power for detecting each model term."""
    if ctx.is_singular:
        return {"power": None, "note": "Design is rank-deficient for the specified model."}

    sigma = ctx.sigma if ctx.sigma is not None else 1.0
    df_resid = ctx.N - ctx.p

    if df_resid <= 0:
        return {"power": None, "note": "No residual degrees of freedom (saturated model)."}

    assert ctx.XtX_inv is not None  # guaranteed by not is_singular
    diag_inv = np.diag(ctx.XtX_inv)

    if ctx.effect_size is not None:
        # Single power value per term
        power_dict: dict[str, float] = {}
        for i, name in enumerate(ctx.column_names):
            if name.lower() == "intercept" or name == "1":
                continue
            ncp = (ctx.effect_size**2) / (sigma**2 * diag_inv[i])
            f_crit = stats.f.ppf(1.0 - ctx.alpha, dfn=1, dfd=df_resid)
            pwr = 1.0 - stats.ncf.cdf(f_crit, dfn=1, dfd=df_resid, nc=ncp)
            power_dict[name] = float(pwr)
        return {"power": power_dict}

    # No effect_size: generate power curves over a range of effect sizes
    effect_sizes = np.linspace(0.5 * sigma, 3.0 * sigma, 20)
    power_curves: dict[str, list[dict[str, float]]] = {}
    for i, name in enumerate(ctx.column_names):
        if name.lower() == "intercept" or name == "1":
            continue
        curve = []
        for es in effect_sizes:
            ncp = (es**2) / (sigma**2 * diag_inv[i])
            f_crit = stats.f.ppf(1.0 - ctx.alpha, dfn=1, dfd=df_resid)
            pwr = 1.0 - stats.ncf.cdf(f_crit, dfn=1, dfd=df_resid, nc=ncp)
            curve.append({"effect_size": float(es), "power": float(pwr)})
        power_curves[name] = curve
    return {"power_curves": power_curves, "sigma": float(sigma)}


def _compute_degrees_of_freedom(ctx: _EvalContext) -> dict[str, Any]:
    """Degrees of freedom breakdown."""
    df_model = ctx.p - 1  # excluding intercept
    df_residual = ctx.N - ctx.p
    df_total = ctx.N - 1

    # Detect replicates by counting distinct rows
    design_rounded = np.round(ctx.design_df[ctx.factor_names].values, decimals=10)
    n_distinct = len(set(map(tuple, design_rounded)))
    has_replicates = n_distinct < ctx.N

    result: dict[str, Any] = {
        "degrees_of_freedom": {
            "model": df_model,
            "residual": df_residual,
            "total": df_total,
        }
    }
    if has_replicates:
        df_pure_error = ctx.N - n_distinct
        df_lack_of_fit = n_distinct - ctx.p if n_distinct > ctx.p else 0
        result["degrees_of_freedom"]["pure_error"] = df_pure_error
        result["degrees_of_freedom"]["lack_of_fit"] = df_lack_of_fit

    return result


# ---------------------------------------------------------------------------
# Alias / confounding metrics (GF(2) arithmetic on generator words)
# ---------------------------------------------------------------------------


def _parse_word(word: str, factor_names: list[str]) -> frozenset[int]:
    """Parse a word like ``"ABCE"`` into a frozenset of factor indices.

    Also handles ``"I=ABCE"`` format (strips the ``I=`` prefix).
    """
    word = word.strip().removeprefix("I=")
    if word == "I":
        return frozenset()

    # Try multi-char factor names first (when names are longer than 1 char)
    name_to_idx = {name: i for i, name in enumerate(factor_names)}

    # If all factor names are single chars, parse character-by-character
    if all(len(n) == 1 for n in factor_names):
        indices = set()
        for ch in word:
            if ch in name_to_idx:
                indices.add(name_to_idx[ch])
        return frozenset(indices)

    # Multi-char names: try to match greedily (longest first)
    remaining = word
    indices = set()
    sorted_names = sorted(name_to_idx.keys(), key=len, reverse=True)
    while remaining:
        matched = False
        for name in sorted_names:
            if remaining.startswith(name):
                indices.add(name_to_idx[name])
                remaining = remaining[len(name) :]
                matched = True
                break
        if not matched:
            remaining = remaining[1:]  # skip unrecognized character
    return frozenset(indices)


def _word_to_str(indices: frozenset[int], factor_names: list[str]) -> str:
    """Convert factor indices back to a word string."""
    if not indices:
        return "I"
    sorted_indices = sorted(indices)
    return "".join(factor_names[i] for i in sorted_indices)


def _multiply_words(w1: frozenset[int], w2: frozenset[int]) -> frozenset[int]:
    """Multiply two words in GF(2) - symmetric difference of factor sets."""
    return w1.symmetric_difference(w2)


def _defining_relation_from_generators(
    generators: list[str], factor_names: list[str]
) -> list[frozenset[int]]:
    """Compute the full defining relation from generator strings.

    Each generator like ``"D=ABC"`` produces the word ``ABCD``.  The full
    defining relation is the closure under GF(2) multiplication of all
    generator words and their products (all non-empty subsets).
    """
    # Parse each generator into a defining word
    base_words: list[frozenset[int]] = []
    for gen in generators:
        parts = gen.split("=")
        lhs = parts[0].strip()
        rhs = parts[1].strip() if len(parts) > 1 else ""
        lhs_idx = _parse_word(lhs, factor_names)
        rhs_idx = _parse_word(rhs, factor_names)
        word = _multiply_words(lhs_idx, rhs_idx)
        base_words.append(word)

    # Generate all non-empty subsets and their products
    all_words: set[frozenset[int]] = set()
    for r in range(1, len(base_words) + 1):
        for subset in itertools.combinations(base_words, r):
            product: frozenset[int] = frozenset()
            for w in subset:
                product = _multiply_words(product, w)
            if product:  # exclude identity
                all_words.add(product)

    return sorted(all_words, key=lambda w: (len(w), sorted(w)))


def _compute_defining_relation(ctx: _EvalContext) -> dict[str, Any]:
    """Compute or return the defining relation."""
    if ctx.defining_relation:
        return {"defining_relation": ctx.defining_relation}

    if not ctx.generators:
        return {"defining_relation": None, "note": "No generators available. Not a fractional factorial design."}

    words = _defining_relation_from_generators(ctx.generators, ctx.factor_names)
    relation = [f"I={_word_to_str(w, ctx.factor_names)}" for w in words]
    return {"defining_relation": relation}


def _compute_resolution(ctx: _EvalContext) -> dict[str, Any]:
    """Design resolution = minimum word length in the defining relation."""
    if ctx.resolution is not None:
        roman = _ROMAN.get(ctx.resolution, str(ctx.resolution))
        return {"resolution": ctx.resolution, "roman": roman}

    if not ctx.generators:
        return {"resolution": None, "roman": None, "note": "Not a fractional factorial design."}

    words = _defining_relation_from_generators(ctx.generators, ctx.factor_names)
    if not words:
        return {"resolution": None, "roman": None, "note": "No defining relation words found."}

    res = min(len(w) for w in words)
    roman = _ROMAN.get(res, str(res))
    return {"resolution": res, "roman": roman}


def _compute_alias_structure(ctx: _EvalContext) -> dict[str, Any]:
    """Alias structure: which effects are aliased with which others.

    Uses GF(2) arithmetic when generators are available; falls back to
    correlation-based detection otherwise.
    """
    if ctx.generators:
        return _alias_structure_from_generators(ctx)
    return _alias_structure_from_correlation(ctx)


def _alias_structure_from_generators(ctx: _EvalContext) -> dict[str, Any]:
    """Compute alias chains using GF(2) multiplication against the defining relation."""
    # Only reached when ``ctx.generators`` is truthy (see ``_compute_alias_structure``).
    assert ctx.generators is not None
    words = _defining_relation_from_generators(ctx.generators, ctx.factor_names)
    if not words:
        return {"alias_structure": []}

    # Build alias chains for main effects and 2-factor interactions
    alias_chains: list[str] = []
    k = len(ctx.factor_names)

    # Main effects
    for i in range(k):
        effect = frozenset([i])
        effect_name = ctx.factor_names[i]
        aliases = []
        for w in words:
            alias = _multiply_words(effect, w)
            alias_name = _word_to_str(alias, ctx.factor_names)
            aliases.append(alias_name)
        # Sort by word length
        aliases.sort(key=lambda s: (len(s), s))
        chain = f"{effect_name} = " + " + ".join(aliases)
        alias_chains.append(chain)

    # 2-factor interactions
    for i, j in itertools.combinations(range(k), 2):
        effect = frozenset([i, j])
        effect_name = _word_to_str(effect, ctx.factor_names)
        aliases = []
        for w in words:
            alias = _multiply_words(effect, w)
            alias_name = _word_to_str(alias, ctx.factor_names)
            aliases.append(alias_name)
        aliases.sort(key=lambda s: (len(s), s))
        chain = f"{effect_name} = " + " + ".join(aliases)
        alias_chains.append(chain)

    return {"alias_structure": alias_chains}


def _is_intercept_col(name: str) -> bool:
    """Check if a column name represents the intercept."""
    return name.lower() == "intercept" or name == "1"


def _find_correlated_aliases(
    X_full: np.ndarray, col_names: list[str], col_idx: int, nonzero: np.ndarray, threshold: float
) -> list[tuple[str, str]]:
    """Find columns highly correlated with column *col_idx*."""
    aliases: list[tuple[str, str]] = []
    for j in range(X_full.shape[1]):
        if j == col_idx or not nonzero[j] or _is_intercept_col(col_names[j]):
            continue
        corr = np.corrcoef(X_full[:, col_idx], X_full[:, j])[0, 1]
        if abs(corr) > threshold:
            sign = "+" if corr > 0 else "-"
            aliases.append((sign, col_names[j]))
    return aliases


def _alias_structure_from_correlation(ctx: _EvalContext) -> dict[str, Any]:
    """Detect aliasing via correlation of model matrix columns."""
    if ctx.p <= 1:
        return {"alias_structure": []}

    # Build an expanded model matrix with higher-order terms for alias detection
    factor_str = " + ".join(ctx.factor_names)
    k = len(ctx.factor_names)
    max_order = min(k, 3)
    rhs = f"({factor_str}) ** {max_order}" if max_order >= 3 else f"({factor_str}) ** 2"

    dm = dmatrix(rhs, ctx.design_df, return_type="dataframe")
    X_full = np.asarray(dm, dtype=float)
    col_names = list(dm.columns)

    stddevs = X_full.std(axis=0)
    nonzero = stddevs > np.sqrt(np.finfo(float).eps)

    alias_chains: list[str] = []
    for i in range(X_full.shape[1]):
        if _is_intercept_col(col_names[i]) or not nonzero[i]:
            continue
        aliases = _find_correlated_aliases(X_full, col_names, i, nonzero, threshold=0.995)
        if aliases:
            alias_parts = [f"{sign}{name}" for sign, name in aliases]
            alias_chains.append(f"{col_names[i]} = " + " + ".join(alias_parts))

    return {"alias_structure": alias_chains}


def _compute_confounding(ctx: _EvalContext) -> dict[str, Any]:
    """Confounding structure: pairs of effects that cannot be distinguished."""
    alias_result = _compute_alias_structure(ctx)
    alias_chains = alias_result.get("alias_structure", [])
    if not alias_chains:
        return {"confounding": [], "note": "No confounding detected."}

    confounding_list: list[dict[str, Any]] = []
    for chain in alias_chains:
        if " = " not in chain:
            continue
        parts = chain.split(" = ", 1)
        effect = parts[0].strip()
        aliases_str = parts[1].strip()
        confounded = [a.strip().lstrip("+-") for a in aliases_str.split(" + ")]
        confounding_list.append({
            "effect": effect,
            "confounded_with": confounded,
        })
    return {"confounding": confounding_list}


def _effect_order(term: str, factor_names: list[str]) -> int:
    """Determine the order of an effect term (1=main, 2=2FI, etc.)."""
    if ":" in term:
        return term.count(":") + 1
    if term in factor_names:
        return 1
    return len(term)  # approximate for single-char factor names


def _compute_clear_effects(ctx: _EvalContext) -> dict[str, Any]:
    """Identify effects whose aliases are all of higher order."""
    alias_result = _compute_alias_structure(ctx)
    alias_chains = alias_result.get("alias_structure", [])

    clear_main: list[str] = []
    clear_2fi: list[str] = []

    for chain in alias_chains:
        if " = " not in chain:
            continue
        effect, aliases_str = chain.split(" = ", 1)
        effect = effect.strip()
        order = _effect_order(effect, ctx.factor_names)

        alias_terms = [a.strip().lstrip("+-") for a in aliases_str.strip().split(" + ")]
        all_higher = all(_effect_order(a, ctx.factor_names) > order for a in alias_terms)

        if all_higher and order == 1:
            clear_main.append(effect)
        elif all_higher and order == 2:
            clear_2fi.append(effect)

    return {
        "clear_effects": {
            "main_effects": clear_main,
            "two_factor_interactions": clear_2fi,
        }
    }


def _compute_minimum_aberration(ctx: _EvalContext) -> dict[str, Any]:
    """Wordlength pattern (A_3, A_4, ...) from the defining relation."""
    if not ctx.generators:
        return {
            "minimum_aberration": {
                "wordlength_pattern": [],
                "note": "Not a fractional factorial design.",
            }
        }

    words = _defining_relation_from_generators(ctx.generators, ctx.factor_names)
    if not words:
        return {
            "minimum_aberration": {
                "wordlength_pattern": [],
                "note": "No defining relation words found.",
            }
        }

    lengths = [len(w) for w in words]
    max_len = max(lengths) if lengths else 0
    # Wordlength pattern: A_i = number of words of length i, starting at i=3
    pattern = [lengths.count(i) for i in range(3, max_len + 1)]

    return {
        "minimum_aberration": {
            "wordlength_pattern": pattern,
            "wordlength_pattern_labels": [f"A_{i}" for i in range(3, max_len + 1)],
        }
    }


# ---------------------------------------------------------------------------
# Metric dispatch registry
# ---------------------------------------------------------------------------

_METRIC_REGISTRY: dict[str, Callable[[_EvalContext], Any]] = {
    "d_efficiency": _compute_d_efficiency,
    "i_efficiency": _compute_i_efficiency,
    "g_efficiency": _compute_g_efficiency,
    "a_optimality": _compute_a_optimality,
    "e_optimality": _compute_e_optimality,
    "correlation": _compute_correlation,
    "alias_matrix": _compute_alias_matrix,
    "fds": _compute_fds,
    "prediction_variance": _compute_prediction_variance,
    "vif": _compute_vif,
    "condition_number": _compute_condition_number,
    "power": _compute_power,
    "degrees_of_freedom": _compute_degrees_of_freedom,
    "alias_structure": _compute_alias_structure,
    "confounding": _compute_confounding,
    "resolution": _compute_resolution,
    "defining_relation": _compute_defining_relation,
    "clear_effects": _compute_clear_effects,
    "minimum_aberration": _compute_minimum_aberration,
}


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


[docs] def evaluate_design( # noqa: PLR0913 design_matrix: pd.DataFrame | DesignResult, model: str | None = None, metric: str | list[str] = "d_efficiency", effect_size: float | None = None, alpha: float = 0.05, sigma: float | None = None, region: str = "cuboidal", n_samples: int = 100_000, include_vertices: bool = True, random_seed: int = 42, fds_resolution: int | None = None, ) -> dict[str, Any]: """Compute quality metrics for an experimental design. Parameters ---------- design_matrix : DataFrame or DesignResult The design to evaluate. If a :class:`DesignResult` is passed, the coded design matrix and any generator / defining-relation metadata are extracted automatically. model : str or None Model type: ``"main_effects"``, ``"interactions"``, ``"quadratic"``, or an explicit patsy formula. ``None`` defaults to ``"interactions"``. metric : str or list[str] One or more metric names to compute, or the special value ``"all"`` to compute every metric. Valid names: ``"d_efficiency"``, ``"i_efficiency"``, ``"g_efficiency"``, ``"a_optimality"``, ``"e_optimality"``, ``"correlation"``, ``"alias_matrix"``, ``"fds"``, ``"prediction_variance"``, ``"vif"``, ``"condition_number"``, ``"power"``, ``"degrees_of_freedom"``, ``"alias_structure"``, ``"confounding"``, ``"resolution"``, ``"defining_relation"``, ``"clear_effects"``, ``"minimum_aberration"``. effect_size : float or None Expected effect size for power calculation. When *None*, a power curve over a range of effect sizes is returned instead. alpha : float Significance level for power calculation (default 0.05). sigma : float or None Estimated noise standard deviation. Defaults to 1.0 when needed but not provided. region : {"cuboidal", "spherical"} Design region over which the region-based metrics (``i_efficiency``, ``g_efficiency``, ``fds``) integrate the prediction variance. ``"cuboidal"`` (default) is ``[-1, 1]^k``; ``"spherical"`` is the ball of radius ``sqrt(k)``. n_samples : int Number of random samples drawn over the region (default 100,000). include_vertices : bool When *True* (default), all ``2**k`` cube vertices are added to the region sample so the worst-case (G) value at a corner is represented. random_seed : int Seed for the region sampler (full reproducibility). fds_resolution : int or None Resolution of the dense FDS curve. When *None* (default) the ``fds`` metric returns only the coarse 11-point ``quantiles`` summary. When set (e.g. 200), a ``curve`` sub-dict with length-``fds_resolution`` ``fraction`` / ``prediction_variance`` / ``scaled_prediction_variance`` arrays is added for smooth plotting; the endpoints are the minimum and maximum prediction variance. Returns ------- dict[str, Any] Results keyed by metric name. The structure of each value depends on the metric - see individual metric documentation. Examples -------- >>> from process_improve.experiments import evaluate_design, generate_design, Factor >>> factors = [Factor(name="A", low=0, high=10), Factor(name="B", low=0, high=10)] >>> result = generate_design(factors, design_type="full_factorial", center_points=0) >>> metrics = evaluate_design(result, model="main_effects", metric="d_efficiency") >>> metrics["d_efficiency"] # doctest: +SKIP 100.0 """ # --- Unpack input --- generators: list[str] | None = None defining_relation: list[str] | None = None resolution: int | None = None if isinstance(design_matrix, DesignResult): generators = design_matrix.generators defining_relation = design_matrix.defining_relation resolution = design_matrix.resolution factor_names = list(design_matrix.factor_names) design_df = pd.DataFrame(design_matrix.design) else: design_df = pd.DataFrame(design_matrix) factor_names = list(design_df.columns) # Drop non-factor columns for col in ["RunOrder", "Block"]: if col in design_df.columns and col not in factor_names: design_df = design_df.drop(columns=[col]) elif col in factor_names: factor_names.remove(col) design_df = design_df.drop(columns=[col]) # --- Normalize metric to list --- if metric == "all": metrics = list(_METRIC_REGISTRY) elif isinstance(metric, str): metrics = [metric] else: metrics = list(metric) logger.debug("evaluate_design: model=%r, metrics=%s", model, metrics) # Validate metric names unknown = [m for m in metrics if m not in _METRIC_REGISTRY] if unknown: available = sorted(_METRIC_REGISTRY.keys()) raise ValueError(f"Unknown metric(s): {unknown}. Available metrics: {available}") # --- Build context --- ctx = _build_context( _EvalRequest( design_df=design_df, factor_names=factor_names, model=model, generators=generators, defining_relation=defining_relation, resolution=resolution, effect_size=effect_size, alpha=alpha, sigma=sigma, region=region, n_samples=n_samples, include_vertices=include_vertices, random_seed=random_seed, fds_resolution=fds_resolution, ) ) # --- Compute requested metrics --- results: dict[str, Any] = {} for m in metrics: result = _METRIC_REGISTRY[m](ctx) results.update(result) return results
[docs] def evaluate_all( # noqa: PLR0913 design_matrix: pd.DataFrame | DesignResult, model: str | None = None, effect_size: float | None = None, alpha: float = 0.05, sigma: float | None = None, region: str = "cuboidal", n_samples: int = 100_000, include_vertices: bool = True, random_seed: int = 42, fds_resolution: int | None = None, ) -> dict[str, Any]: """Compute *every* available metric for a design in one call. Thin convenience wrapper around :func:`evaluate_design` with ``metric="all"`` so callers need not enumerate the metric list. All parameters have the same meaning as in :func:`evaluate_design`. Returns ------- dict[str, Any] Results keyed by metric name (the union of every registered metric). See Also -------- evaluate_design : Compute one or more named metrics. """ return evaluate_design( design_matrix, model=model, metric="all", effect_size=effect_size, alpha=alpha, sigma=sigma, region=region, n_samples=n_samples, include_vertices=include_vertices, random_seed=random_seed, fds_resolution=fds_resolution, )