Source code for process_improve.experiments.structures

# (c) Kevin Dunn, 2010-2026. MIT License. Based on own private work over the years.

from __future__ import annotations

import itertools
from collections import defaultdict
from collections.abc import Iterable
from typing import ClassVar, cast

import numpy as np
import pandas as pd


class Column(pd.Series):
    """Create a column. Can be used as a factor, or a response vector.

    A ``Column`` is a :class:`pandas.Series` subclass that carries
    process-improve metadata in attributes prefixed with ``pi_``. The names
    listed in ``_metadata`` are the ones pandas propagates when the column is
    copied or subset.
    """

    # https://pandas.pydata.org/pandas-docs/stable/development/extending.html
    # Temporary properties
    _internal_names: ClassVar[list[str]] = [*pd.DataFrame._internal_names, "not_used_for_now"]  # type: ignore[attr-defined]
    # _internal_names exists at runtime but is missing from pandas stubs
    _internal_names_set: ClassVar[set[str]] = set(_internal_names)

    # Properties which survive subsetting, etc
    _metadata: ClassVar[list[str]] = [
        "pi_index",  # might be used later if the user provides their own index
        "pi_numeric",  # if numeric indicator
        "pi_lo",  # if numeric: low level (-1)
        "pi_hi",  # if numeric: high level (+1)
        "pi_range",  # if numeric: range: distance from low to high
        "pi_center",  # if numeric: midway between low and high (0)
        "pi_is_coded",  # is it a coded variable, or in real-world units
        "pi_units",  # string variable, containing the units
        "pi_name",  # name of the column
    ]

    # Declared for static typing only. These are populated at runtime via the
    # pandas ``_metadata`` mechanism (bare annotations create no class-level
    # attribute, so the pandas attribute machinery is untouched).
    pi_index: bool
    pi_numeric: bool
    pi_lo: float | None
    pi_hi: float | None
    pi_range: tuple | None
    pi_center: float | None
    pi_is_coded: bool
    pi_units: str | None
    pi_name: str | None
    # NOTE(review): ``pi_levels`` is not listed in ``_metadata`` above, so it
    # is presumably not carried over by pandas on copy/subset -- confirm that
    # this is intended.
    pi_levels: dict

    @property
    def _constructor(self) -> type[Column]:
        # Makes pandas operations return a ``Column`` instead of a Series.
        return Column

    def _center_and_half_range(self, center: float | None, span: tuple | None) -> tuple[float, float]:
        """Return the ``(center, half-range)`` pair used for (de)coding.

        An explicit override wins over the stored ``pi_center``/``pi_range``.
        The test is ``is not None`` (not truthiness) so that a legitimate
        override such as ``center=0`` is honoured.
        """
        x_center = center if center is not None else self.pi_center
        x_range = span if span is not None else self.pi_range
        half_range = 0.5 * np.diff(np.asarray(x_range))[0]
        return x_center, half_range

    def to_coded(self, center: float | None = None, range: tuple | None = None) -> Column:  # noqa: A002
        """Convert the column vector to coded units.

        Parameters
        ----------
        center
            Real-world value that maps to coded 0. Defaults to ``pi_center``.
        range
            ``(low, high)`` pair; the values are divided by half of
            ``high - low``. Defaults to ``pi_range``.

        Returns
        -------
        Column
            A deep copy. If the column is already coded the copy is returned
            unchanged; otherwise its values are
            ``(value - center) / (0.5 * (high - low))``, ``pi_is_coded`` is
            True and, when ``pi_name`` is set, the name is
            ``"<pi_name> [coded]"``.
        """
        out = self.copy(deep=True)
        if self.pi_is_coded:
            return out

        x_center, half_range = self._center_and_half_range(center, range)
        # Simply override the values and the `pi_is_coded` flag, but all the
        # rest remains as is.
        out.iloc[:] = (np.asarray(self.values) - x_center) / half_range
        out.pi_is_coded = True
        if out.pi_name:
            out.name = f"{out.pi_name} [coded]"
        return out

    def to_realworld(self, center: float | None = None, range: tuple | None = None) -> Column:  # noqa: A002
        """Convert the column vector to real-world units.

        Parameters
        ----------
        center
            Real-world value that coded 0 maps to. Defaults to ``pi_center``.
        range
            ``(low, high)`` pair; the coded values are multiplied by half of
            ``high - low``. Defaults to ``pi_range``.

        Returns
        -------
        Column
            A deep copy. If the column is not coded the copy is returned
            unchanged; otherwise its values are
            ``value * (0.5 * (high - low)) + center``, ``pi_is_coded`` is
            False and, when both ``pi_name`` and ``pi_units`` are set, the
            name is ``"<pi_name> [<pi_units>]"``.
        """
        out = self.copy(deep=True)
        if not self.pi_is_coded:
            return out

        x_center, half_range = self._center_and_half_range(center, range)
        # Simply override the values and the `pi_is_coded` flag, but all the
        # rest remains as is.
        out.iloc[:] = np.asarray(self.values) * half_range + x_center
        out.pi_is_coded = False
        if out.pi_name and out.pi_units:
            out.name = f"{out.pi_name} [{out.pi_units}]"
        return out

    def copy(self, deep: bool = True) -> Column:  # type: ignore[misc]
        # pandas marks Series.copy @final; subclassing is intentional here
        """Create a copy of this Column, preserving the name."""
        out = pd.Series.copy(self, deep=deep)
        # The name is re-applied explicitly after the pandas copy.
        out.name = self.name
        return cast("Column", out)

    def extend(self, values: list) -> Column:
        """Extend the column with the list of new values.

        The new entries are labelled with consecutive integers following the
        last existing index label. An empty column starts labelling at 1.

        Parameters
        ----------
        values
            The entries to append; must be a ``list``.

        Returns
        -------
        Column
            A new column holding the existing entries followed by ``values``,
            with this column's ``_metadata`` attributes and name.

        Raises
        ------
        TypeError
            If ``values`` is not a list.
        """
        if not isinstance(values, list):
            raise TypeError(
                f"'values' must be a list; got {type(values).__name__}."
            )
        if not values:
            # Nothing to append: avoid concatenating an empty, untyped Series.
            return self.copy(deep=True)

        start = self.index[-1] + 1 if len(self.index) else 1
        new = pd.Series(data=values, index=list(range(start, start + len(values))))
        combined = pd.concat([self, new])

        # Carry the meta data over. pd.concat does not propagate custom
        # metadata from subclassed Series. Attributes that were never set on
        # this column are carried over as None.
        for key in self._metadata:
            setattr(combined, key, getattr(self, key, None))
        combined.name = self.name
        return cast("Column", combined)
