# hots/plugins/clustering/builder.py
"""Clustering builder utilities for HOTS."""
import logging
from itertools import combinations
import numpy as np
import pandas as pd
from scipy.spatial.distance import pdist, squareform
[docs]
def build_matrix_indiv_attr(
df: pd.DataFrame, tick_field: str, indiv_field: str, metrics: list, id_map: dict
) -> pd.DataFrame:
"""Build a container×time matrix from individual‐level DataFrame."""
rows = []
for cid, group in df.groupby(indiv_field):
row = dict(zip(group[tick_field], group[metrics[0]], strict=True))
row[indiv_field] = cid
rows.append(row)
mat = pd.DataFrame(rows).fillna(0).set_index(indiv_field)
sorted_idx = sorted(mat.index, key=lambda x: id_map[x])
return mat.loc[sorted_idx]
[docs]
def build_adjacency_matrix(labels_):
    """Build the adjacency matrix of clustering.

    Entry ``(i, j)`` is 1 when individuals ``i`` and ``j`` (``i != j``) carry
    the same cluster label and 0 otherwise; the diagonal is always 0.

    :param labels_: List of clusters assigned to individuals
    :type labels_: List
    :return: Adjacency matrix
    :rtype: np.array
    """
    labels = np.asarray(labels_)
    # Broadcast an (n, 1) column against a (1, n) row to compare every pair
    # at once instead of looping over all combinations in Python.
    same_cluster = labels[:, None] == labels[None, :]
    u = same_cluster.astype(float)
    # An individual is not considered adjacent to itself.
    np.fill_diagonal(u, 0.0)
    return u
[docs]
def build_similarity_matrix(mat: pd.DataFrame) -> np.ndarray:
    """Compute the pairwise Euclidean distance matrix between the rows of ``mat``.

    Despite the name, the values are distances (0 on the diagonal, larger
    means less similar). Every column of ``mat`` takes part in the distance.

    :param mat: Matrix with one row per individual.
    :return: Square symmetric ``(n_rows, n_rows)`` NumPy array of distances
        (not a DataFrame: the row index of ``mat`` is not carried over).
    """
    return squareform(pdist(mat.values, "euclidean"))
[docs]
def build_pre_clust_matrices(
    df, tick_field, indiv_field, metrics, id_map, clustering, new_containers: bool = False
):
    """Build period clustering dataframes and matrices to be used.

    Populates, in place, the following attributes of ``clustering``:
    ``clust_mat`` (individual x tick matrix plus a ``cluster`` column),
    ``labels`` (cluster of each row of ``clust_mat``), ``u_mat`` (adjacency
    matrix) and ``w_mat`` (pairwise distance matrix).

    :param df: Long-format data for the period.
    :param tick_field: Name of the tick column in ``df``.
    :param indiv_field: Name of the individual column in ``df``.
    :param metrics: Metric column names (forwarded to the matrix builder).
    :param id_map: Mapping individual id -> integer id.
    :param clustering: Object carrying ``labels`` on entry; updated in place.
    :param new_containers: When True, rows still labelled -1 are attached to
        the cluster of their nearest labelled row.
    :return: None
    """
    clustering.clust_mat = build_matrix_indiv_attr(df, tick_field, indiv_field, metrics, id_map)

    # Carry the previous labels over. labels[i] belongs to the individual whose
    # integer id is i, so the label must be picked with the SAME i that passed
    # the filter; slicing a prefix of labels would shift labels onto the wrong
    # individuals as soon as one integer id is missing from id_map.
    id_to_name = {v: k for k, v in id_map.items()}
    prev_labels = np.asarray(clustering.labels)
    known_ids = [i for i in range(len(prev_labels)) if i in id_to_name]
    labels_s = pd.Series(
        prev_labels[known_ids],
        index=[id_to_name[i] for i in known_ids],
        name="cluster",
    )

    # -1 marks "no cluster yet"; known individuals then get their label back.
    clustering.clust_mat["cluster"] = -1
    clustering.clust_mat.loc[labels_s.index, "cluster"] = labels_s
    if new_containers:
        clustering.clust_mat = assign_new_containers_to_nearest_cluster(clustering.clust_mat)

    clustering.labels = clustering.clust_mat["cluster"].astype(int).to_numpy()
    clustering.u_mat = build_adjacency_matrix(clustering.labels)
    # NOTE(review): clust_mat still contains the "cluster" column here, so the
    # label takes part in the Euclidean distance. Other helpers in this module
    # exclude that column from the features - confirm whether this is intended.
    clustering.w_mat = build_similarity_matrix(clustering.clust_mat)
