# (c) Kevin Dunn, 2010-2026. MIT License.
"""Response optimization for designed experiments (Tool 4).
Find optimal factor settings for one or multiple responses after fitting
a model with :func:`analyze_experiment` (Tool 3).
Implemented methods
-------------------
- **desirability** - Derringer-Suich desirability functions (single and
multi-response) with ``scipy.optimize.minimize`` (SLSQP).
- **steepest_ascent** / **steepest_descent** - Move along the gradient
of a first-order model from the design centre.
- **stationary_point** - Locate the stationary point of a second-order
model via ``numpy.linalg.solve``.
- **canonical_analysis** - Eigenvalue decomposition of the *B* matrix
to classify the stationary point (max / min / saddle).
Stubs (not yet implemented)
---------------------------
- **ridge_analysis** - Trace the optimum along increasing radii.
- **pareto_front** - Multi-objective Pareto frontier (NSGA-II).
"""
from __future__ import annotations
import logging
import re
from collections.abc import Callable
from typing import Any
import numpy as np
from scipy import optimize
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
_METHODS = {
"desirability",
"steepest_ascent",
"steepest_descent",
"stationary_point",
"canonical_analysis",
"ridge_analysis",
"pareto_front",
}
# ---------------------------------------------------------------------------
# Model evaluation layer
# ---------------------------------------------------------------------------
def _parse_term(term: str) -> tuple[str, ...]:
"""Classify a coefficient term name into its components.
Returns
-------
tuple[str, ...]
Empty tuple for ``"Intercept"``, single-element for linear,
``("A", "B")`` for interaction ``"A:B"``, ``("A", "A")`` for
quadratic ``"I(A ** 2)"``.
"""
if term == "Intercept":
return ()
# Quadratic: ``I(A ** 2)`` (older statsmodels) or
# ``np.power(A, 2)`` / ``power(A, 2)`` (newer). Both spellings
# appear in the wild depending on the installed statsmodels /
# patsy version. SEC-27 (#276): if either is missed, the term
# silently falls through to the linear branch and the
# downstream surface / optimisation produces wrong results.
m = re.match(r"I\((\w+)\s*\*\*\s*2\)", term) or re.match(
r"(?:np\.)?power\((\w+)\s*,\s*2\)", term
)
if m:
name = m.group(1)
return (name, name)
# Interaction: A:B
if ":" in term:
parts = term.split(":")
return tuple(parts)
# Linear: plain factor name
return (term,)
def _build_model_evaluator(
coefficients: list[dict[str, Any]],
factor_names: list[str],
) -> Callable[[np.ndarray], float]:
"""Return a function ``f(point) -> float`` that evaluates the model.
Parameters
----------
coefficients : list[dict]
Each dict has ``"term"`` and ``"coefficient"`` keys, as returned
by ``analyze_experiment(..., analysis_type="coefficients")``.
factor_names : list[str]
Ordered factor names (e.g. ``["A", "B"]``).
Returns
-------
callable
``f(x)`` where *x* is a 1-D array of coded factor values in the
same order as *factor_names*.
"""
name_to_idx = {n: i for i, n in enumerate(factor_names)}
parsed: list[tuple[tuple[str, ...], float]] = []
for entry in coefficients:
term = entry["term"]
coef = float(entry["coefficient"])
parsed.append((_parse_term(term), coef))
def _eval(x: np.ndarray) -> float:
y = 0.0
for components, coef in parsed:
if len(components) == 0:
# Intercept
y += coef
elif len(components) == 1:
# Linear
y += coef * x[name_to_idx[components[0]]]
elif len(components) == 2:
# Interaction or quadratic
y += coef * x[name_to_idx[components[0]]] * x[name_to_idx[components[1]]]
else:
# Higher-order (unusual but handle gracefully)
val = 1.0
for c in components:
val *= x[name_to_idx[c]]
y += coef * val
return y
return _eval
[docs]
def evaluate_model(
coefficients: list[dict[str, Any]],
factor_names: list[str],
point: dict[str, float],
) -> float:
"""Evaluate predicted response at an arbitrary coded point.
Parameters
----------
coefficients : list[dict]
Coefficient list from ``analyze_experiment``.
factor_names : list[str]
Ordered factor names.
point : dict[str, float]
Factor settings in coded units, e.g. ``{"A": 0.5, "B": -1.0}``.
Returns
-------
float
Predicted response value.
"""
f = _build_model_evaluator(coefficients, factor_names)
x = np.array([point[n] for n in factor_names], dtype=float)
return float(f(x))
# ---------------------------------------------------------------------------
# Extract b vector and B matrix from second-order model
# ---------------------------------------------------------------------------
def _extract_b_and_B( # noqa: N802
coefficients: list[dict[str, Any]],
factor_names: list[str],
) -> tuple[float, np.ndarray, np.ndarray]:
"""Extract intercept, linear vector *b* and quadratic matrix *B*.
For a second-order model ``y = b0 + b'x + x'Bx``, returns
``(b0, b, B)`` where *B* is symmetric with off-diagonal elements
equal to half the interaction coefficients.
"""
k = len(factor_names)
name_to_idx = {n: i for i, n in enumerate(factor_names)}
b0 = 0.0
b = np.zeros(k)
B = np.zeros((k, k))
for entry in coefficients:
term = entry["term"]
coef = float(entry["coefficient"])
components = _parse_term(term)
if len(components) == 0:
b0 = coef
elif len(components) == 1:
b[name_to_idx[components[0]]] = coef
elif len(components) == 2:
i = name_to_idx[components[0]]
j = name_to_idx[components[1]]
if i == j:
# Quadratic term: coefficient is the diagonal of B
B[i, i] = coef
else:
# Interaction: split equally across B[i,j] and B[j,i]
B[i, j] = coef / 2.0
B[j, i] = coef / 2.0
return b0, b, B
# ---------------------------------------------------------------------------
# Stationary point
# ---------------------------------------------------------------------------
def _find_stationary_point(
coefficients: list[dict[str, Any]],
factor_names: list[str],
factor_ranges: dict[str, dict[str, float]] | None = None,
) -> dict[str, Any]:
"""Find the stationary point of a second-order response surface model.
Solves ``2*B*x_s + b = 0`` for ``x_s``.
Parameters
----------
coefficients : list[dict]
Model coefficients.
factor_names : list[str]
Ordered factor names.
factor_ranges : dict or None
Maps factor name to ``{"low": float, "high": float}`` in actual
units. Used to convert coded → actual.
Returns
-------
dict
``stationary_point_coded``, ``stationary_point_actual``,
``predicted_response``, ``classification``.
"""
b0, b, B = _extract_b_and_B(coefficients, factor_names)
# Check that B has quadratic terms (not purely first-order)
if np.allclose(B, 0):
return {"error": "Model has no quadratic or interaction terms - cannot find stationary point."}
try:
# Solve 2*B*x_s = -b
x_s = np.linalg.solve(2.0 * B, -b)
except np.linalg.LinAlgError:
return {"error": "Singular B matrix - stationary point does not exist."}
# Predicted response at stationary point
y_s = float(b0 + b @ x_s + x_s @ B @ x_s)
# Classification from eigenvalues
eigenvalues = np.linalg.eigvalsh(B)
if np.all(eigenvalues < 0):
classification = "maximum"
elif np.all(eigenvalues > 0):
classification = "minimum"
else:
classification = "saddle_point"
# Check if stationary point is inside the design space (coded [-1, 1])
inside_design_space = bool(np.all(np.abs(x_s) <= 1.0))
result: dict[str, Any] = {
"stationary_point_coded": {n: float(x_s[i]) for i, n in enumerate(factor_names)},
"predicted_response": y_s,
"classification": classification,
"eigenvalues": [float(e) for e in eigenvalues],
"inside_design_space": inside_design_space,
}
if factor_ranges:
actual = {}
for i, name in enumerate(factor_names):
if name in factor_ranges:
lo = factor_ranges[name]["low"]
hi = factor_ranges[name]["high"]
center = (lo + hi) / 2.0
half_range = (hi - lo) / 2.0
actual[name] = center + x_s[i] * half_range
else:
actual[name] = float(x_s[i])
result["stationary_point_actual"] = actual
return result
# ---------------------------------------------------------------------------
# Canonical analysis
# ---------------------------------------------------------------------------
def _canonical_analysis(
coefficients: list[dict[str, Any]],
factor_names: list[str],
) -> dict[str, Any]:
"""Canonical analysis of a second-order response surface model.
Computes eigenvalues and eigenvectors of the *B* matrix to determine
the shape and orientation of the response surface.
Returns
-------
dict
``eigenvalues``, ``eigenvectors``, ``classification``,
``canonical_form_description``.
"""
_b0, _b, B = _extract_b_and_B(coefficients, factor_names)
if np.allclose(B, 0):
return {"error": "Model has no quadratic or interaction terms - canonical analysis not applicable."}
eigenvalues, eigenvectors = np.linalg.eigh(B)
# Sort by absolute value (largest first)
order = np.argsort(-np.abs(eigenvalues))
eigenvalues = eigenvalues[order]
eigenvectors = eigenvectors[:, order]
if np.all(eigenvalues < 0):
classification = "maximum"
elif np.all(eigenvalues > 0):
classification = "minimum"
else:
classification = "saddle_point"
desc_parts = []
for i, ev in enumerate(eigenvalues):
w_name = f"W{i + 1}"
direction = "concave" if ev < 0 else "convex"
desc_parts.append(f"{w_name}: eigenvalue={ev:.4f} ({direction})")
return {
"eigenvalues": [float(e) for e in eigenvalues],
"eigenvectors": [[float(v) for v in eigenvectors[:, i]] for i in range(len(eigenvalues))],
"classification": classification,
"canonical_form_description": desc_parts,
"factor_names": factor_names,
}
# ---------------------------------------------------------------------------
# Steepest ascent / descent
# ---------------------------------------------------------------------------
def _steepest_path( # noqa: PLR0913
coefficients: list[dict[str, Any]],
factor_names: list[str],
step_size: float = 0.5,
n_steps: int = 10,
direction: str = "ascent",
factor_ranges: dict[str, dict[str, float]] | None = None,
) -> dict[str, Any]:
"""Generate a table of steps along the steepest ascent (or descent).
Uses only the first-order (linear) coefficients to determine
direction. Steps start at the design centre (all coded = 0).
Parameters
----------
coefficients : list[dict]
Model coefficients.
factor_names : list[str]
Ordered factor names.
step_size : float
Step magnitude in coded units (default 0.5).
n_steps : int
Number of steps to take away from the design centre (default 10).
The returned ``steps`` list has ``n_steps + 1`` entries because it
also includes step 0 at the centre.
direction : str
``"ascent"`` or ``"descent"``.
factor_ranges : dict or None
For coded → actual conversion.
Returns
-------
dict
``steps`` list and ``direction_vector``.
"""
evaluator = _build_model_evaluator(coefficients, factor_names)
# Extract linear coefficients only
name_to_idx = {n: i for i, n in enumerate(factor_names)}
b = np.zeros(len(factor_names))
for entry in coefficients:
components = _parse_term(entry["term"])
if len(components) == 1 and components[0] in name_to_idx:
b[name_to_idx[components[0]]] = float(entry["coefficient"])
if np.allclose(b, 0):
return {"error": "All linear coefficients are zero - no steepest direction."}
# Direction: normalize, then scale by step_size
norm = np.linalg.norm(b)
direction_vec = b / norm
if direction == "descent":
direction_vec = -direction_vec
steps = []
for step_num in range(n_steps + 1):
x_coded = direction_vec * step_size * step_num
predicted = float(evaluator(x_coded))
step_entry: dict[str, Any] = {
"step": step_num,
"coded": {n: float(x_coded[i]) for i, n in enumerate(factor_names)},
"predicted_response": predicted,
}
if factor_ranges:
actual = {}
for i, name in enumerate(factor_names):
if name in factor_ranges:
lo = factor_ranges[name]["low"]
hi = factor_ranges[name]["high"]
center = (lo + hi) / 2.0
half_range = (hi - lo) / 2.0
actual[name] = center + x_coded[i] * half_range
else:
actual[name] = float(x_coded[i])
step_entry["actual"] = actual
steps.append(step_entry)
return {
"direction": direction,
"direction_vector": {n: float(direction_vec[i]) for i, n in enumerate(factor_names)},
"step_size": step_size,
"steps": steps,
}
# ---------------------------------------------------------------------------
# Derringer-Suich desirability functions
# ---------------------------------------------------------------------------
def _desirability_maximize(y: float, low: float, high: float, weight: float = 1.0) -> float:
"""One-sided desirability for maximisation.
d = 0 if y <= low, d = ((y - low) / (high - low))^weight if
low < y < high, d = 1 if y >= high.
"""
if y <= low:
return 0.0
if y >= high:
return 1.0
return ((y - low) / (high - low)) ** weight
def _desirability_minimize(y: float, low: float, high: float, weight: float = 1.0) -> float:
"""One-sided desirability for minimisation.
d = 1 if y <= low, d = ((high - y) / (high - low))^weight if
low < y < high, d = 0 if y >= high.
"""
if y <= low:
return 1.0
if y >= high:
return 0.0
return ((high - y) / (high - low)) ** weight
def _desirability_target( # noqa: PLR0913
y: float,
low: float,
target: float,
high: float,
weight_low: float = 1.0,
weight_high: float = 1.0,
) -> float:
"""Two-sided desirability for a target value.
Ramps up from *low* to *target*, then ramps down from *target* to
*high*. Returns 0 outside ``[low, high]``.
"""
if y <= low or y >= high:
return 0.0
if y <= target:
return ((y - low) / (target - low)) ** weight_low
return ((high - y) / (high - target)) ** weight_high
def _individual_desirability(y: float, goal: dict[str, Any]) -> float:
"""Compute individual desirability for a single response value."""
goal_type = goal["goal"]
low = goal.get("low", 0.0)
high = goal.get("high", 1.0)
weight = goal.get("weight", 1.0)
if goal_type == "maximize":
return _desirability_maximize(y, low, high, weight)
if goal_type == "minimize":
return _desirability_minimize(y, low, high, weight)
if goal_type == "target":
target = goal["target"]
return _desirability_target(y, low, target, high, weight, weight)
msg = f"Unknown goal type: {goal_type!r}. Use 'maximize', 'minimize', or 'target'."
raise ValueError(msg)
def _composite_desirability(d_values: list[float], importances: list[float] | None = None) -> float:
"""Weighted geometric mean of individual desirability values.
D = (d1^w1 * d2^w2 * ... * dk^wk) ^ (1 / sum(wi))
If any d_i is zero, the composite is zero.
"""
if not d_values:
return 0.0
weights = importances or [1.0] * len(d_values)
# If any desirability is zero, composite is zero
if any(d == 0.0 for d in d_values):
return 0.0
log_d = sum(w * np.log(d) for d, w in zip(d_values, weights, strict=True))
w_sum = sum(weights)
if w_sum == 0:
return 0.0
return float(np.exp(log_d / w_sum))
def _optimize_desirability( # noqa: PLR0913
fitted_models: list[dict[str, Any]],
goals: list[dict[str, Any]],
factor_names: list[str],
factor_ranges: dict[str, dict[str, float]] | None = None,
importances: list[float] | None = None,
random_state: int | np.random.Generator | None = 42,
) -> dict[str, Any]:
"""Optimise composite desirability using scipy SLSQP.
Parameters
----------
fitted_models : list[dict]
Each has ``"coefficients"`` and ``"response_name"``.
goals : list[dict]
Per-response goals.
factor_names : list[str]
Ordered factor names.
factor_ranges : dict or None
Factor bounds in actual units.
importances : list[float] or None
Relative importance weights for composite desirability.
Returns
-------
dict
Optimal settings, predicted responses, individual and composite
desirability.
"""
evaluators = [_build_model_evaluator(m["coefficients"], factor_names) for m in fitted_models]
def neg_composite(x: np.ndarray) -> float:
"""Return the negated composite desirability at coded settings ``x``, for minimization."""
d_vals = []
for evaluator, goal in zip(evaluators, goals, strict=True):
y_pred = evaluator(x)
d = _individual_desirability(y_pred, goal)
d_vals.append(d)
return -_composite_desirability(d_vals, importances)
k = len(factor_names)
bounds = [(-1.0, 1.0)] * k
# Multi-start: try centre + random points.
# SEC-33 (#282): the hard-coded ``42`` moved to the public signature
# ``random_state=42`` (default preserves the previous deterministic
# behaviour). Resolved via the ENG-08 helper.
from process_improve._random import check_random_state # noqa: PLC0415
rng = check_random_state(random_state)
best_result = None
best_value = np.inf
starting_points = [np.zeros(k), *[rng.uniform(-1, 1, size=k) for _ in range(9)]]
for x0 in starting_points:
res = optimize.minimize(neg_composite, x0, method="SLSQP", bounds=bounds)
if res.fun < best_value:
best_value = res.fun
best_result = res
if best_result is None:
msg = "optimization produced no result"
raise RuntimeError(msg)
x_opt = best_result.x
composite_d = -best_value
# Evaluate individual responses and desirabilities at optimum
predictions = {}
individual_d = {}
for evaluator, model_dict, goal in zip(evaluators, fitted_models, goals, strict=True):
resp_name = model_dict.get("response_name", "response")
y_pred = float(evaluator(x_opt))
predictions[resp_name] = y_pred
individual_d[resp_name] = _individual_desirability(y_pred, goal)
result: dict[str, Any] = {
"optimal_coded": {n: float(x_opt[i]) for i, n in enumerate(factor_names)},
"predicted_responses": predictions,
"individual_desirability": individual_d,
"composite_desirability": composite_d,
"optimizer_success": bool(best_result.success),
}
if factor_ranges:
actual = {}
for i, name in enumerate(factor_names):
if name in factor_ranges:
lo = factor_ranges[name]["low"]
hi = factor_ranges[name]["high"]
center = (lo + hi) / 2.0
half_range = (hi - lo) / 2.0
actual[name] = center + x_opt[i] * half_range
else:
actual[name] = float(x_opt[i])
result["optimal_actual"] = actual
return result
# ---------------------------------------------------------------------------
# Stubs for future implementation
# ---------------------------------------------------------------------------
def _ridge_analysis(
coefficients: list[dict[str, Any]],
factor_names: list[str],
factor_ranges: dict[str, dict[str, float]] | None = None,
) -> dict[str, Any]:
"""Ridge analysis - trace the optimum along increasing radii.
.. note::
Not yet implemented. Planned: constrained eigenvalue computation
tracing the optimum on spheres of increasing radius from the
design centre when the stationary point lies outside the design
space.
"""
return {
"error": (
"Ridge analysis is not yet implemented. "
"Use 'stationary_point' or 'canonical_analysis' as alternatives."
),
"status": "stub",
}
def _pareto_front(
fitted_models: list[dict[str, Any]],
goals: list[dict[str, Any]],
factor_names: list[str],
factor_ranges: dict[str, dict[str, float]] | None = None,
) -> dict[str, Any]:
"""Multi-objective Pareto frontier.
.. note::
Not yet implemented. Planned: multi-start ``scipy.optimize``
wrapper or ``pymoo`` NSGA-II for true Pareto frontiers with
many responses.
"""
return {
"error": (
"Pareto front is not yet implemented. "
"Use 'desirability' for multi-response optimization instead."
),
"status": "stub",
}
# ---------------------------------------------------------------------------
# Coded ↔ actual conversion helpers
# ---------------------------------------------------------------------------
def _coded_to_actual(coded: dict[str, float], factor_ranges: dict[str, dict[str, float]]) -> dict[str, float]:
"""Convert coded factor settings to actual units."""
actual = {}
for name, coded_val in coded.items():
if name in factor_ranges:
lo = factor_ranges[name]["low"]
hi = factor_ranges[name]["high"]
center = (lo + hi) / 2.0
half_range = (hi - lo) / 2.0
actual[name] = center + coded_val * half_range
else:
actual[name] = coded_val
return actual
# ---------------------------------------------------------------------------
# Public API - dispatcher
# ---------------------------------------------------------------------------
[docs]
def optimize_responses( # noqa: PLR0913, C901
fitted_models: list[dict[str, Any]],
goals: list[dict[str, Any]] | None = None,
method: str = "desirability",
factor_ranges: dict[str, dict[str, float]] | None = None,
step_size: float = 0.5,
n_steps: int = 10,
desirability_weights: list[float] | None = None,
) -> dict[str, Any]:
"""Find optimal factor settings for one or multiple responses.
Parameters
----------
fitted_models : list[dict]
Each dict describes a fitted model with keys:
- ``"response_name"`` (str) - name of the response.
- ``"coefficients"`` (list[dict]) - coefficient list, each with
``"term"`` and ``"coefficient"`` keys as returned by
``analyze_experiment(..., analysis_type="coefficients")``.
- ``"factor_names"`` (list[str]) - ordered factor names.
- ``"mse_residual"`` (float, optional) - mean squared error.
- ``"r_squared"`` (float, optional) - model R-squared.
goals : list[dict] or None
Per-response optimisation goals. Each dict has keys:
- ``"response"`` (str) - response name (must match a model).
- ``"goal"`` (str) - ``"maximize"``, ``"minimize"``, or
``"target"``.
- ``"target"`` (float, optional) - target value (required when
``goal="target"``).
- ``"low"`` (float) - lower acceptable bound.
- ``"high"`` (float) - upper acceptable bound.
- ``"weight"`` (float, default 1) - desirability shape parameter.
- ``"importance"`` (float, default 1) - relative importance for
composite desirability.
method : str
Optimisation method: ``"desirability"``,
``"steepest_ascent"``, ``"steepest_descent"``,
``"stationary_point"``, ``"canonical_analysis"``,
``"ridge_analysis"`` (stub), ``"pareto_front"`` (stub).
factor_ranges : dict or None
Maps factor name to ``{"low": float, "high": float}`` in actual
units. Used for coded ↔ actual conversion.
step_size : float
Step magnitude for steepest ascent/descent (coded units).
n_steps : int
Number of steps for steepest ascent/descent.
desirability_weights : list[float] or None
Importance weights for composite desirability (overrides per-goal
``"importance"`` values).
Returns
-------
dict[str, Any]
Results keyed by method. Always includes ``"method"`` and
``"factor_names"``.
Examples
--------
>>> from process_improve.experiments.optimization import optimize_responses
>>> model = {
... "response_name": "yield",
... "coefficients": [
... {"term": "Intercept", "coefficient": 40.0},
... {"term": "A", "coefficient": 5.25},
... {"term": "B", "coefficient": -2.0},
... {"term": "I(A ** 2)", "coefficient": -3.0},
... {"term": "I(B ** 2)", "coefficient": -1.5},
... {"term": "A:B", "coefficient": 1.5},
... ],
... "factor_names": ["A", "B"],
... }
>>> result = optimize_responses(
... fitted_models=[model],
... method="stationary_point",
... )
>>> result["stationary_point"]["classification"]
'maximum'
"""
logger.debug("optimize_responses: method=%r, %d fitted model(s)", method, len(fitted_models))
if method not in _METHODS:
available = sorted(_METHODS)
msg = f"Unknown method {method!r}. Available: {available}"
raise ValueError(msg)
if not fitted_models:
msg = "At least one fitted model is required."
raise ValueError(msg)
# Use factor_names from the first model as the canonical ordering
factor_names = fitted_models[0]["factor_names"]
coefficients = fitted_models[0]["coefficients"]
result: dict[str, Any] = {"method": method, "factor_names": factor_names}
if method == "stationary_point":
result["stationary_point"] = _find_stationary_point(coefficients, factor_names, factor_ranges)
elif method == "canonical_analysis":
result["canonical_analysis"] = _canonical_analysis(coefficients, factor_names)
# Also include the stationary point for context
result["stationary_point"] = _find_stationary_point(coefficients, factor_names, factor_ranges)
elif method in ("steepest_ascent", "steepest_descent"):
direction = "ascent" if method == "steepest_ascent" else "descent"
result["steepest_path"] = _steepest_path(
coefficients, factor_names, step_size, n_steps, direction, factor_ranges
)
elif method == "desirability":
if goals is None:
msg = "Goals are required for desirability optimization."
raise ValueError(msg)
importances = desirability_weights
if importances is None:
importances = [g.get("importance", 1.0) for g in goals]
result["desirability"] = _optimize_desirability(
fitted_models, goals, factor_names, factor_ranges, importances
)
elif method == "ridge_analysis":
result["ridge_analysis"] = _ridge_analysis(coefficients, factor_names, factor_ranges)
elif method == "pareto_front":
if goals is None:
msg = "Goals are required for Pareto front optimization."
raise ValueError(msg)
result["pareto_front"] = _pareto_front(fitted_models, goals, factor_names, factor_ranges)
return result