class Expt(pd.DataFrame):
    """Dataframe carrying experimental data plus process-improve metadata.

    ``Expt`` (short for "Experiment") is a :class:`pandas.DataFrame` subclass
    that adds library-managed metadata fields prefixed with ``pi_`` -- short
    for "process-improve". The prefix is what keeps these reserved attribute
    names from colliding with column names from a caller-supplied DataFrame.

    Pinned metadata (preserved across subsetting via ``_metadata``):

    - ``pi_title``  -- short human-readable name for the dataset
    - ``pi_source`` -- mapping of provenance information (``gather`` stores
      one entry per gathered column)
    - ``pi_units``  -- mapping of units (``gather`` stores one entry per
      gathered column)

    Other ``pi_*`` attributes (``pi_range``, ``pi_lo``, ``pi_hi``,
    ``pi_center``, ``pi_name``) are set by the experiments factory helpers in
    this module; see :func:`expt` / :func:`create_names`.

    The ``pi_`` prefix is documented in ``CONTRIBUTING.md`` and is part of the
    package's public API surface; new metadata fields should follow the same
    prefix.
    """

    # Temporary properties
    _internal_names: ClassVar[list[str]] = [*pd.DataFrame._internal_names, "not_used_for_now"]  # type: ignore[attr-defined]
    # _internal_names exists at runtime but is missing from pandas stubs
    _internal_names_set: ClassVar[set[str]] = set(_internal_names)

    # Properties which survive subsetting, etc
    _metadata: ClassVar[list[str]] = ["pi_source", "pi_title", "pi_units"]

    # Declared for static typing only. These are populated at runtime via the
    # pandas ``_metadata`` mechanism (bare annotations create no class-level
    # attribute, so the pandas attribute machinery is untouched).
    pi_source: dict | None
    pi_title: str | None
    pi_units: dict | None

    @property
    def _constructor(self) -> type[Expt]:
        # Makes pandas operations return an ``Expt`` instead of a DataFrame.
        return Expt

    def __repr__(self) -> str:
        """Return a string representation of the experiment.

        The regular DataFrame representation is followed by a ``Name:`` line
        and a ``Size:`` line. A frame whose title was never assigned is shown
        as ``Name: None`` rather than raising.
        """
        # `pi_title` is only a bare annotation on the class, so it does not
        # exist on an instance until it is assigned; look it up defensively.
        title = f"Name: {getattr(self, 'pi_title', None)}"
        dimensions = f"Size: {self.shape[0]} experiments; {self.shape[1]} columns."
        return "\n".join([pd.DataFrame.__repr__(self), title, dimensions])

    def get_title(self) -> str:
        """Return the experiment title, or empty string if not set."""
        return getattr(self, "pi_title", None) or ""