[docs]
def build_post_clust_matrices(clust_mat):
    """Derive the per-individual variance matrix from a clustered matrix.

    Chains three steps: mean profile of each cluster, variance of the summed
    profiles for each pair of clusters, then expansion of that cluster-level
    matrix to one entry per pair of individuals.

    :param clust_mat: Individual x feature matrix with a ``cluster`` column.
    :return: The matrix produced by ``build_var_delta_cluster_matrix``.
    """
    profiles = cluster_mean_profile(clust_mat)
    pair_variances = pairwise_sum_profile_var(profiles)
    return build_var_delta_cluster_matrix(clust_mat, pair_variances)
[docs]
def cluster_mean_profile(df_clust: pd.DataFrame, cluster_col: str = "cluster") -> np.ndarray:
    """Return the mean feature vector of every cluster as a 2-D array.

    :param df_clust: Individual x feature matrix that also holds ``cluster_col``.
    :param cluster_col: Name of the column carrying the cluster label.
    :return: Array of shape ``(n_clusters, n_features)``. Rows follow the
        sorted order of the distinct labels; the labels themselves are not
        returned, so row ``k`` matches label ``k`` only when the labels are
        exactly ``0..K-1``.
    """
    # Every column except the label is treated as a feature.
    features = df_clust.columns.drop(cluster_col)
    per_cluster = df_clust.groupby(cluster_col)[features].mean()
    # Converting to an array discards the label index, leaving dense rows.
    return per_cluster.to_numpy(dtype=float)
[docs]
def pairwise_sum_profile_var(profiles: np.ndarray) -> np.ndarray:
    """Variance of the summed profile for every pair of clusters.

    :param profiles: Array of shape ``(k, p)``, one profile per cluster.
    :return: ``(k, k)`` array whose ``(i, j)`` entry is the population
        variance (``ddof=0``) of ``profiles[i] + profiles[j]`` taken across
        the ``p`` features. The diagonal is overwritten with ``-1.0``.
    """
    # Unpacking also enforces that the input is two-dimensional.
    n_clusters, _ = profiles.shape
    # (k, 1, p) + (1, k, p) broadcasts to every pairwise sum: (k, k, p).
    pair_sums = profiles[:, np.newaxis, :] + profiles[np.newaxis, :, :]
    variances = np.var(pair_sums, axis=-1)
    # A cluster paired with itself is flagged with the sentinel -1.
    variances[np.diag_indices(n_clusters)] = -1.0
    return variances
[docs]
def build_var_delta_cluster_matrix(df_clust, cluster_var_matrix, *, zero_diag=True):
    """Expand a cluster-level matrix to one entry per pair of individuals.

    :param df_clust: DataFrame with a ``cluster`` column; its values are used
        directly as integer positions into ``cluster_var_matrix``.
    :param cluster_var_matrix: Square array indexed by cluster position.
    :param zero_diag: When True, the diagonal of the result is set to 0.
    :return: Float array of shape ``(n_rows, n_rows)`` whose ``(i, j)`` entry
        is ``cluster_var_matrix[cluster_i, cluster_j]``.
    """
    member_of = df_clust["cluster"].to_numpy()
    # Open mesh: selects the (cluster_i, cluster_j) cell for every pair of
    # rows. Fancy indexing yields a new array, so the input is not modified.
    row_sel, col_sel = np.ix_(member_of, member_of)
    expanded = cluster_var_matrix[row_sel, col_sel].astype(float, copy=False)
    if not zero_diag:
        return expanded
    np.fill_diagonal(expanded, 0.0)
    return expanded
[docs]
def dist_from_mean(df_clust, profiles, cid: str) -> float:
    """Euclidean distance between one container and its cluster's mean profile.

    :param df_clust: Container x feature matrix with a ``cluster`` column.
    :param profiles: Sequence of mean profiles, indexed by the value stored
        in the container's ``cluster`` cell.
    :param cid: Index label of the container in ``df_clust``.
    :return: Norm of (container features - cluster mean).
    """
    record = df_clust.loc[cid]
    cluster_pos = int(record["cluster"])
    # Everything but the label is a feature.
    point = record.drop(labels="cluster").to_numpy(dtype=float)
    centroid = np.asarray(profiles[cluster_pos], dtype=float)
    return np.linalg.norm(point - centroid)
