# Source code for hots.evaluation.evaluator

"""Evaluation utilities for HOTS."""

import logging
import math
from collections import Counter
from dataclasses import dataclass
from typing import Any

import networkx as nx
import numpy as np
import pandas as pd

from hots.plugins.clustering.builder import (
    build_adjacency_matrix,
    build_post_clust_matrices,
    build_pre_clust_matrices,
    change_clustering,
    cluster_mean_profile,
    get_far_container,
)
from hots.utils.tools import check_missing_entries_df


@dataclass(frozen=True)
class EvalSnapshot:
    """State captured after one evaluation step, compared against on the next step.

    NOTE(review): ``frozen=True`` only prevents re-binding the attributes; the
    array and dict stored here remain mutable objects. The dataclass-generated
    ``__eq__`` compares ``labels`` with numpy ``==`` (ambiguous truth value for
    multi-element arrays) and the generated ``__hash__`` fails on the ndarray /
    dict fields, so snapshots should not be compared or hashed directly.
    """

    # Cluster label of each individual, one entry per individual.
    labels: np.ndarray
    # Which host/node each individual (container) is placed on.
    placement: dict[Any, Any]
    # Tick at which the snapshot was taken (kept for traceability).
    tick: int
def _safe_labels_array(labels) -> np.ndarray: """Ensure labels are a 1D numpy array.""" if isinstance(labels, np.ndarray): return labels # pandas Series or list-like return np.asarray(labels).reshape(-1) def _kpi_clustering(labels_now: np.ndarray, labels_prev: np.ndarray | None) -> dict[str, float]: """Compare clustering KPIs + delta vs previous.""" kpi: dict[str, float] = { "clusters_now": float(len(np.unique(labels_now))), } if labels_prev is None: kpi.update( { "clusters_prev": 0.0, "clustered_change_ratio": 0.0, "reassigned_ratio": 0.0, } ) return kpi labels_prev = _safe_labels_array(labels_prev) same_len = min(len(labels_prev), len(labels_now)) reassigned = np.sum(labels_prev[:same_len] != labels_now[:same_len]) kpi.update( { "clusters_prev": float(len(np.unique(labels_prev))), "clustered_change_ratio": float( abs(len(np.unique(labels_now)) - len(np.unique(labels_prev))) ), "reassigned_ratio": float(reassigned) / float(same_len) if same_len else 0.0, } ) return kpi def _kpi_placement( placement_now: dict[Any, Any], placement_prev: dict[Any, Any] | None, ) -> dict[str, float]: """Placement KPIs + delta vs previous (e.g., % moved).""" moved_ratio = 0.0 if placement_prev: common = set(placement_now.keys()) & set(placement_prev.keys()) moved = sum(1 for k in common if placement_now[k] != placement_prev[k]) moved_ratio = float(moved) / float(len(common)) if common else 0.0 return { "assigned_now": float(len(placement_now)), "assigned_prev": float(len(placement_prev) if placement_prev else 0), "moved_ratio": moved_ratio, } def _gini(x: np.ndarray) -> float: """Gini coefficient for non-negative values.""" x = np.asarray(x, dtype=float) if x.size == 0: return 0.0 x = x[x >= 0] if x.size == 0: return 0.0 s = x.sum() if s == 0: return 0.0 x = np.sort(x) n = x.size # Gini = (2*sum(i*x_i)/(n*sum(x))) - (n+1)/n i = np.arange(1, n + 1, dtype=float) return float((2.0 * (i * x).sum()) / (n * s) - (n + 1.0) / n) def _entropy_from_counts(counts: np.ndarray) -> float: """Shannon 
entropy (natural log).""" counts = np.asarray(counts, dtype=float) total = counts.sum() if total <= 0: return 0.0 p = counts[counts > 0] / total return float(-(p * np.log(p)).sum()) def _kpi_cluster_structure(labels_now: np.ndarray) -> dict[str, float]: """Cluster size distribution KPIs (balance, inequality).""" labels_now = _safe_labels_array(labels_now) if labels_now.size == 0: return { "n_clusters": 0.0, "singleton_ratio": 0.0, "size_min": 0.0, "size_max": 0.0, "size_mean": 0.0, "size_std": 0.0, "size_entropy": 0.0, "size_gini": 0.0, } c = Counter(labels_now.tolist()) sizes = np.asarray(list(c.values()), dtype=float) n_clusters = float(len(sizes)) singleton_ratio = float(np.mean(sizes == 1.0)) if sizes.size else 0.0 return { "n_clusters": n_clusters, "singleton_ratio": singleton_ratio, "size_min": float(sizes.min()) if sizes.size else 0.0, "size_max": float(sizes.max()) if sizes.size else 0.0, "size_mean": float(sizes.mean()) if sizes.size else 0.0, "size_std": float(sizes.std(ddof=0)) if sizes.size else 0.0, "size_entropy": _entropy_from_counts(sizes), "size_gini": _gini(sizes), } def _kpi_conflict_graph(g: nx.Graph) -> dict[str, float]: """Graph structure + weight summary KPIs.""" n = g.number_of_nodes() m = g.number_of_edges() if n == 0: return { "nodes": 0.0, "edges": 0.0, "density": 0.0, "components": 0.0, "largest_component_ratio": 0.0, "deg_max": 0.0, "deg_mean": 0.0, "deg_p95": 0.0, "deg_std": 0.0, "w_sum": 0.0, "w_mean": 0.0, "w_p95": 0.0, "w_max": 0.0, } degrees = np.asarray([d for _, d in g.degree()], dtype=float) deg_max = float(degrees.max()) if degrees.size else 0.0 deg_mean = float(degrees.mean()) if degrees.size else 0.0 deg_p95 = float(np.percentile(degrees, 95)) if degrees.size else 0.0 deg_std = float(degrees.std(ddof=0)) if degrees.size else 0.0 # Edge weights (some edges might not have weight if added elsewhere) w = [] for _, _, data in g.edges(data=True): if "weight" in data: w.append(float(data["weight"])) w = np.asarray(w, dtype=float) # 
Components comp_sizes = [len(c) for c in nx.connected_components(g)] if n else [] largest_ratio = (max(comp_sizes) / n) if comp_sizes and n else 0.0 return { "nodes": float(n), "edges": float(m), "density": float(nx.density(g)) if n > 1 else 0.0, "components": float(len(comp_sizes)), "largest_component_ratio": float(largest_ratio), "deg_max": deg_max, "deg_mean": deg_mean, "deg_p95": deg_p95, "deg_std": deg_std, "w_sum": float(w.sum()) if w.size else 0.0, "w_mean": float(w.mean()) if w.size else 0.0, "w_p95": float(np.percentile(w, 95)) if w.size else 0.0, "w_max": float(w.max()) if w.size else 0.0, } def _kpi_migrations_from_moves( moves: Any, *, n_indiv: int, ) -> dict[str, float]: """KPIs from the return of problem.adjust().""" # In your flow, `moves` is usually a list of moved container ids. if moves is None: k = 0 elif isinstance(moves, (list, tuple, set)): k = len(moves) else: # fallback: unknown structure try: k = len(moves) # type: ignore[arg-type] except Exception: k = 0 return { "moves_count": float(k), "moves_ratio": float(k) / float(n_indiv) if n_indiv else 0.0, } def _kpi_load_balance( working_df: pd.DataFrame, *, nf: str, hf: str, metrics: list[str], ) -> dict[str, float]: """ Generic load-balance KPIs over hosts for a chosen metric. NOTE: This operates on `working_df` as provided (whatever your current window is). 
""" if working_df is None or working_df.empty or not metrics: return { "host_load_mean": 0.0, "host_load_std": 0.0, "host_load_cv": 0.0, "host_load_p95": 0.0, "host_load_max": 0.0, "host_load_gini": 0.0, "n_hosts": 0.0, "n_indiv": 0.0, } metric = metrics[0] if metric not in working_df.columns or hf not in working_df.columns: return { "host_load_mean": 0.0, "host_load_std": 0.0, "host_load_cv": 0.0, "host_load_p95": 0.0, "host_load_max": 0.0, "host_load_gini": 0.0, "n_hosts": float(working_df[hf].nunique()) if hf in working_df.columns else 0.0, "n_indiv": float(working_df[nf].nunique()) if nf in working_df.columns else 0.0, } host_load = working_df.groupby(hf, sort=False)[metric].sum().astype(float).values if host_load.size == 0: host_load = np.asarray([0.0]) mean = float(host_load.mean()) std = float(host_load.std(ddof=0)) cv = float(std / mean) if mean != 0.0 else 0.0 return { "host_load_mean": mean, "host_load_std": std, "host_load_cv": cv, "host_load_p95": float(np.percentile(host_load, 95)), "host_load_max": float(host_load.max()), "host_load_gini": _gini(host_load), "n_hosts": float(working_df[hf].nunique()), "n_indiv": float(working_df[nf].nunique()) if nf in working_df.columns else 0.0, } def _placement_from_working_df( working_df: pd.DataFrame, *, nf: str, hf: str, tf: str, tick: int | None = None, ) -> dict[Any, Any]: """ Derive container -> host mapping from working_df. 
Strategy: - use rows at `tick` if provided, otherwise use max tick in df - if multiple rows per (container, tick), take the most frequent host """ if working_df is None or working_df.empty: return {} if nf not in working_df.columns or hf not in working_df.columns or tf not in working_df.columns: return {} t = tick if tick is not None else int(working_df[tf].max()) df_t = working_df.loc[working_df[tf] == t, [nf, hf]].copy() if df_t.empty: return {} # choose most frequent host per container (robust to duplicates) def _mode_host(s: pd.Series) -> Any: vc = s.value_counts(dropna=False) return vc.index[0] if not vc.empty else None plc = df_t.groupby(nf, sort=False)[hf].apply(_mode_host).dropna() return plc.to_dict()
def eval_solutions(
    instance,
    clustering,
    clust_opt,
    problem_opt,
    problem,
    working_df,
    *,
    prev_snapshot: EvalSnapshot | None = None,
    tick: int | None = None,
) -> tuple[Any, dict[str, Any], EvalSnapshot]:
    """Run one evaluation step: re-cluster, re-place, and collect KPIs.

    The numbered comments below follow the pipeline:
      1. refresh clustering matrices (filling missing entries when
         ``working_df`` has more distinct individuals than ``clustering.labels``);
      2. build (or resize, for new containers) and solve the clustering model;
      3. keep the previous duals and fill the new ones;
      4. read ``tol`` / ``tol_move`` from the problem parameters;
      5. pick containers to re-cluster from the clustering conflict graph,
         apply them and re-solve;
      6. build (or resize) and solve the placement ("business") model;
      7. pick containers to move from the placement conflict graph;
      8. apply the moves via ``problem.adjust`` and re-solve;
      9. collect conflict / move / per-tick KPIs.

    Args:
        prev_snapshot: snapshot returned by the previous call, used for deltas.
        tick: tick to evaluate; defaults to the max tick in ``working_df``
            (or -1 if it cannot be determined).

    Returns:
        ``(moves, metrics, snapshot)``: ``moves`` is whatever
        ``problem.adjust`` returned, ``metrics`` is the flat KPI dict for this
        step, and ``snapshot`` should be passed as ``prev_snapshot`` next time.
    """
    cfg = instance.config.connector.parameters
    nf, hf, tf = cfg.get("individual_field"), cfg.get("host_field"), cfg.get("tick_field")
    metric_fields = cfg.get("metrics")

    # 1) Refresh clustering matrices, handling newly-seen containers.
    # NOTE(review): working_df is rebound to the filled frame for the rest of
    # this call; whether the caller's frame sees those rows depends on
    # check_missing_entries_df -- confirm.
    new_containers = False
    if len(clustering.labels) < working_df[nf].nunique():
        working_df = check_missing_entries_df(working_df, tf, nf, hf, metric_fields)
        new_containers = True
    build_pre_clust_matrices(
        working_df, tf, nf, metric_fields, instance.get_id_map(), clustering, new_containers
    )

    # 2) Build (or resize) & solve the clustering model
    if new_containers:
        logging.info("\n🔍 New containers detected: updating optimization models 🔍\n")
        clust_opt.update_size_model(
            instance.get_id_map(), working_df, u_mat=clustering.u_mat, w_mat=clustering.w_mat
        )
    else:
        clust_opt.build(u_mat=clustering.u_mat, w_mat=clustering.w_mat)
    solver = instance.config.optimization.parameters.get("solver", "glpk")
    clust_opt.solve(solver=solver)

    # 3) Extract dual values (the previous ones are kept for conflict detection)
    prev_duals = clust_opt.last_duals
    clust_opt.fill_dual_values()

    # 4) Read tolerances
    tol = instance.config.problem.parameters.get("tol", 0.1)
    tol_move = instance.config.problem.parameters.get("tol_move", 0.1)

    # 5) Clustering conflicts: pick moving containers, update clustering, re-solve
    clustering.profiles = cluster_mean_profile(clustering.clust_mat)
    moving, *_ = get_moving_containers_clust(
        clust_opt.last_duals,
        prev_duals,
        tol,
        tol_move,
        df_clust=clustering.clust_mat,
        profiles=clustering.profiles,
    )
    clust_conf_kpi = _kpi_conflict_graph(
        get_conflict_graph(clust_opt.last_duals, prev_duals, tol)
    )
    clustering.clust_mat, _clust_nb_changes = change_clustering(
        moving, clustering, instance.get_id_map()
    )
    clustering.u_mat = build_adjacency_matrix(clustering.labels)
    clust_opt.update_adjacency_constraints(clustering.u_mat)
    clust_opt.solve(solver=solver)
    clust_opt.fill_dual_values()

    # 6) Build (or resize) & solve the business problem
    v_mat = problem.build_place_adj_matrix(working_df, instance.get_id_map())
    dv_mat = build_post_clust_matrices(clustering.clust_mat)
    if new_containers:
        problem_opt.update_size_model(
            instance.get_id_map(), working_df, u_mat=clustering.u_mat, v_mat=v_mat, dv_mat=dv_mat
        )
    else:
        problem_opt.build(u_mat=clustering.u_mat, v_mat=v_mat, dv_mat=dv_mat)
    problem_opt.solve()
    prev_duals = problem_opt.last_duals
    problem_opt.fill_dual_values()

    # 7) Placement conflicts: pick moving containers
    # TODO use problem factory for this method
    moving, *_ = get_moving_containers_place(
        instance,
        problem_opt.last_duals,
        prev_duals,
        tol,
        tol_move,
        working_df=working_df,
    )
    place_conf_kpi = _kpi_conflict_graph(
        get_conflict_graph(problem_opt.last_duals, prev_duals, tol)
    )

    # 8) Apply business-problem changes and re-solve
    moves = problem.adjust(moving, working_df)
    moves_kpi = _kpi_migrations_from_moves(moves, n_indiv=len(instance.get_id_map()))
    v_mat = problem.build_place_adj_matrix(working_df, instance.get_id_map())
    problem_opt.update_adjacency_constraints(v_mat)
    problem_opt.solve()
    problem_opt.fill_dual_values()

    # 9) Collect metrics
    kpis: dict[str, Any] = {
        # clustering conflict
        **{f"clust_conf__{k}": v for k, v in clust_conf_kpi.items()},
        # placement conflict
        **{f"place_conf__{k}": v for k, v in place_conf_kpi.items()},
        # moves applied by adjust()
        **{f"moves__{k}": v for k, v in moves_kpi.items()},
        "moving_containers": moves,
    }

    if (
        tick is None
        and working_df is not None
        and not working_df.empty
        and tf in working_df.columns
    ):
        tick = int(working_df[tf].max())
    tick = int(tick) if tick is not None else -1

    placement_now = _placement_from_working_df(working_df, nf=nf, hf=hf, tf=tf, tick=tick)
    snapshot, step_kpi = eval_step_kpis(
        instance,
        labels=clustering.labels,
        placement=placement_now,
        prev_snapshot=prev_snapshot,
        tick=tick,
        working_df=working_df,
    )
    kpis.update(step_kpi)
    return moves, kpis, snapshot
def get_conflict_graph(
    cur_duals: dict[Any, float], prev_duals: dict[Any, float], tol: float
) -> nx.Graph:
    """Build a conflict graph from how much each dual value changed.

    Every constraint index present in either mapping (missing values count as
    0.0; ``None`` mappings are treated as empty) whose dual changed by more
    than ``tol`` in either direction (``abs(cur - prev) > tol``) is added:

    - a 2-tuple index ``(a, b)`` becomes the undirected edge a-b with
      ``weight=delta``. If both ``(a, b)`` and ``(b, a)`` qualify they map to
      the same edge; the larger delta is kept so the result does not depend
      on set iteration order;
    - any other index becomes a standalone node with ``weight=delta``.
    """
    graph = nx.Graph()
    cur_duals = cur_duals or {}
    prev_duals = prev_duals or {}

    # Union of keys from the previous and current iterations.
    for idx in set(cur_duals) | set(prev_duals):
        delta = abs(cur_duals.get(idx, 0.0) - prev_duals.get(idx, 0.0))
        if delta <= tol:
            continue
        # idx is usually a tuple like (c_i, c_j) for must_link constraints
        if isinstance(idx, tuple) and len(idx) == 2:
            a, b = idx
            if graph.has_edge(a, b):
                delta = max(delta, graph[a][b].get("weight", 0.0))
            graph.add_edge(a, b, weight=delta)
        else:
            # Fallback: single-node constraint, or unexpected index type
            graph.add_node(idx, weight=delta)
    return graph
def get_moving_containers_clust(
    cur_duals: dict[Any, float],
    prev_duals: dict[Any, float],
    tol: float,
    tol_move: float,
    df_clust: pd.DataFrame,
    profiles: np.ndarray,
) -> tuple[list[Any], int, int, int, float]:
    """Greedily choose containers to re-cluster from the clustering conflict graph.

    The graph comes from ``get_conflict_graph(cur_duals, prev_duals, tol)``.
    Until the graph is empty or ``ceil(len(df_clust) * tol_move)`` containers
    have been chosen, the highest-degree node is handled:

    - degree > 1: the node itself is chosen;
    - degree == 1: ``get_far_container`` picks between it and its neighbour,
      and both are removed;
    - degree == 0: the node is discarded.

    Nodes left isolated after each step are dropped.

    Returns:
        ``(moving, n_nodes, n_edges, max_deg, mean_deg)``; the graph
        statistics describe the graph before any removal.
    """
    conflict = get_conflict_graph(cur_duals, prev_duals, tol)
    node_count = conflict.number_of_nodes()
    edge_count = conflict.number_of_edges()

    initial_degrees = [deg for _, deg in conflict.degree()]
    if not initial_degrees:
        return [], node_count, edge_count, 0, 0.0
    top_degree = max(initial_degrees)
    avg_degree = sum(initial_degrees) / len(initial_degrees)

    limit = max(0, int(math.ceil(len(df_clust) * tol_move)))
    selected: list[Any] = []

    def _pick_hub():
        # First node (in graph order) with the highest degree -- the same node
        # a stable descending sort would put first. None once the graph is empty.
        return max(conflict.degree(), key=lambda item: item[1], default=None)

    hub = _pick_hub()
    while hub is not None and len(selected) < limit:
        node, deg = hub
        if deg == 1:
            # Single neighbour: let the profile distance decide who moves.
            partner = next(iter(conflict.neighbors(node)))
            selected.append(get_far_container(node, partner, df_clust, profiles))
            conflict.remove_node(node)
            if partner in conflict:
                conflict.remove_node(partner)
        else:
            if deg > 1:
                selected.append(node)
            conflict.remove_node(node)

        stranded = list(nx.isolates(conflict))
        if stranded:
            conflict.remove_nodes_from(stranded)
        hub = _pick_hub()

    return selected, node_count, edge_count, top_degree, avg_degree
def get_moving_containers_place(
    instance,
    cur_duals,
    prev_duals: dict[Any, float],
    tol: float,
    tol_move: float,
    working_df: pd.DataFrame,
) -> tuple[list[Any], int, int, int, float]:
    """Greedily choose containers to move from the placement conflict graph.

    The graph comes from ``get_conflict_graph(cur_duals, prev_duals, tol)``.
    While the graph is non-empty and fewer than
    ``len(instance.get_id_map()) * tol_move`` containers are chosen, the
    highest-degree node is handled:

    - degree > 1: the node itself is chosen;
    - degree == 1: ``get_container_tomove`` picks between it and its
      neighbour, and both are removed;
    - degree == 0: the node is discarded.

    Nodes left isolated after each step are dropped.

    Returns:
        ``(moving, n_nodes, n_edges, max_deg, mean_deg)``; the graph
        statistics describe the graph before any removal.
    """
    conflict = get_conflict_graph(cur_duals, prev_duals, tol)
    node_count = conflict.number_of_nodes()
    edge_count = conflict.number_of_edges()

    initial_degrees = [deg for _, deg in conflict.degree()]
    if not initial_degrees:
        return [], node_count, edge_count, 0, 0.0
    top_degree = max(initial_degrees)
    avg_degree = sum(initial_degrees) / len(initial_degrees)

    # May be fractional; the strict "<" comparison below acts like a ceiling.
    limit = len(instance.get_id_map()) * tol_move
    selected: list[Any] = []

    def _pick_hub():
        # First node (in graph order) with the highest degree -- the same node
        # a stable descending sort would put first. None once the graph is empty.
        return max(conflict.degree(), key=lambda item: item[1], default=None)

    hub = _pick_hub()
    while hub is not None and len(selected) < limit:
        node, deg = hub
        if deg == 1:
            # Single neighbour: compare the host load variance to decide who moves.
            partner = next(iter(conflict.neighbors(node)))
            selected.append(get_container_tomove(instance, node, partner, working_df))
            conflict.remove_node(node)
            if partner in conflict:
                conflict.remove_node(partner)
        else:
            if deg > 1:
                selected.append(node)
            conflict.remove_node(node)

        stranded = list(nx.isolates(conflict))
        if stranded:
            conflict.remove_nodes_from(stranded)
        hub = _pick_hub()

    return selected, node_count, edge_count, top_degree, avg_degree
def get_container_tomove(instance, c1, c2, working_df: pd.DataFrame):
    """Choose which of two conflicting containers should move.

    c1's current host is the host on c1's row(s) at c1's latest tick, the same
    "latest tick = current placement" convention used when deriving the
    placement snapshot. For that host, the per-tick load (sum of the first
    configured metric) is compared after subtracting each container's
    per-tick load (ticks missing for the container count as 0). The container
    whose removal leaves the smaller population variance, i.e. the smoother
    residual series, is returned. Ties go to ``c2``, and so does the case
    where c1 has no rows at all.
    """
    cfg = instance.config.connector.parameters
    nf, hf, tf = cfg.get("individual_field"), cfg.get("host_field"), cfg.get("tick_field")
    metric = cfg.get("metrics")[0]
    # Column selection already yields a new frame; nothing below mutates it.
    df = working_df[[nf, hf, tf, metric]]

    c1_rows = df.loc[df[nf] == c1]
    if c1_rows.empty:
        return c2
    # Use c1's host at its most recent tick, not whichever row happens to be first.
    latest_tick = c1_rows[tf].max()
    current_hosts = c1_rows.loc[c1_rows[tf] == latest_tick, hf]
    host = current_hosts.iloc[0] if not current_hosts.empty else c1_rows[hf].iloc[0]

    # Host load time series, indexed (and sorted) by tick.
    node_ts = df.loc[df[hf] == host].groupby(tf, sort=True)[metric].sum()
    timeline = node_ts.index

    def _container_ts(cid):
        # Container load aligned to the host timeline; missing ticks -> 0.
        series = df.loc[df[nf] == cid].groupby(tf, sort=True)[metric].sum()
        return series.reindex(timeline, fill_value=0)

    # Population variance (ddof=0) of the residual load after each removal.
    var1 = float((node_ts - _container_ts(c1)).var(ddof=0))
    var2 = float((node_ts - _container_ts(c2)).var(ddof=0))
    return c1 if var1 < var2 else c2
def eval_step_kpis(
    instance,
    labels,
    placement,
    prev_snapshot: EvalSnapshot | None,
    tick: int,
    *,
    working_df: pd.DataFrame | None = None,
) -> tuple[EvalSnapshot, dict[str, Any]]:
    """Compute this tick's two-stage KPIs and the snapshot for the next comparison.

    Args:
        instance: provides ``config.connector.parameters`` (individual/host
            field names and the metric list used for load balance).
        labels: current cluster labels (flattened via ``_safe_labels_array``).
        placement: current container -> host mapping (``None`` -> empty).
        prev_snapshot: snapshot from the previous tick, or ``None``.
        tick: tick stored in the result and the new snapshot.
        working_df: optional frame for the load-balance KPIs.

    Returns:
        ``(snapshot, metrics_dict)``. The dict holds ``"tick"`` followed by KPIs
        prefixed ``clst__``, ``clst_struct__``, ``plc__``, ``plc_delta__`` and,
        only when ``working_df`` is given, ``load__``.
    """
    labels_now = _safe_labels_array(labels)
    placement_now = placement or {}
    previous_labels = prev_snapshot.labels if prev_snapshot else None
    previous_placement = prev_snapshot.placement if prev_snapshot else None

    # Baseline clustering / placement KPIs and their deltas.
    clustering_kpi = _kpi_clustering(labels_now=labels_now, labels_prev=previous_labels)
    placement_kpi = _kpi_placement(
        placement_now=placement_now, placement_prev=previous_placement
    )
    structure_kpi = _kpi_cluster_structure(labels_now)

    # Absolute moved / unchanged counts over containers placed in both ticks.
    moved_count = stable_count = 0
    if previous_placement:
        shared_keys = set(placement_now) & set(previous_placement)
        moved_count = sum(
            1 for key in shared_keys if placement_now[key] != previous_placement[key]
        )
        stable_count = len(shared_keys) - moved_count
    delta_kpi = {
        "moved_count": float(moved_count),
        "stable_count": float(stable_count),
    }

    # Load balance is only computed when a frame is supplied.
    params = instance.config.connector.parameters
    nf, hf = params.get("individual_field"), params.get("host_field")
    metric_names = params.get("metrics") or []
    if working_df is None:
        load_kpi = {}
    else:
        load_kpi = _kpi_load_balance(working_df, nf=nf, hf=hf, metrics=metric_names)

    step_metrics: dict[str, Any] = {"tick": tick}
    for prefix, group in (
        ("clst", clustering_kpi),
        ("clst_struct", structure_kpi),
        ("plc", placement_kpi),
        ("plc_delta", delta_kpi),
        ("load", load_kpi),
    ):
        for name, value in group.items():
            step_metrics[f"{prefix}__{name}"] = value

    return EvalSnapshot(labels=labels_now, placement=placement_now, tick=tick), step_metrics