def create_names(n: int, letters: bool = True, prefix: str = "X", start_at: int = 1, padded: bool = True) -> list[str]:
    """
    Build the default names for `n` [integer] factors.

    With `letters` True (the default) and `n` of 25 or fewer, the names are
    consecutive capital letters starting at "A"; the letter "I" is never
    used, so the sequence runs one letter further whenever it would have
    contained "I".

    Otherwise the names are `prefix` followed by consecutive integers that
    start at `start_at`. With `padded` True the integers are left-filled with
    zeros to the width of the largest one.

    Example:
    >>> create_names(5)
    ["A", "B", "C", "D", "E"]

    >>> create_names(3, letters=False)
    ["X1", "X2", "X3"]

    >>> create_names(3, letters=False, prefix='Q', start_at=9, padded=True)
    ["Q09", "Q10", "Q11"]
    """
    if letters and n <= 25:
        # Look one letter past the n-th so that dropping "I" still leaves
        # enough candidates, then keep the first n. (For n <= 0 the candidate
        # list is at most one letter long and the slice is empty.)
        candidates = [chr(65 + offset) for offset in range(n + 1)]
        return [letter for letter in candidates if letter != "I"][:n]

    width = len(str(start_at + n - 1)) if padded else 0
    stop = start_at + n
    return [str(prefix) + str(number).rjust(width, "0") for number in range(start_at, stop)]
