from __future__ import annotations
import logging
from typing import cast
import numpy as np
import pandas as pd
import scipy as sp
from tqdm import tqdm
try:
import plotly.graph_objects as go
except ImportError: # pragma: no cover - exercised via env-without-plotly
from process_improve._extras import _MissingExtra
go = _MissingExtra("plotly", "plotting") # type: ignore[assignment]
from ..multivariate.methods import PCA, MCUVScaler
from .alignment_helpers import backtrack_optimal_path, distance_matrix
from .data_input import check_valid_batch_dict, dict_to_wide, melted_to_dict
logger = logging.getLogger(__name__)
epsqrt = np.sqrt(np.finfo(float).eps)
[docs]
def determine_scaling(
batches: dict[str, pd.DataFrame],
columns_to_align: list | pd.Index | None = None,
settings: dict | None = None,
) -> pd.DataFrame:
"""
Scales the batch data according to the variable ranges.
Parameters
----------
batches : dict[str, pd.DataFrame]
Batch data, in the standard format (keyed by batch identifier).
columns_to_align : list, optional
The column names (tags) to be scaled. If ``None``, the columns of the
first batch are used.
settings : dict, optional
Optional overrides. Currently supports the key ``"robust"`` (bool,
default ``True``) which switches between a robust range (q98 - q02)
and the raw (max - min) range.
Returns
-------
range_scalers: DataFrame
J rows, 2 columns: column 1 = range of each tag (approx. q98 - q02),
column 2 = typical minimum of each tag (robustly calculated).
TODO: put this in a scikit-learn style: .fit() and .apply() style
"""
# This will be clumsy, until we have Python 3.9
default_settings = {"robust": True}
if settings:
default_settings.update(settings)
settings = default_settings
if columns_to_align is None:
columns_to_align = batches[next(iter(batches.keys()))].columns
collector_rnge = []
collector_mins = []
for batch in batches.values():
if settings["robust"]:
# TODO: consider f_iqr feature here. Would that work?
rnge = batch[columns_to_align].quantile(0.98) - batch[columns_to_align].quantile(0.02)
else:
rnge = batch[columns_to_align].max() - batch[columns_to_align].min()
rnge = cast("pd.Series", rnge)
rnge[rnge.to_numpy() == 0] = 1.0
collector_rnge.append(rnge)
collector_mins.append(batch.min(axis=0))
if settings["robust"]:
scalings = pd.concat(
[
pd.DataFrame(collector_rnge).median(),
pd.DataFrame(collector_mins).median(),
],
axis=1,
)
else:
scalings = pd.concat(
[pd.DataFrame(collector_rnge).mean(), pd.DataFrame(collector_mins).mean()],
axis=1,
)
scalings.columns = ["Range", "Minimum"]
return scalings
[docs]
def apply_scaling(
batches: dict[str, pd.DataFrame],
scale_df: pd.DataFrame,
columns_to_align: list | pd.Index | None = None,
) -> dict:
"""Scales the batches according to the information in the scaling dataframe.
Parameters
----------
batches : dict[str, pd.DataFrame]
The batches, in standard format.
scale_df : pd.DataFrame
The scaling dataframe, from `determine_scaling`
Returns
-------
dict
The scaled batch data.
"""
# TODO: handle the case of DataFrames still
if columns_to_align is None:
if isinstance(batches, dict):
batch1 = batches[next(iter(batches.keys()))]
columns_to_align = batch1.columns
elif isinstance(batches, pd.DataFrame):
columns_to_align = batches.columns
else:
raise TypeError("Undefined input type")
out = {}
for batch_id, batch in batches.items():
out[batch_id] = batch[columns_to_align].copy()
for tag, column in out[batch_id].items():
tag = cast("str", tag)
minimum = cast("float", scale_df.loc[tag, "Minimum"])
scale_range = cast("float", scale_df.loc[tag, "Range"])
out[batch_id][tag] = (column - minimum) / scale_range
return out
[docs]
def reverse_scaling(
batches: dict[str, pd.DataFrame],
scale_df: pd.DataFrame,
columns_to_align: list | pd.Index | None = None,
) -> dict:
"""Reverse the scaling applied by `apply_scaling`."""
# TODO: handle the case of DataFrames still
if columns_to_align is None:
if isinstance(batches, dict):
columns_to_align = batches[next(iter(batches.keys()))].columns
elif isinstance(batches, pd.DataFrame):
columns_to_align = batches.columns
else:
raise TypeError("Undefined input type")
out = {}
for batch_id, batch in batches.items():
out[batch_id] = batch[columns_to_align].copy()
for tag, column in out[batch_id].items():
tag = cast("str", tag)
minimum = cast("float", scale_df.loc[tag, "Minimum"])
scale_range = cast("float", scale_df.loc[tag, "Range"])
out[batch_id][tag] = column * scale_range + minimum
return out
[docs]
class DTWresult:
"""Result class."""
def __init__( # noqa: PLR0913
self,
synced: np.ndarray | pd.DataFrame,
penalty_matrix: np.ndarray,
md_path: np.ndarray, # multi-dimensional path through distance mesh D = penalty_matrix
warping_path: np.ndarray,
distance: float,
normalized_distance: float,
):
self.synced = synced
self.penalty_matrix = penalty_matrix
self.md_path = md_path
self.warping_path = warping_path
self.distance = distance
self.normalized_distance = normalized_distance
[docs]
def align_with_path(md_path: np.ndarray, batch: pd.DataFrame) -> pd.DataFrame:
"""Align a batch to the reference using the DTW path.
Where several samples of ``batch`` map to the same reference index (a
compression in the warping path), the synced value for that index is the
average of those batch samples. The running ``temp`` accumulator is therefore
seeded with the first batch sample for the current index - the same value
assigned to ``synced`` row 0 just below - not with a reference row. A former
``initial_row`` argument seeded it from the reference row (in one caller) or
from an out-of-space batch index (in the other), which mixed an unrelated row
into the row-0 average (#197).
"""
row = 0
nr = md_path[:, 0].max() + 1 # to account for the zero-based indexing
synced = pd.DataFrame(np.zeros((nr, batch.shape[1])), columns=batch.columns)
synced.iloc[row, :] = batch.iloc[md_path[0, 1], :]
temp: pd.Series | np.ndarray = batch.iloc[md_path[0, 1], :]
for idx in np.arange(1, md_path.shape[0]):
if md_path[idx, 0] != md_path[idx - 1, 0]:
row += 1
synced.iloc[row, :] = temp = batch.iloc[md_path[idx, 1], :]
else:
# TODO : Come back to page 181 of thesis: where more than 1 point in the target
# trajectory is aligned with the reference: compute the average,
temp = np.vstack((temp, batch.iloc[md_path[idx, 1], :]))
synced.iloc[row, :] = np.nanmean(temp, axis=0)
return pd.DataFrame(synced)
[docs]
def dtw_core(test: pd.DataFrame, ref: pd.DataFrame, weight_matrix: np.ndarray) -> DTWresult:
"""Compute DTW alignment of test batch against reference batch."""
nt = test.shape[0] # 'test' data; will be align to the 'reference' data
nr = ref.shape[0]
if test.shape[1] != ref.shape[1]:
raise ValueError(
f"test and ref must have the same number of columns; "
f"got test.shape[1]={test.shape[1]}, ref.shape[1]={ref.shape[1]}."
)
D = distance_matrix(test.values, ref.values, weight_matrix)
md_path, distance = backtrack_optimal_path(D)
warping_path = np.zeros(nr)
for idx in range(nr):
warping_path[idx] = md_path[np.where(md_path[:, 0] == idx)[0][-1], 1]
# Now align the `test` batch:
synced = align_with_path(md_path=md_path, batch=test)
return DTWresult(
synced,
D,
md_path,
warping_path,
distance,
normalized_distance=distance / (nr + nt),
)
[docs]
def one_iteration_dtw(
batches_scaled: dict,
refbatch_sc: pd.DataFrame,
weight_matrix: np.ndarray,
settings: dict | None = None,
) -> tuple[dict, pd.DataFrame]:
"""Perform one iteration of the DTW alignment algorithm."""
default_settings = {"show_progress": True, "subsample": 1}
if settings:
default_settings.update(settings)
settings = default_settings
aligned_batches = {}
distances = []
average_batch = refbatch_sc.copy().reset_index(drop=True) * 0.0
successful_alignments = 0
for batch_id, batch in tqdm(batches_scaled.items(), disable=not (settings["show_progress"])):
try:
# see Kassidas, page 180
batch_subset = batch.iloc[:: int(settings["subsample"]), :]
result = dtw_core(batch_subset, refbatch_sc, weight_matrix=weight_matrix)
average_batch = average_batch + result.synced
aligned_batches[batch_id] = result
successful_alignments += 1
distances.append(
{
"batch_id": batch_id,
"Distance": result.distance,
"Normalized distance": result.normalized_distance,
}
)
except ValueError: # noqa: PERF203
raise ValueError(f"Failed on batch {batch_id}") from None
average_batch = average_batch / successful_alignments
return aligned_batches, average_batch
[docs]
def batch_dtw( # noqa: C901, PLR0915
batches: dict[str, pd.DataFrame],
columns_to_align: list,
reference_batch: str,
settings: dict | None = None,
) -> dict:
"""
Synchronize, via iterative DTW, with weighting.
Algorithm: Kassidas et al. (2004): https://doi.org/10.1002/aic.690440412
Parameters
----------
batches : dict[str, pd.DataFrame]
Batch data, in the standard format.
columns_to_align : list
Which columns to use during the alignment process. The others are aligned, but
get no weight, and therefore do not influence the objective function.
reference_batch : str
Which key in the `batches` is the reference batch to use.
settings : dict
Default settings are::
{
"maximum_iterations": 25, # stops here, even if not converged
"tolerance": 0.1, # convergence tolerance
"robust": True, # use robust scaling
"show_progress": True, # show progress
"subsample": 1, # use every sample
"interpolate_time_axis_maximum": 100, # resample time axis to this scale
"interpolate_time_axis_delta": 1, # resolution of resampled axis
"interpolate_method": "cubic", # any scipy.interpolate.interp1d method
}
The default settings resample the time axis to 100 data points, starting at 0 and
ending at 99. Adjust the delta for more points, or change the maximum.
Returns
-------
dict
Various outputs relevant to the alignment.
TODO: Document completely later.
Notation
--------
I = number of batches: index = i
i = index for the batches
J = number of tags (columns in each batch)
j = index for the tags
k = index into the rows of each batch, the samples: 0 ... k ... K_i
"""
default_settings: dict = dict(
maximum_iterations=25, # maximum iterations (stops here, even if not converged)
tolerance=0.1, # convergence tolerance
robust=True, # use robust scaling
show_progress=True, # show progress
subsample=1, # use every sample
interpolate_time_axis_maximum=100, # interpolates everything to be on this scale
interpolate_time_axis_delta=1,
interpolate_method="cubic", # any method from scipy.interpolate.interp1d allowed
)
if settings:
default_settings.update(settings)
settings = default_settings
if settings["maximum_iterations"] < 3:
raise ValueError(
f"At least 3 iterations are required; got maximum_iterations={settings['maximum_iterations']}."
)
if reference_batch not in batches:
raise KeyError(
f"`reference_batch` was not found in the dict of batches; got {reference_batch!r}."
)
if not check_valid_batch_dict(
{k: v[columns_to_align] for k, v in batches.items()},
no_nan=True,
):
raise ValueError("One or more batches in the input dict failed validation.")
scale_df = determine_scaling(batches=batches, columns_to_align=columns_to_align, settings=settings)
batches_scaled = apply_scaling(batches, scale_df, columns_to_align)
refbatch_sc = batches_scaled[reference_batch].iloc[:: int(settings["subsample"]), :]
weight_vector = np.ones(refbatch_sc.shape[1])
weight_matrix = np.diag(weight_vector)
weight_history = np.zeros_like(weight_vector) * np.nan
average_batch = None
delta_weight: np.floating | np.ndarray = np.linalg.norm(weight_vector)
iter_step = 0
while (np.linalg.norm(delta_weight) > settings["tolerance"]) and (iter_step <= settings["maximum_iterations"]):
if settings["show_progress"]:
print(f"Iter = {iter_step} and norm = {np.linalg.norm(delta_weight)}") # noqa: T201
iter_step += 1
logger.debug(
"batch_dtw: iteration %d, weight-delta norm=%g (tolerance=%g)",
iter_step,
float(np.linalg.norm(delta_weight)),
settings["tolerance"],
)
weight_matrix = np.diag(weight_vector)
weight_history = np.vstack((weight_history, weight_vector.copy()))
if iter_step > 3:
refbatch_sc = average_batch
aligned_batches, average_batch = one_iteration_dtw(
batches_scaled=batches_scaled,
refbatch_sc=refbatch_sc,
weight_matrix=weight_matrix,
settings=settings,
)
# Deviations from the average batch:
next_weights = np.zeros((1, refbatch_sc.shape[1]))
for result in aligned_batches.values():
next_weights = next_weights + np.nansum(np.power(result.synced - average_batch, 2), axis=0)
# TODO: use quadratic weights for now, but try sum of the absolute values instead
# np.abs(result.synced - average_batch).sum(axis=0)
# TODO: leave out worst batches when computing the weights
# dist_df = pd.DataFrame(distances).set_index("batch_id")
# dist_df.hist("Distance", bins=50)
# Now find the average trajectory, but ignore problematic batches:
# for example, top 5% of the distances.
# TODO: make this a configurable setting
# problematic_threshold = dist_df["Distance"].quantile(0.95)
next_weights = 1.0 / np.where(next_weights > epsqrt, next_weights, 10000)
weight_vector = (next_weights / np.sum(next_weights) * len(columns_to_align)).ravel()
# If change in delta_weight is small, we terminate early; no need to fine-tune excessively.
delta_weight = np.diag(weight_matrix) - weight_vector # old - new
# OK, the weights are found: now use the last iteration's result to get back to original
# scaling for the trajectories
weight_history = weight_history[1:, :]
aligned_df_collection: list[pd.DataFrame] = []
new_time_axis = np.arange(
0,
settings["interpolate_time_axis_maximum"],
settings["interpolate_time_axis_delta"],
)
for batch_id, result in tqdm(
aligned_batches.items(),
desc="Interpolating",
disable=not (settings["show_progress"]),
):
synced = align_with_path(
result.md_path,
batches[batch_id].iloc[:: int(settings["subsample"]), :],
)
# Resample the trajectories of the aligned data now along this sequence.
sequence = np.linspace(
0,
settings["interpolate_time_axis_maximum"] - settings["interpolate_time_axis_delta"],
synced.shape[0],
)
# Internal invariants on the just-built axes; not user input.
assert new_time_axis.min() == sequence.min() # post-construction invariant
assert new_time_axis.max() == sequence.max() # post-construction invariant
synced_interpolated = pd.DataFrame()
for column in synced:
if column in ["batch_id", "_sequence_"]:
continue
interp_column = sp.interpolate.interp1d(
sequence,
synced[column],
kind=settings["interpolate_method"],
assume_sorted=True,
)
synced_interpolated[column] = interp_column(new_time_axis)
# Pop in an extra column at the start of the df
synced_interpolated.insert(0, "batch_id", batch_id)
synced_interpolated.set_index(new_time_axis)
# Overwrite existing dataframe with this, unscaled, and interpolated dataframe.
result.synced = synced_interpolated
aligned_df_collection.append(synced_interpolated)
# Make the batch_id label consistent
aligned_df = pd.concat(aligned_df_collection)
aligned_df["batch_id"] = aligned_df["batch_id"].astype(type(batch_id))
last_average_batch = reverse_scaling(dict(avg=cast("pd.DataFrame", average_batch)), scale_df)["avg"]
aligned_batch_dfdict = melted_to_dict(aligned_df, batch_id_col="batch_id")
return dict(
scale_df=scale_df,
aligned_batch_objects=aligned_batches,
aligned_batch_dfdict=aligned_batch_dfdict,
last_average_batch=last_average_batch,
weight_history=pd.DataFrame(weight_history, columns=columns_to_align),
)
[docs]
def resample_to_reference(
batches: dict[str, pd.DataFrame],
columns_to_align: list,
reference_batch: str,
settings: dict | None = None,
) -> dict:
"""Resamples all `batches` (only the `columns_to_align`) to the duration of batch with
identifier `reference`.
Parameters
----------
batches : dict[str, pd.DataFrame]
Batch data, in the standard format.
columns_to_align : list
Which columns to use. Others are ignored.
reference_batch : str
Which key in the `batches` is the reference batch.
settings : dict, optional
[description], by default None
Returns
-------
dict
Batch data, in the standard format.
"""
default_settings = {
"interpolate_kind": "cubic", # must be a valid "scipy.interpolate.interp1d" `kind`
}
if settings:
default_settings.update(settings)
settings = default_settings
out = {}
target_time = np.arange(0, batches[reference_batch].shape[0])
target_time = target_time / target_time[-1]
for batch_id, batch in batches.items():
to_resample = np.arange(0, batch.shape[0])
to_resample = to_resample / to_resample[-1]
out_df = {}
for column, series in batch.items():
if column in columns_to_align:
out_df[column] = sp.interpolate.interp1d(
to_resample,
series.values,
copy=False,
kind=settings["interpolate_kind"],
)(target_time)
out[batch_id] = pd.DataFrame(out_df)
return out
[docs]
def find_average_length(batches: dict[str, pd.DataFrame], settings: dict | None = None) -> str:
"""
Find the batch in `batches` with the average length.
Parameters
----------
batches : dict[str, pd.DataFrame]
Batch data, in the standard format.
settings : dict
Default settings are::
{"robust": True} # use robust (median) average batch length
Returns
-------
One of the dictionary keys from `batches`.
"""
default_settings = {
"robust": True, # use robust metric to calculate average batch length
}
if settings:
default_settings.update(settings)
settings = default_settings
batch_lengths = pd.Series({batch_id: df.shape[0] for batch_id, df in batches.items()})
if settings["robust"]:
# If multiple batches of the median length, return the last one.
try:
median_match = np.where((batch_lengths == batch_lengths.median()).to_numpy())[0][-1]
return cast("str", batch_lengths.index[int(median_match)])
except IndexError:
# Very exceptional: if batch_lengths.median() is computed as the average of 2 numbers
# and therefore the median length batch doesn't actually exist
return find_average_length(batches, settings=dict(robust=False))
else:
closest = int((batch_lengths - batch_lengths.mean()).abs().argmin())
return cast("str", batch_lengths.index[closest])
[docs]
def find_reference_batch(
batches: dict[str, pd.DataFrame],
columns_to_align: list,
settings: dict | None = None,
) -> str | list[str]:
"""
Find a reference batch. Assumes NO missing data.
Starts with the average duration batch; resamples (simple interpolation) of all batches to
that duration. Unfolds that resampled data. Does PCA on the wide, unfolded data. Fits,
by default, 4 components. Excludes all batches with Hotelling's T2 > 90% limit. Refits PCA
with 4 components. Finds the batch which has the multivariate combination of scores which are
the smallest (i.e. closest to the model center) and ensures this batch has SPE < 50% of the
model limit.
Parameters
----------
batches : dict[str, pd.DataFrame]
Batch data, in the standard format.
columns_to_align : list
Which columns to use. Others are ignored.
settings : dict, optional
Default settings are::
{
"robust": True, # use robust scaling
"subsample": 1, # use every sample
"method": "pca_most_average", # most average batch from a crude PCA
"n_components": 4,
"number_of_reference_batches": 1, # only a single batch returned
}
Returns
-------
str or list[str]
When ``settings["number_of_reference_batches"] == 1`` (the default), a
single dictionary key from ``batches`` is returned. When more than one
reference batch is requested, a list of that many keys is returned,
ordered from most to least central in the PCA model.
"""
default_settings: dict[str, int | float | str | bool] = {
"robust": True, # use robust scaling
"subsample": 1, # use every sample
"method": "pca_most_average",
"n_components": 4,
"number_of_reference_batches": 1,
}
if isinstance(settings, dict):
default_settings.update(settings)
settings = default_settings
if not isinstance(columns_to_align, list):
raise TypeError(
f"`columns_to_align` must be a list of column names; "
f"got {type(columns_to_align).__name__}."
)
if not check_valid_batch_dict({k: v[columns_to_align] for k, v in batches.items()}):
raise ValueError("One or more batches in the input dict failed validation.")
# Starts with the average duration batch.
initial_reference_id = find_average_length(batches, settings)
# Resamples (simple interpolation) of all batches to that duration.
resampled = resample_to_reference(
batches,
columns_to_align,
reference_batch=initial_reference_id,
settings=settings,
)
# Unfolds that resampled data.
basewide = dict_to_wide(resampled)
# Does PCA on the wide, unfolded data. A=4
scaler = MCUVScaler().fit(basewide)
mcuv = scaler.fit_transform(basewide)
# You can't fit more components than rows in the matrix.
n_components = min(int(np.floor(settings["n_components"])), basewide.shape[0] - 1)
pca_first = PCA(n_components=n_components).fit(mcuv)
# Excludes all batches with Hotelling's T2 > 90% limit.
hotellings_t2_limit_90 = pca_first.hotellings_t2_limit(0.90)
to_keep = pca_first.hotellings_t2_.iloc[:, -1] < hotellings_t2_limit_90
# Refits PCA with A=4 on a subset of the batches, to avoid biasing the PCA model too much.
basewide = basewide.loc[to_keep, :]
scaler = MCUVScaler().fit(basewide)
mcuv = scaler.fit_transform(basewide)
pca_second = PCA(n_components=n_components).fit(mcuv)
# Finds batch with scores; and ensures this batch has SPE < 50% of the model limit.
metrics = pd.DataFrame(
{
"HT2": pca_second.hotellings_t2_.iloc[:, -1],
"SPE": pca_second.spe_.iloc[:, -1],
}
)
metrics = metrics.sort_values(by=["HT2", "SPE"])
requested = int(settings["number_of_reference_batches"])
if requested < 1:
raise ValueError(
f"number_of_reference_batches must be >= 1, got {requested}."
)
if requested > len(metrics):
# SEC-13 (#261): without this guard the cutoff-relaxation loop below
# walks past ``conf_level=1.0``, which trips an ``assert`` inside
# ``spe_calculation`` and was -O-strippable. Fail fast with a clear
# message instead.
raise ValueError(
f"number_of_reference_batches={requested} exceeds the number of "
f"candidate batches ({len(metrics)}); cannot select that many."
)
# Boolean-mask indexing instead of a DataFrame.query() expression string
# built by f-string (no expression is assembled or evaluated).
spe_metrics = metrics[metrics["SPE"] < pca_second.spe_limit(conf_level=0.5)]
# SEC-13 (#261): bound the cutoff strictly below 1.0. If the loop runs out
# of headroom before enough batches pass, give up with a clear error
# rather than tripping the inner ``assert conf_level < 1.0`` (which is
# ``python -O``-strippable).
start_cutoff = 0.5
max_cutoff = 0.95
while spe_metrics.shape[0] < requested and start_cutoff <= max_cutoff:
spe_metrics = metrics[metrics["SPE"] < pca_second.spe_limit(conf_level=start_cutoff)]
start_cutoff += 0.05
if spe_metrics.shape[0] < requested:
raise ValueError(
f"Could not find {requested} reference batches even at "
f"conf_level={max_cutoff:.2f}; only {spe_metrics.shape[0]} "
"batches passed the SPE cutoff."
)
if requested == 1:
return spe_metrics.index[0] # returns a single entry from the index
return spe_metrics.index[0:requested].to_list()