[docs]
def get_far_container(c1, c2, df_clust: pd.DataFrame, profiles: np.ndarray) -> str:
    """Pick, of two containers, the one lying farther from its own cluster mean.

    :param c1: Index label of the first container.
    :param c2: Index label of the second container.
    :param df_clust: Container x feature matrix with a ``cluster`` column.
    :param profiles: Mean profile of each cluster.
    :return: ``c1`` when its distance is strictly greater than that of ``c2``;
        ``c2`` in every other case (including equal distances).
    """
    gap_first = dist_from_mean(df_clust, profiles, c1)
    gap_second = dist_from_mean(df_clust, profiles, c2)
    if gap_first > gap_second:
        return c1
    return c2
[docs]
def change_clustering(mvg_containers, clustering, dict_id_c: dict, tol_open_clust: float = None):
    """Reassign each moving container to the closest existing cluster.

    Closeness is the Euclidean distance between the container's feature
    vector and the mean profile of each cluster. The mean profiles are
    computed from the containers that are NOT moving.

    :param mvg_containers: Index labels of the containers to reassign.
    :param clustering: Object exposing ``clust_mat`` (container x feature
        DataFrame with a ``cluster`` column) and ``labels`` (indexable by
        integer container id). Both are updated in place.
    :param dict_id_c: Mapping container label -> integer id, used to update
        ``clustering.labels``.
    :param tol_open_clust: Currently unused; kept for interface compatibility.
    :return: Tuple ``(clustering.clust_mat, nb_changes)`` where ``nb_changes``
        counts the containers whose cluster actually changed.
    """
    nb_changes = 0
    logging.info("List of moving containers (clustering):")
    logging.info(mvg_containers)

    clust_mat = clustering.clust_mat
    # Centroids come from the non-moving containers only.
    base = clust_mat.loc[~clust_mat.index.isin(mvg_containers)]
    if base.empty:
        # No reference data to compute centroids -> nothing to do
        return clustering.clust_mat, nb_changes

    # Columns that are features (everything except 'cluster')
    feature_cols = [c for c in clust_mat.columns if c != "cluster"]
    centroids = base.groupby("cluster")[feature_cols].mean()
    profiles = centroids.to_numpy(dtype=float)
    if profiles.size == 0:
        return clustering.clust_mat, nb_changes
    # Row k of `profiles` is the centroid of cluster `cluster_ids[k]`. The row
    # position is NOT the label whenever a cluster is absent from the base set
    # (e.g. all of its members are moving) or labels are not exactly 0..K-1,
    # so the argmin position must be mapped back to the real label.
    cluster_ids = centroids.index.to_numpy()

    for indiv in mvg_containers:
        # Skip if the container isn't present
        if indiv not in clust_mat.index:
            continue

        row = clust_mat.loc[indiv]
        x = row[feature_cols].to_numpy(dtype=float)
        dists = np.linalg.norm(profiles - x, axis=1)
        new_cluster = int(cluster_ids[np.argmin(dists)])
        old_cluster = int(row["cluster"])
        if new_cluster == old_cluster:
            continue

        logging.info("%s changes cluster : from %s to %s\n", indiv, old_cluster, new_cluster)
        clust_mat.loc[indiv, "cluster"] = new_cluster
        nb_changes += 1

        # Update labels only if the integer id can be resolved and is in range
        c_int = dict_id_c.get(indiv)
        if c_int is not None and 0 <= c_int < len(clustering.labels):
            clustering.labels[c_int] = new_cluster

    return clustering.clust_mat, nb_changes
[docs]
def assign_new_containers_to_nearest_cluster(
    clust_mat: pd.DataFrame,
    label_col: str = "cluster",
) -> pd.DataFrame:
    """Give every unlabelled row the label of its nearest labelled row.

    A row is unlabelled when its ``label_col`` value is ``-1``. Nearness is
    the Euclidean distance over all columns other than ``label_col``. Only
    rows that were labelled on entry serve as references.

    :param clust_mat: Container x feature matrix holding ``label_col``.
    :param label_col: Name of the column carrying the cluster label.
    :return: The same ``clust_mat`` object, modified in place. It is returned
        untouched when no row is unlabelled or when no row is labelled.
    """
    features = clust_mat[[col for col in clust_mat.columns if col != label_col]].values
    labels = clust_mat[label_col].to_numpy()
    is_known = labels != -1
    is_new = labels == -1

    if not is_new.any():
        return clust_mat

    ref_points = features[is_known]
    if ref_points.shape[0] == 0:
        # Every row is unlabelled: there is nothing to copy a label from.
        return clust_mat
    ref_labels = labels[is_known]

    for cid, point in zip(clust_mat.index[is_new], features[is_new]):
        # argmin keeps the first reference row on ties.
        closest = np.argmin(np.linalg.norm(ref_points - point, axis=1))
        clust_mat.at[cid, label_col] = ref_labels[closest]
    return clust_mat