def c(*args, **kwargs) -> Column:  # noqa: C901, PLR0912, PLR0915
    """
    Perform the equivalent of the R function "c(...)", to combine data elements into a :class:`Column`.

    Entries are converted to floating point where possible. An entry that
    cannot be coerced to float is kept as-is and makes the column categorical;
    passing ``levels=...`` also makes the column categorical.

    Inputs
    ------
    *args: scalar entries (numbers or strings), or a single list, tuple,
        ``numpy.ndarray`` or ``pandas.Series`` holding the entries. An
        iterable argument replaces whatever was collected from the arguments
        before it.
        NOTE(review): R's ``c`` concatenates its arguments; confirm whether
        replacing (rather than extending) is intended here.
    index: a list of names for the entries in `args`
    name: a name for the column
    lo, hi, center, range, units: metadata for a numeric column
    coded: force the coded / not-coded flag of a numeric column
    levels: the unique level names of a categorical column

    Raises
    ------
    IndexError: if `index` does not have one label per entry.
    TypeError: if `range` or `levels` is not iterable.
    ValueError: if `range` does not have 2 values, or if an entry is not
        present in `levels`.

    Usage
    -----
    # All equivalent ways of creating a factor, "A"
    A = c(-1, 0, +1, -1, +1)
    A = c(-1, 0, +1, -1, +1, index=['lo', 'cp', 'hi', 'lo', 'hi'])
    A = c( 4, 5, 6, 4, 6, range=(4, 6))
    A = c( 4, 5, 6, 4, 6, center=5, range=(4, 6))  # more explicit
    A = c( 4, 5, 6, 4, 6, lo=4, hi=6)
    A = c( 4, 5, 6, 4, 6, lo=4, hi=6, name = 'A')
    A = c([4, 5, 6, 4, 6], lo=4, hi=6, name = 'A')

    # By default, the assumption is the variable levels supplied are coded
    # units. But if any one of the following: `lo`, `hi`, `center`, `range` OR
    # `units` are specified, then immediately it is assumed that the variable
    # values are not coded.

    # So, to force the specification, you may supply the optional input of
    # `coded` as True or False
    A = c([4, 5, 6, 4, 6], lo=1, hi=3, coded=True)
    A = c([4, 5, 6, 4, 6], lo=1, hi=3, coded=False, units="g/mL")

    # Categorical variables
    B = c(0, 1, 0, 1, 0, 2, levels =(0, 1, 2))
    M = c("Dry", "Wet", "Dry", "Wet", levels = ("Dry", "Wet"))
    """
    sanitize: list | pd.Series = []
    numeric = "levels" not in kwargs
    override_coded = kwargs.get("coded")

    for entry in args:
        # A string is technically Iterable, but here it is one (categorical)
        # entry, so it is handled as a scalar. The `levels` validation below
        # draws the same line.
        if isinstance(entry, str) or not isinstance(entry, Iterable):
            try:
                sanitize.append(float(entry))
            except ValueError:
                numeric = False
                sanitize.append(entry)
            continue

        if isinstance(entry, np.ndarray):
            sanitize = entry.ravel().tolist()
        elif isinstance(entry, pd.Series):
            sanitize = entry.copy()
            if "index" not in kwargs:
                kwargs["index"] = sanitize.index
        elif isinstance(entry, list):
            sanitize = entry.copy()
        else:
            # Any other iterable (e.g. a tuple) is materialised as a list.
            sanitize = list(entry)

        try:
            sanitize = [float(item) for item in sanitize]
        except ValueError:
            # At least one entry is not a number: keep the entries untouched.
            numeric = False

    # Index creation
    default_idx = list(range(1, len(sanitize) + 1))
    index = kwargs.get("index", default_idx)
    if len(index) != len(sanitize):
        raise IndexError('Length of "index" must match the number of numeric inputs.')

    out = Column(data=sanitize, index=index, name=None)

    # Use sensible defaults, if not provided
    out.pi_index = True
    out.pi_lo = None
    out.pi_hi = None
    out.pi_range = None
    out.pi_center = None
    out.pi_numeric = numeric
    out.pi_units = None
    out.pi_is_coded = True
    out.pi_name = kwargs.get("name", "Unnamed")
    out.name = out.pi_name

    if numeric:
        # If any of 'lo', 'hi', 'center', 'range' or 'units' are specified,
        # then it is assumed that the variable is NOT coded
        if "lo" in kwargs:
            out.pi_lo = kwargs["lo"]
            out.pi_is_coded = False
        else:
            out.pi_lo = out.min()

        if "hi" in kwargs:
            out.pi_hi = kwargs["hi"]
            out.pi_is_coded = False
        else:
            out.pi_hi = out.max()

        if "range" in kwargs:
            out.pi_range = kwargs["range"]
            out.pi_is_coded = False
        else:
            out.pi_range = (out.pi_lo, out.pi_hi)

        try:
            iter(out.pi_range)
        except TypeError as err:
            raise TypeError("The `range` input must be an iterable, with 2 values.") from err
        if len(out.pi_range) != 2:
            raise ValueError(
                f"The `range` variable must be a tuple with 2 values; "
                f"got {len(out.pi_range)} value(s)."
            )
        out.pi_range = tuple(out.pi_range)

        if "center" in kwargs:
            out.pi_center = kwargs["center"]
            out.pi_is_coded = False
        else:
            out.pi_center = np.mean(out.pi_range)

        if "units" in kwargs:
            out.pi_units = kwargs["units"]
            out.pi_is_coded = False
        else:
            out.pi_units = ""

        # Finally, the user might have over-ridden the coding flag:
        if override_coded is not None:
            out.pi_is_coded = override_coded

    elif "levels" in kwargs:
        levels = kwargs.get("levels")
        if not isinstance(levels, Iterable):
            raise TypeError("Levels must be list or tuple of the unique level names.")
        levels_list = list(levels)

        # Validate the raw inputs (before any float conversion) against the
        # declared levels; missing values are exempt.
        raw_values: list = []
        for arg in args:
            if isinstance(arg, str) or not isinstance(arg, Iterable):
                raw_values.append(arg)
            else:
                raw_values.extend(list(arg))
        extras = {v for v in raw_values if not pd.isna(v)} - set(levels_list)
        if extras:
            raise ValueError(
                f"All values must be present in `levels`. "
                f"Found value(s) not in levels: {sorted(extras, key=str)}."
            )
        out.pi_levels = {out.pi_name: levels_list}

    else:
        # np.sort handles both ndarray (numeric columns) and pandas
        # extension arrays (e.g. StringArray for categorical columns).
        levels = np.sort(out.unique())
        out.pi_levels = {out.pi_name: levels.tolist()}  # for use with Patsy

    # Decorate the Series name with the units, or with the coded marker.
    units = kwargs.get("units", "")
    if units and not (out.pi_is_coded):
        out.name = f"{out.name} [{units}]"
    if out.pi_is_coded:
        out.name = f"{out.name} [coded]"

    return out
