from __future__ import annotations
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import scipy as sp
from tqdm import tqdm
from ..multivariate.methods import PCA, MCUVScaler
from .alignment_helpers import backtrack_optimal_path, distance_matrix
from .data_input import check_valid_batch_dict, dict_to_wide, melted_to_dict
epsqrt = np.sqrt(np.finfo(float).eps)
def determine_scaling(
batches: dict[str, pd.DataFrame],
columns_to_align: list | None = None,
settings: dict | None = None,
) -> pd.DataFrame:
"""
    Determine the scaling (range and minimum) of each tag, based on the variable ranges
    across all batches.
Parameters
----------
batches : dict[str, pd.DataFrame]
Batch data, in the standard format (keyed by batch identifier).
columns_to_align : list, optional
The column names (tags) to be scaled. If ``None``, the columns of the
first batch are used.
settings : dict, optional
Optional overrides. Currently supports the key ``"robust"`` (bool,
default ``True``) which switches between a robust range (q98 - q02)
and the raw (max - min) range.
Returns
-------
    scalings : pd.DataFrame
        J rows (one row per tag) and 2 columns: ``"Range"`` = the range of each tag
        (approximately q98 - q02 when ``robust=True``, else max - min), and ``"Minimum"`` =
        a typical minimum of each tag (the median across batches when ``robust=True``,
        else the mean).
    TODO: put this in a scikit-learn style: .fit() and .apply() style
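
    Example
    -------
    A minimal sketch (hypothetical tag name and two tiny batches; ``robust=False`` is
    used here only to keep the arithmetic easy to follow)::

        >>> import pandas as pd
        >>> batches = {
        ...     "A": pd.DataFrame({"temp": [0.0, 10.0]}),
        ...     "B": pd.DataFrame({"temp": [2.0, 12.0]}),
        ... }
        >>> scale_df = determine_scaling(batches, settings={"robust": False})
        >>> float(scale_df.loc["temp", "Range"]), float(scale_df.loc["temp", "Minimum"])
        (10.0, 1.0)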
"""
# This will be clumsy, until we have Python 3.9
default_settings = {"robust": True}
if settings:
default_settings.update(settings)
settings = default_settings
if columns_to_align is None:
columns_to_align = batches[next(iter(batches.keys()))].columns
collector_rnge = []
collector_mins = []
for batch in batches.values():
if settings["robust"]:
# TODO: consider f_iqr feature here. Would that work?
rnge = batch[columns_to_align].quantile(0.98) - batch[columns_to_align].quantile(0.02)
else:
rnge = batch[columns_to_align].max() - batch[columns_to_align].min()
rnge[rnge.values == 0] = 1.0
collector_rnge.append(rnge)
        collector_mins.append(batch[columns_to_align].min(axis=0))  # same tag subset as the ranges
if settings["robust"]:
scalings = pd.concat(
[
pd.DataFrame(collector_rnge).median(),
pd.DataFrame(collector_mins).median(),
],
axis=1,
)
else:
scalings = pd.concat(
[pd.DataFrame(collector_rnge).mean(), pd.DataFrame(collector_mins).mean()],
axis=1,
)
scalings.columns = ["Range", "Minimum"]
return scalings
def apply_scaling(
batches: dict[str, pd.DataFrame],
scale_df: pd.DataFrame,
columns_to_align: list | None = None,
) -> dict:
"""Scales the batches according to the information in the scaling dataframe.
Parameters
----------
batches : dict[str, pd.DataFrame]
The batches, in standard format.
    scale_df : pd.DataFrame
        The scaling dataframe, from `determine_scaling`.
    columns_to_align : list, optional
        The column names (tags) to scale. If ``None``, the columns of the first batch
        are used.
Returns
-------
dict
The scaled batch data.
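
    Example
    -------
    A minimal sketch (hypothetical tag name and two tiny batches), including the round
    trip through `reverse_scaling`::

        >>> import pandas as pd
        >>> batches = {
        ...     "A": pd.DataFrame({"temp": [0.0, 10.0]}),
        ...     "B": pd.DataFrame({"temp": [2.0, 12.0]}),
        ... }
        >>> scale_df = determine_scaling(batches, settings={"robust": False})
        >>> scaled = apply_scaling(batches, scale_df)
        >>> scaled["A"]["temp"].tolist()  # (x - 1.0) / 10.0
        [-0.1, 0.9]
        >>> reverse_scaling(scaled, scale_df)["A"]["temp"].tolist()
        [0.0, 10.0]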
"""
# TODO: handle the case of DataFrames still
if columns_to_align is None:
if isinstance(batches, dict):
batch1 = batches[next(iter(batches.keys()))]
columns_to_align = batch1.columns
elif isinstance(batches, pd.DataFrame):
columns_to_align = batches.columns
else:
raise TypeError("Undefined input type")
out = {}
for batch_id, batch in batches.items():
out[batch_id] = batch[columns_to_align].copy()
for tag, column in out[batch_id].items():
out[batch_id][tag] = (column - scale_df.loc[tag, "Minimum"]) / scale_df.loc[tag, "Range"]
return out
def reverse_scaling(
batches: dict[str, pd.DataFrame],
scale_df: pd.DataFrame,
columns_to_align: list | None = None,
) -> dict:
"""Reverse the scaling applied by `apply_scaling`."""
# TODO: handle the case of DataFrames still
if columns_to_align is None:
if isinstance(batches, dict):
columns_to_align = batches[next(iter(batches.keys()))].columns
elif isinstance(batches, pd.DataFrame):
columns_to_align = batches.columns
else:
raise TypeError("Undefined input type")
out = {}
for batch_id, batch in batches.items():
out[batch_id] = batch[columns_to_align].copy()
for tag, column in out[batch_id].items():
out[batch_id][tag] = column * scale_df.loc[tag, "Range"] + scale_df.loc[tag, "Minimum"]
return out
class DTWresult:
    """Container for the result of aligning one batch against the reference batch via DTW."""
def __init__( # noqa: PLR0913
self,
synced: np.ndarray,
penalty_matrix: np.ndarray,
md_path: np.ndarray, # multi-dimensional path through distance mesh D = penalty_matrix
warping_path: np.ndarray,
distance: float,
normalized_distance: float,
):
self.synced = synced
self.penalty_matrix = penalty_matrix
self.md_path = md_path
self.warping_path = warping_path
self.distance = distance
self.normalized_distance = normalized_distance
def align_with_path(md_path: np.ndarray, batch: pd.DataFrame, initial_row: pd.Series) -> pd.DataFrame:
    """Align a batch to the reference batch, using the DTW path ``md_path``.

    Each row of ``md_path`` is a ``(reference_row, test_row)`` index pair; when several test
    rows map onto the same reference row, their element-wise average is taken.
    """
row = 0
nr = md_path[:, 0].max() + 1 # to account for the zero-based indexing
synced = pd.DataFrame(np.zeros((nr, batch.shape[1])), columns=batch.columns)
synced.iloc[row, :] = batch.iloc[md_path[0, 1], :]
temp = initial_row
for idx in np.arange(1, md_path.shape[0]):
if md_path[idx, 0] != md_path[idx - 1, 0]:
row += 1
synced.iloc[row, :] = temp = batch.iloc[md_path[idx, 1], :]
else:
            # TODO: revisit page 181 of the thesis for the case where more than one point in
            # the target trajectory is aligned with the same reference point (averaged below).
temp = np.vstack((temp, batch.iloc[md_path[idx, 1], :]))
synced.iloc[row, :] = np.nanmean(temp, axis=0)
    return synced  # already a DataFrame
def dtw_core(test: pd.DataFrame, ref: pd.DataFrame, weight_matrix: np.ndarray) -> DTWresult:
"""Compute DTW alignment of test batch against reference batch."""
show_plot = False
    nt = test.shape[0]  # 'test' data; will be aligned to the 'reference' data
nr = ref.shape[0]
assert test.shape[1] == ref.shape[1]
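    # Weighted distance mesh (penalty matrix) between the test and reference trajectories.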
D = distance_matrix(test.values, ref.values, weight_matrix)
md_path, distance = backtrack_optimal_path(D)
warping_path = np.zeros(nr)
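    # For each reference row, record the (last) test-batch row that was mapped onto it.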
for idx in range(nr):
warping_path[idx] = md_path[np.where(md_path[:, 0] == idx)[0][-1], 1]
# Now align the `test` batch:
initial_row = ref.iloc[md_path[0, 0], :].copy()
synced = align_with_path(md_path=md_path, batch=test, initial_row=initial_row)
if show_plot: # for debugging
X = np.arange(0, nt, 1)
Y = np.arange(0, nr, 1)
X, Y = np.meshgrid(X, Y)
fig = go.Figure(
data=[
go.Mesh3d(
x=X.ravel(),
y=Y.ravel(),
z=D.ravel(),
color="lightpink",
opacity=0.90,
)
]
)
fig.show()
return DTWresult(
synced,
D,
md_path,
warping_path,
distance,
normalized_distance=distance / (nr + nt),
)
def one_iteration_dtw(
batches_scaled: dict,
refbatch_sc: pd.DataFrame,
weight_matrix: np.ndarray,
settings: dict | None = None,
) -> tuple[dict, pd.DataFrame]:
"""Perform one iteration of the DTW alignment algorithm."""
default_settings = {"show_progress": True, "subsample": 1}
if settings:
default_settings.update(settings)
settings = default_settings
aligned_batches = {}
distances = []
average_batch = refbatch_sc.copy().reset_index(drop=True) * 0.0
successful_alignments = 0
for batch_id, batch in tqdm(batches_scaled.items(), disable=not (settings["show_progress"])):
try:
# see Kassidas, page 180
batch_subset = batch.iloc[:: int(settings["subsample"]), :]
result = dtw_core(batch_subset, refbatch_sc, weight_matrix=weight_matrix)
average_batch = average_batch + result.synced
aligned_batches[batch_id] = result
successful_alignments += 1
distances.append(
{
"batch_id": batch_id,
"Distance": result.distance,
"Normalized distance": result.normalized_distance,
}
)
except ValueError: # noqa: PERF203
raise ValueError(f"Failed on batch {batch_id}") from None
average_batch = average_batch / successful_alignments
return aligned_batches, average_batch
def batch_dtw( # noqa: PLR0915
batches: dict[str, pd.DataFrame],
columns_to_align: list,
reference_batch: str,
settings: dict | None = None,
) -> dict:
"""
Synchronize, via iterative DTW, with weighting.
    Algorithm: Kassidas et al. (1998): https://doi.org/10.1002/aic.690440412
Parameters
----------
batches : dict[str, pd.DataFrame]
Batch data, in the standard format.
columns_to_align : list
Which columns to use during the alignment process. The others are aligned, but
get no weight, and therefore do not influence the objective function.
reference_batch : str
Which key in the `batches` is the reference batch to use.
    settings : dict, optional
Default settings are::
{
"maximum_iterations": 25, # stops here, even if not converged
"tolerance": 0.1, # convergence tolerance
"robust": True, # use robust scaling
"show_progress": True, # show progress
"subsample": 1, # use every sample
"interpolate_time_axis_maximum": 100, # resample time axis to this scale
"interpolate_time_axis_delta": 1, # resolution of resampled axis
"interpolate_method": "cubic", # any scipy.interpolate.interp1d method
}
The default settings resample the time axis to 100 data points, starting at 0 and
ending at 99. Adjust the delta for more points, or change the maximum.
Returns
-------
    dict
        Various outputs relevant to the alignment, with keys ``"scale_df"``,
        ``"aligned_batch_objects"``, ``"aligned_batch_dfdict"``, ``"last_average_batch"``
        and ``"weight_history"``.
        TODO: Document completely later.
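
    Example
    -------
    A minimal usage sketch (hypothetical tag names and reference batch identifier)::

        result = batch_dtw(
            batches,
            columns_to_align=["temperature", "pressure"],
            reference_batch="batch_07",
            settings={"maximum_iterations": 10, "show_progress": False},
        )
        aligned = result["aligned_batch_dfdict"]  # aligned batches, in the original units
        weights = result["weight_history"]  # per-iteration weight of each aligned tag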
    Notation
    --------
    I = number of batches; i = index for the batches
    J = number of tags (columns in each batch); j = index for the tags
    K_i = number of samples (rows) in batch i; k = index into those rows: 0 ... k ... K_i
"""
default_settings: dict = dict(
maximum_iterations=25, # maximum iterations (stops here, even if not converged)
tolerance=0.1, # convergence tolerance
robust=True, # use robust scaling
show_progress=True, # show progress
subsample=1, # use every sample
interpolate_time_axis_maximum=100, # interpolates everything to be on this scale
interpolate_time_axis_delta=1,
interpolate_method="cubic", # any method from scipy.interpolate.interp1d allowed
)
if settings:
default_settings.update(settings)
settings = default_settings
assert settings["maximum_iterations"] >= 3, "At least 3 iterations are required"
assert reference_batch in batches, "`reference_batch` was not found in the dict of batches."
assert check_valid_batch_dict({k: v[columns_to_align] for k, v in batches.items()}, no_nan=True)
scale_df = determine_scaling(batches=batches, columns_to_align=columns_to_align, settings=settings)
batches_scaled = apply_scaling(batches, scale_df, columns_to_align)
refbatch_sc = batches_scaled[reference_batch].iloc[:: int(settings["subsample"]), :]
weight_vector = np.ones(refbatch_sc.shape[1])
weight_matrix = np.diag(weight_vector)
    weight_history = np.full_like(weight_vector, np.nan)
average_batch = None
delta_weight = np.linalg.norm(weight_vector)
iter_step = 0
while (np.linalg.norm(delta_weight) > settings["tolerance"]) and (iter_step <= settings["maximum_iterations"]):
if settings["show_progress"]:
print(f"Iter = {iter_step} and norm = {np.linalg.norm(delta_weight)}") # noqa: T201
iter_step += 1
weight_matrix = np.diag(weight_vector)
weight_history = np.vstack((weight_history, weight_vector.copy()))
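        # From the fourth iteration onward, align against the evolving average batch
        # rather than the originally chosen reference batch.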
if iter_step > 3:
refbatch_sc = average_batch
aligned_batches, average_batch = one_iteration_dtw(
batches_scaled=batches_scaled,
refbatch_sc=refbatch_sc,
weight_matrix=weight_matrix,
settings=settings,
)
# Deviations from the average batch:
next_weights = np.zeros((1, refbatch_sc.shape[1]))
for result in aligned_batches.values():
next_weights = next_weights + np.nansum(np.power(result.synced - average_batch, 2), axis=0)
# TODO: use quadratic weights for now, but try sum of the absolute values instead
# np.abs(result.synced - average_batch).sum(axis=0)
# TODO: leave out worst batches when computing the weights
# dist_df = pd.DataFrame(distances).set_index("batch_id")
# dist_df.hist("Distance", bins=50)
# Now find the average trajectory, but ignore problematic batches:
# for example, top 5% of the distances.
# TODO: make this a configurable setting
# problematic_threshold = dist_df["Distance"].quantile(0.95)
next_weights = 1.0 / np.where(next_weights > epsqrt, next_weights, 10000)
weight_vector = (next_weights / np.sum(next_weights) * len(columns_to_align)).ravel()
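        # Kassidas-style weighting: tags that reproduce consistently across batches (small
        # summed squared deviation from the average trajectory) receive larger weights;
        # the weights are normalized so that they sum to the number of aligned tags.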
# If change in delta_weight is small, we terminate early; no need to fine-tune excessively.
delta_weight = np.diag(weight_matrix) - weight_vector # old - new
# OK, the weights are found: now use the last iteration's result to get back to original
# scaling for the trajectories
weight_history = weight_history[1:, :]
aligned_df_collection: list[pd.DataFrame] = []
new_time_axis = np.arange(
0,
settings["interpolate_time_axis_maximum"],
settings["interpolate_time_axis_delta"],
)
for batch_id, result in tqdm(
aligned_batches.items(),
desc="Interpolating",
disable=not (settings["show_progress"]),
):
initial_row = batches[batch_id].iloc[result.md_path[0, 0], :].copy()
synced = align_with_path(
result.md_path,
batches[batch_id].iloc[:: int(settings["subsample"]), :],
initial_row=initial_row,
)
# Resample the trajectories of the aligned data now along this sequence.
sequence = np.linspace(
0,
settings["interpolate_time_axis_maximum"] - settings["interpolate_time_axis_delta"],
synced.shape[0],
)
assert new_time_axis.min() == sequence.min()
assert new_time_axis.max() == sequence.max()
synced_interpolated = pd.DataFrame()
for column in synced:
if column in ["batch_id", "_sequence_"]:
continue
interp_column = sp.interpolate.interp1d(
sequence,
synced[column],
kind=settings["interpolate_method"],
assume_sorted=True,
)
synced_interpolated[column] = interp_column(new_time_axis)
# Pop in an extra column at the start of the df
synced_interpolated.insert(0, "batch_id", batch_id)
        synced_interpolated = synced_interpolated.set_index(new_time_axis)  # common time axis as index
# Overwrite existing dataframe with this, unscaled, and interpolated dataframe.
result.synced = synced_interpolated
aligned_df_collection.append(synced_interpolated)
# Make the batch_id label consistent
aligned_df = pd.concat(aligned_df_collection)
aligned_df["batch_id"] = aligned_df["batch_id"].astype(type(batch_id))
last_average_batch = reverse_scaling(dict(avg=average_batch), scale_df)["avg"]
aligned_batch_dfdict = melted_to_dict(aligned_df, batch_id_col="batch_id")
return dict(
scale_df=scale_df,
aligned_batch_objects=aligned_batches,
aligned_batch_dfdict=aligned_batch_dfdict,
last_average_batch=last_average_batch,
weight_history=pd.DataFrame(weight_history, columns=columns_to_align),
)
def resample_to_reference(
batches: dict[str, pd.DataFrame],
columns_to_align: list,
reference_batch: str,
settings: dict | None = None,
) -> dict:
    """Resample all `batches` (only the `columns_to_align` columns) to the duration of the
    batch with identifier `reference_batch`.
Parameters
----------
batches : dict[str, pd.DataFrame]
Batch data, in the standard format.
columns_to_align : list
Which columns to use. Others are ignored.
reference_batch : str
Which key in the `batches` is the reference batch.
    settings : dict, optional
        Optional overrides. Currently supports ``"interpolate_kind"`` (str, default
        ``"cubic"``): any valid ``kind`` for ``scipy.interpolate.interp1d``.
Returns
-------
dict
Batch data, in the standard format.
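
    Example
    -------
    A minimal sketch (hypothetical single-tag batches of unequal length; linear
    interpolation is chosen so that these very short batches are valid)::

        >>> import pandas as pd
        >>> batches = {
        ...     "ref": pd.DataFrame({"temp": [0.0, 1.0, 2.0]}),
        ...     "long": pd.DataFrame({"temp": [0.0, 1.0, 2.0, 3.0, 4.0]}),
        ... }
        >>> out = resample_to_reference(batches, ["temp"], reference_batch="ref",
        ...                             settings={"interpolate_kind": "linear"})
        >>> out["long"]["temp"].tolist()  # 5 samples squeezed onto 3 time points
        [0.0, 2.0, 4.0]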
"""
default_settings = {
"interpolate_kind": "cubic", # must be a valid "scipy.interpolate.interp1d" `kind`
}
if settings:
default_settings.update(settings)
settings = default_settings
out = {}
target_time = np.arange(0, batches[reference_batch].shape[0])
target_time = target_time / target_time[-1]
for batch_id, batch in batches.items():
to_resample = np.arange(0, batch.shape[0])
to_resample = to_resample / to_resample[-1]
out_df = {}
for column, series in batch.items():
if column in columns_to_align:
out_df[column] = sp.interpolate.interp1d(
to_resample,
series.values,
copy=False,
kind=settings["interpolate_kind"],
)(target_time)
out[batch_id] = pd.DataFrame(out_df)
return out
def find_average_length(batches: dict[str, pd.DataFrame], settings: dict | None = None) -> str:
"""
Find the batch in `batches` with the average length.
Parameters
----------
batches : dict[str, pd.DataFrame]
Batch data, in the standard format.
    settings : dict, optional
Default settings are::
{"robust": True} # use robust (median) average batch length
    Returns
    -------
    str
        One of the dictionary keys from `batches`: the batch whose length equals the median
        length (``robust=True``), or is closest to the mean length otherwise.
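
    Example
    -------
    A minimal sketch (hypothetical batches of lengths 3, 5 and 7; the median length is 5,
    so the key of that batch is returned)::

        >>> import pandas as pd
        >>> batches = {
        ...     "a": pd.DataFrame({"x": range(3)}),
        ...     "b": pd.DataFrame({"x": range(5)}),
        ...     "c": pd.DataFrame({"x": range(7)}),
        ... }
        >>> find_average_length(batches)
        'b'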
"""
default_settings = {
"robust": True, # use robust metric to calculate average batch length
}
if settings:
default_settings.update(settings)
settings = default_settings
batch_lengths = pd.Series({batch_id: df.shape[0] for batch_id, df in batches.items()})
if settings["robust"]:
        # If multiple batches have the median length, return the last one.
try:
return batch_lengths.index[np.where((batch_lengths == batch_lengths.median()).values)[0][-1]]
except IndexError:
            # Edge case: with an even number of batches the median can be the average of two
            # lengths, so no batch has exactly the median length; fall back to the mean.
return find_average_length(batches, settings=dict(robust=False))
else:
return batch_lengths.index[(batch_lengths - batch_lengths.mean()).abs().argmin()]
def find_reference_batch(
batches: dict[str, pd.DataFrame],
columns_to_align: list,
settings: dict | None = None,
) -> str | list[str]:
"""
Find a reference batch. Assumes NO missing data.
    Starts with the batch of average duration and resamples (by simple interpolation) all
    batches to that duration. Unfolds the resampled data and fits a PCA (4 components by
    default) on the wide, unfolded matrix. Excludes all batches with Hotelling's T2 above the
    90% limit, then refits the PCA on the remaining batches. Finally, selects the batch whose
    combination of scores is smallest (i.e. closest to the model centre), while ensuring that
    this batch has SPE below the 50% model limit.
Parameters
----------
batches : dict[str, pd.DataFrame]
Batch data, in the standard format.
columns_to_align : list
Which columns to use. Others are ignored.
settings : dict, optional
Default settings are::
{
"robust": True, # use robust scaling
"subsample": 1, # use every sample
"method": "pca_most_average", # most average batch from a crude PCA
"n_components": 4,
"number_of_reference_batches": 1, # only a single batch returned
}
Returns
-------
str or list[str]
When ``settings["number_of_reference_batches"] == 1`` (the default), a
single dictionary key from ``batches`` is returned. When more than one
reference batch is requested, a list of that many keys is returned,
ordered from most to least central in the PCA model.
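
    Example
    -------
    A minimal usage sketch (hypothetical tag names; assumes `batches` holds enough
    complete batches to support a meaningful PCA model)::

        ref_id = find_reference_batch(
            batches,
            columns_to_align=["temperature", "pressure"],
            settings={"n_components": 2},
        )
        result = batch_dtw(batches, ["temperature", "pressure"], reference_batch=ref_id)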
"""
default_settings: dict[str, int | float | str | bool] = {
"robust": True, # use robust scaling
"subsample": 1, # use every sample
"method": "pca_most_average",
"n_components": 4,
"number_of_reference_batches": 1,
}
if isinstance(settings, dict):
default_settings.update(settings)
settings = default_settings
assert isinstance(columns_to_align, list), "`columns_to_align` must be a list of column names."
assert check_valid_batch_dict({k: v[columns_to_align] for k, v in batches.items()})
# Starts with the average duration batch.
initial_reference_id = find_average_length(batches, settings)
# Resamples (simple interpolation) of all batches to that duration.
resampled = resample_to_reference(
batches,
columns_to_align,
reference_batch=initial_reference_id,
settings=settings,
)
# Unfolds that resampled data.
basewide = dict_to_wide(resampled)
# Does PCA on the wide, unfolded data. A=4
    mcuv = MCUVScaler().fit_transform(basewide)
# You can't fit more components than rows in the matrix.
n_components = min(int(np.floor(settings["n_components"])), basewide.shape[0] - 1)
pca_first = PCA(n_components=n_components).fit(mcuv)
# Excludes all batches with Hotelling's T2 > 90% limit.
hotellings_t2_limit_90 = pca_first.hotellings_t2_limit(0.90)
to_keep = pca_first.hotellings_t2_.iloc[:, -1] < hotellings_t2_limit_90
# Refits PCA with A=4 on a subset of the batches, to avoid biasing the PCA model too much.
basewide = basewide.loc[to_keep, :]
    mcuv = MCUVScaler().fit_transform(basewide)
pca_second = PCA(n_components=n_components).fit(mcuv)
    # Find the batch with the most average scores (closest to the model centre) and ensure
    # that it has SPE below the 50% model limit.
metrics = pd.DataFrame(
{
"HT2": pca_second.hotellings_t2_.iloc[:, -1],
"SPE": pca_second.spe_.iloc[:, -1],
}
)
metrics = metrics.sort_values(by=["HT2", "SPE"])
    start_cutoff = 0.5
    spe_metrics = metrics.query(f"SPE < {pca_second.spe_limit(conf_level=start_cutoff)}")
    while spe_metrics.shape[0] < int(settings["number_of_reference_batches"]):
        start_cutoff += 0.05
        spe_metrics = metrics.query(f"SPE < {pca_second.spe_limit(conf_level=start_cutoff)}")
if settings["number_of_reference_batches"] == 1:
return spe_metrics.index[0] # returns a single entry from the index
else:
return spe_metrics.index[0 : int(settings["number_of_reference_batches"])].to_list()