Source code for hots.plugins.clustering.builder

# hots/plugins/clustering/builder.py

"""Clustering builder utilities for HOTS."""

import logging
from itertools import combinations

import numpy as np
import pandas as pd
from scipy.spatial.distance import pdist, squareform


[docs] def build_matrix_indiv_attr( df: pd.DataFrame, tick_field: str, indiv_field: str, metrics: list, id_map: dict ) -> pd.DataFrame: """Build a container×time matrix from individual‐level DataFrame.""" rows = [] for cid, group in df.groupby(indiv_field): row = dict(zip(group[tick_field], group[metrics[0]], strict=True)) row[indiv_field] = cid rows.append(row) mat = pd.DataFrame(rows).fillna(0).set_index(indiv_field) sorted_idx = sorted(mat.index, key=lambda x: id_map[x]) return mat.loc[sorted_idx]
[docs] def build_adjacency_matrix(labels_): """Build the adjacency matrix of clustering. :param labels_: List of clusters assigned to individuals :type labels_: List :return: Adjacency matrix :rtype: np.array """ u = np.zeros((len(labels_), len(labels_))) for i, j in combinations(range(len(labels_)), 2): if labels_[i] == labels_[j]: u[i, j] = 1 u[j, i] = 1 return u
[docs] def build_similarity_matrix(mat: pd.DataFrame) -> pd.DataFrame: """Compute pairwise Euclidean distance matrix from input matrix.""" return squareform(pdist(mat.values, "euclidean"))
[docs] def build_pre_clust_matrices( df, tick_field, indiv_field, metrics, id_map, clustering, new_containers: bool = False ): """Build period clustering dataframes and matrices to be used.""" clustering.clust_mat = build_matrix_indiv_attr(df, tick_field, indiv_field, metrics, id_map) inv = {v: k for k, v in id_map.items()} labeled_idx = [inv[i] for i in range(len(clustering.labels)) if i in inv] labels_s = pd.Series(clustering.labels[: len(labeled_idx)], index=labeled_idx, name="cluster") clustering.clust_mat["cluster"] = -1 clustering.clust_mat.loc[labels_s.index, "cluster"] = labels_s if new_containers: clustering.clust_mat = assign_new_containers_to_nearest_cluster(clustering.clust_mat) clustering.labels = clustering.clust_mat["cluster"].astype(int).to_numpy() clustering.u_mat = build_adjacency_matrix(clustering.labels) clustering.w_mat = build_similarity_matrix(clustering.clust_mat)
[docs] def build_post_clust_matrices(clust_mat): """Build result clustering dataframes and matrices to be used.""" cluster_profiles = cluster_mean_profile(clust_mat) cluster_var_matrix = pairwise_sum_profile_var(cluster_profiles) dv_mat = build_var_delta_cluster_matrix(clust_mat, cluster_var_matrix) return dv_mat
[docs] def cluster_mean_profile(df_clust: pd.DataFrame, cluster_col: str = "cluster") -> np.ndarray: """Compute the mean profile of each cluster.""" # pick only numeric feature columns feature_cols = df_clust.columns.drop(cluster_col) # compute means per cluster grouped = df_clust.groupby(cluster_col)[feature_cols].mean() # if cluster labels are not 0..K-1, reindex to dense 0..K-1 dense = grouped.reset_index(drop=True) return dense.to_numpy(dtype=float)
[docs] def pairwise_sum_profile_var(profiles: np.ndarray) -> np.ndarray: """Compute a matrix of variance of sum of profiles for each pair of cluster.""" # profiles: (k, p) k, p = profiles.shape # expand to (k, 1, p) and (1, k, p) then broadcast-sum -> (k, k, p) summed = profiles[:, None, :] + profiles[None, :, :] # (k, k, p) # variance along the feature axis var_mat = summed.var(axis=2, ddof=0) # (k, k) np.fill_diagonal(var_mat, -1.0) return var_mat
[docs] def build_var_delta_cluster_matrix(df_clust, cluster_var_matrix, *, zero_diag=True): """Build variance of deltas matrix from cluster.""" # labels[i] = cluster id for the i-th row in df_clust (must be ints in [0..K-1]) labels = df_clust["cluster"].to_numpy() # Broadcast-select the (cluster_i, cluster_j) entry for all pairs vars_matrix = cluster_var_matrix[np.ix_(labels, labels)].astype(float, copy=False) if zero_diag: np.fill_diagonal(vars_matrix, 0.0) return vars_matrix
[docs] def dist_from_mean(df_clust, profiles, cid: str) -> float: """Return distance from cid to its cluster mean profile.""" row = df_clust.loc[cid] # Series for that container k = int(row["cluster"]) # cluster id x = row.drop(labels="cluster").to_numpy(dtype=float) # features only mu = np.asarray(profiles[k], dtype=float) # cluster mean return np.linalg.norm(x - mu)
[docs] def get_far_container(c1, c2, df_clust: pd.DataFrame, profiles: np.ndarray) -> str: """Return c1 if it's farther from its cluster mean than c2 is, else return c2.""" d1 = dist_from_mean(df_clust, profiles, c1) d2 = dist_from_mean(df_clust, profiles, c2) return c1 if d1 > d2 else c2
[docs] def change_clustering(mvg_containers, clustering, dict_id_c: dict, tol_open_clust: float = None): """ Reassign each container in mvg_containers to the closest existing cluster (by Euclidean distance to the cluster mean profile). """ nb_changes = 0 logging.info("List of moving containers (clustering):") logging.info(mvg_containers) # Work only on moved-out base set to compute centroids (don’t include movers) df_clust_new = clustering.clust_mat.loc[~clustering.clust_mat.index.isin(mvg_containers)] if df_clust_new.empty: # No reference data to compute centroids -> nothing to do return clustering.clust_mat, nb_changes profiles = cluster_mean_profile(df_clust_new) if profiles is None or (isinstance(profiles, np.ndarray) and profiles.size == 0): return clustering.clust_mat, nb_changes # Columns that are features (everything except 'cluster') feature_cols = [c for c in clustering.clust_mat.columns if c != "cluster"] for indiv in mvg_containers: # Skip if the container isn’t present if indiv not in clustering.clust_mat.index: continue # Extract feature vector for this container row = clustering.clust_mat.loc[indiv] x = row[feature_cols].to_numpy(dtype=float) # Distances to each cluster centroid # profiles[k] must be same length as x dists = np.linalg.norm(profiles - x, axis=1) new_cluster = int(np.argmin(dists)) # --- Optional "open a new cluster" logic --- # min_dist = float(dists[new_cluster]) # if tol_open_clust is not None and min_dist >= tol_open_clust: # # create new cluster with this container as its centroid # new_cluster = profiles.shape[0] # profiles = np.vstack([profiles, x[None, :]]) old_cluster = int(row["cluster"]) if new_cluster != old_cluster: try: logging.info(f"{indiv} changes cluster : from {old_cluster} to {new_cluster}\n") except Exception: pass # Update df and labels_ clustering.clust_mat.loc[indiv, "cluster"] = new_cluster nb_changes += 1 # Update labels_ only if we can resolve the integer id c_int = dict_id_c.get(indiv, None) if c_int is not None and 0 <= c_int < len(clustering.labels): clustering.labels[c_int] = new_cluster return clustering.clust_mat, nb_changes
[docs] def assign_new_containers_to_nearest_cluster( clust_mat: pd.DataFrame, label_col: str = "cluster", ) -> pd.DataFrame: """ For any row with cluster == -1, assign it to the cluster of its nearest existing container. Mutates and returns `clust_mat`. """ # Features are all columns except the label column feature_cols = [c for c in clust_mat.columns if c != label_col] x = clust_mat[feature_cols].values labels = clust_mat[label_col].to_numpy() existing_mask = labels != -1 new_mask = labels == -1 # If there are no new containers, nothing to do if not new_mask.any(): return clust_mat x_existing = x[existing_mask] labels_existing = labels[existing_mask] # Safety: if for some reason all are -1, we can't do anything if x_existing.shape[0] == 0: return clust_mat x_new = x[new_mask] new_index = clust_mat.index[new_mask] # For each new container, find nearest existing container for i, cid in enumerate(new_index): x_new_i = x_new[i] # Euclidean distance to all existing points dists = np.linalg.norm(x_existing - x_new_i, axis=1) nearest_label = labels_existing[np.argmin(dists)] clust_mat.at[cid, label_col] = nearest_label return clust_mat