# hots/plugins/clustering/kmeans.py
"""Clustering plugin: mini‐batch KMeans streaming."""
from typing import Any
import pandas as pd
from sklearn.cluster import MiniBatchKMeans
from hots.core.interfaces import ClusteringPlugin
from hots.plugins.clustering.builder import build_matrix_indiv_attr
[docs]
class StreamKMeans(ClusteringPlugin):
"""StreamKMeans plugin using scikit‐learn’s MiniBatchKMeans."""
def __init__(self, params: dict[str, Any], instance):
"""Initialize the streaming k-means plugin."""
connector_params = instance.config.connector.parameters
self.n_clusters = params.get("nb_clusters", 5)
self.batch_size = params.get("batch_size", 100)
self.random_state = params.get("random_state", None)
self.tick_field = connector_params.get("tick_field")
self.indiv_field = connector_params.get("individual_field")
self.metrics = connector_params.get("metrics")
self.id_map = instance.get_id_map()
self.model = None
self.labels = None
self.profiles = None
self.clust_mat = None
self.u_mat = None
self.w_mat = None
[docs]
def fit(self, df: pd.DataFrame) -> pd.Series:
"""
Rebuild and fit a MiniBatchKMeans on the current data, then return labels.
This avoids any mismatch in expected feature dimension.
"""
self.clust_mat = build_matrix_indiv_attr(
df,
self.tick_field,
self.indiv_field,
self.metrics,
self.id_map,
)
x = self.clust_mat.values
# rebuild the model now that X.shape[1] is known
self.model = MiniBatchKMeans(
n_clusters=self.n_clusters,
batch_size=self.batch_size,
random_state=self.random_state,
)
self.labels = self.model.fit_predict(x)
return pd.Series(self.labels, index=self.clust_mat.index)