def expand_grid(**kwargs: Column) -> list[Column]:
    """Create the expanded grid: every combination of the factors' values.

    Parameters
    ----------
    **kwargs
        One entry per factor: the keyword is the factor name, the value holds
        the values that factor takes.

    Returns
    -------
    list[Column]
        One column per factor, in the order given and named after its
        keyword. Row ``i`` of all the columns together is one combination.
        The first factor varies fastest and the last one slowest. An empty
        list is returned when no factors are supplied.
    """
    if not kwargs:
        return []

    n_col = len(kwargs)
    levels = [factor.values for factor in kwargs.values()]
    # itertools.product varies its LAST input fastest. Taking the product
    # over the reversed factors and then flipping the columns back puts
    # column k in line with factor k, while making the first factor the
    # fastest-varying one.
    combos = list(itertools.product(*reversed(levels)))
    grid = np.fliplr(np.array(combos).reshape(len(combos), n_col))
    return [c(grid[:, position], name=name) for position, name in enumerate(kwargs)]
def supplement(x: Column, **kwargs: object) -> Column:
    """Build a new column from the values of `x` plus the given metadata.

    Only the values of `x` are handed to :func:`c`; every keyword argument
    (name, units, lo, hi, etc.) is forwarded to it unchanged.
    """
    raw_values = x.values
    supplemented = c(raw_values, **kwargs)
    return supplemented
# Example usage of `supplement`:
# A = supplement(A, name = 'Feed rate', units='g/min', lo = 5, hi = 8.0)
# B = supplement(B, name = 'Initial inoculate amount', units = 'g', lo = 300,
#                hi = 400)
# C = supplement(C, name = 'Feed substrate concentration', units = 'g/L',
#                lo = 40, hi = 60)
# D = supplement(D, name = 'Dissolved oxygen set-point', units = 'mg/L',
#                lo = 4, hi = 5)
def gather(*args: Column, title: str | None = None, **kwargs: Column | list) -> Expt:
    """
    Gathers the named inputs together as columns for a data frame.

    Removes any rows that have ANY missing values. If even 1 value in a row is
    missing, then that row is removed.

    Usage
    -----

    expt = gather(A=A, B=B, y=y, title='My experiment in factors A and B')

    A multi-column input (a ``pandas.DataFrame``, e.g. a categorical factor
    expanded into several indicator columns) is gathered column by column.

    Parameters
    ----------
    *args
        Not used: positional inputs are currently ignored.
        NOTE(review): confirm whether positional columns should be gathered
        or rejected.
    title
        Stored on the result as ``pi_title``.
    **kwargs
        The inputs, keyed by the column name to use. Each one is a list,
        tuple, 1-D ``numpy.ndarray``, ``pandas.Series`` (e.g. a ``Column``)
        or ``pandas.DataFrame``.

    Returns
    -------
    Expt
        One column per input (one per column of a multi-column DataFrame
        input, named ``"<key>_<column>"``), with incomplete rows dropped.
        For Series and DataFrame inputs, ``pi_source`` records the source
        name and ``pi_units`` the input's ``pi_units`` (``""`` when it has
        none).

    Raises
    ------
    ValueError
        If the inputs do not all have the same length.
    TypeError
        If an input is of a type that cannot be gathered.
    """
    out = Expt(data=None, index=None, columns=None, dtype=None)
    out.pi_source = defaultdict(str)
    out.pi_units = defaultdict(str)

    # Every input is merged positionally (row i with row i), so they must all
    # contribute the same number of rows.
    lengths = {len(value) for value in kwargs.values()}
    if len(lengths) > 1:
        msg = f"All inputs to gather() must have the same length; got lengths {sorted(lengths)}."
        raise ValueError(msg)

    for key, value in kwargs.items():
        if isinstance(value, pd.DataFrame):
            # A block of two or more columns: gather each column separately.
            # A single-column frame keeps the original key as its name.
            for col_name in value.columns:
                sub_key = str(key) if value.shape[1] == 1 else f"{key}_{col_name}"
                out[sub_key] = value[col_name].to_numpy()
                out.pi_source[sub_key] = col_name
                out.pi_units[sub_key] = getattr(value, "pi_units", "")
        elif isinstance(value, pd.Series):
            # Positional merge: only the values are used, not the index.
            out[key] = value.values
            out.pi_source[key] = value.name
            out.pi_units[key] = getattr(value, "pi_units", "")
        elif isinstance(value, (list, tuple, np.ndarray)):
            out[key] = list(value)
        else:
            # Refuse loudly rather than silently leaving the input out of the
            # gathered frame.
            msg = (
                f"gather() cannot use input {key!r} of type {type(value).__name__}; "
                "expected a list, tuple, numpy array, pandas Series or pandas DataFrame."
            )
            raise TypeError(msg)

    # Drop any missing values:
    out = out.dropna(axis=0, how="any")

    # Set the title, if one was provided
    out.pi_title = title
    return out