"""Provide placement heuristics and all placement-related methods."""
from __future__ import annotations
import logging
from collections.abc import Iterable, Sequence
from dataclasses import dataclass
from typing import Any
import numpy as np
import pandas as pd
from hots.core.interfaces import ProblemPlugin
@dataclass()
class _Fields:
    """Column names used by the placement heuristics.

    Built by ``PlacementPlugin.__init__`` from the connector parameters
    (``individual_field``, ``host_field``, ``tick_field`` and ``metrics``).
    """

    indiv: str  # column holding the container (individual) id
    host: str  # column holding the host id
    tick: str  # column holding the time tick
    metric: str  # only the first configured metric is used
# ---------- module helpers ----------------------------------------------------
def _get_ts(x: Any, t: int | None = None) -> np.ndarray:
    """Coerce ``x`` into a one-dimensional float time series.

    - Sequence-like inputs (list, tuple, ndarray, Series) are converted to a
      float array and, when ``t`` is given, truncated to at most ``t`` items
      (shorter inputs are returned as-is, not padded).
    - Any other value is treated as a scalar and repeated ``t`` times.

    :raises ValueError: if ``x`` is a scalar and ``t`` is None.
    """
    sequence_types = (list, tuple, np.ndarray, pd.Series)
    if not isinstance(x, sequence_types):
        if t is None:
            raise ValueError("Cannot broadcast scalar without t")
        return np.full(t, float(x), dtype=float)
    series = np.asarray(x, dtype=float)
    return series if t is None else series[:t]
def _fits(remaining: np.ndarray, demand: np.ndarray) -> bool:
    """Return True if ``demand`` fits into ``remaining`` on every tick.

    Both arguments are coerced with ``np.asarray`` and broadcast against each
    other. A tick where ``remaining - demand`` is negative or NaN makes the
    check fail; empty inputs trivially fit.

    The result is a plain Python ``bool`` (not ``numpy.bool_``), matching the
    annotation, so identity checks such as ``is True`` behave as expected.
    """
    slack = np.asarray(remaining, dtype=float) - np.asarray(demand, dtype=float)
    return bool(np.all(slack >= 0.0))
def _safe_first(values: Iterable[Any], default: Any = None) -> Any:
    """Return the first item produced by ``values``, or ``default`` when it yields nothing."""
    return next(iter(values), default)
# -----------------------------------------------------------------------------
class PlacementPlugin(ProblemPlugin):
    """Handles the 'placement' business problem.

    Two families of heuristics live here:

    * window-based moves (:meth:`adjust`, :meth:`move_list_containers`,
      :meth:`move_container`), working on container rows that carry a tick
      column, on ``instance.df_host`` (per-tick host load) and on
      ``instance.df_meta`` (host capacity);
    * horizon-based strategies (``allocation_*``, ``colocalize_clusters``,
      ``place_opposite_clusters``, ``spread_containers``,
      ``find_substitution``), where each row carries the metric either as a
      scalar or as a whole time series and host capacities come from a
      host-meta frame.

    Column names come from ``instance.config.connector.parameters`` and are
    cached in ``self._f``.
    """

    # ---------- lifecycle -----------------------------------------------------

    def __init__(self, instance):
        """Initialize the PlacementPlugin with the given instance.

        :param instance: problem instance exposing
            ``config.connector.parameters`` and the frames used below.
        """
        self.instance = instance
        cfg = instance.config.connector.parameters
        self._f = _Fields(
            indiv=cfg.get("individual_field"),
            host=cfg.get("host_field"),
            tick=cfg.get("tick_field"),
            metric=cfg.get("metrics")[0],
        )
        # Not read by any method of this class; kept as a public attribute.
        self.pending_changes = {}

    # ---------- ProblemPlugin API --------------------------------------------

    def adjust(self, moving: list[Any], working_df):
        """Finalize the move list by finding new targets for ``moving``.

        The time window is the full tick range of ``working_df``. The ordering
        criterion is ``self.params["order"]`` when the plugin has a ``params``
        mapping, ``"max"`` otherwise.

        :return: list of move dicts produced by :meth:`move_list_containers`.
        """
        tick = self._f.tick
        tmin, tmax = int(working_df[tick].min()), int(working_df[tick].max())
        # ``params`` is optional and may be missing or None.
        params = getattr(self, "params", None) or {}
        order = params.get("order", "max")
        return self.move_list_containers(
            self.instance, working_df, moving, tmin, tmax, order=order
        )

    def initial(
        self,
        labels,
        df_indiv: pd.DataFrame,
        df_host: pd.DataFrame,
    ) -> list[dict[str, Any]]:
        """From initial labels, get a first placement solution.

        :param labels: cluster label per container (must support ``tolist()``).
        :param df_indiv: container frame passed to the matrix builder and to
            :meth:`allocation_distant_pairwise`.
        :param df_host: host-meta frame passed to
            :meth:`allocation_distant_pairwise`.
        :return: list of ``{"container", "src", "dst"}`` move dicts.
        """
        from hots.plugins.clustering.builder import (
            build_matrix_indiv_attr,
            build_similarity_matrix,
        )

        # 1) Build the (containers × features) matrix.
        # NOTE(review): field names are read from ``instance.config`` here,
        # while the rest of the class reads ``config.connector.parameters``;
        # confirm both sources agree.
        mat = build_matrix_indiv_attr(
            df_indiv,
            self.instance.config.tick_field,
            self.instance.config.individual_field,
            self.instance.config.metrics,
            self.instance.get_id_map(),
        )
        # 2) Turn it into a similarity (distance) matrix.
        w = build_similarity_matrix(mat)
        return self.allocation_distant_pairwise(df_indiv, df_host, w, labels.tolist())

    # ---------- window-based move helpers ------------------------------------

    @staticmethod
    def _window_frames(
        inst, working_df: pd.DataFrame, tmin: int, tmax: int
    ) -> tuple[pd.DataFrame, pd.DataFrame, np.ndarray]:
        """Restrict container rows and host rows to ticks in ``[tmin, tmax]``.

        :return: ``(container rows, host rows from inst.df_host, sorted unique
            host ticks in the window)``.
        """
        cfg = inst.config.connector.parameters
        nf, hf, tf = cfg.get("individual_field"), cfg.get("host_field"), cfg.get("tick_field")
        metric = cfg.get("metrics")[0]
        indiv = working_df[[nf, hf, tf, metric]]
        host = inst.df_host[[hf, tf, metric]]
        w_indiv = indiv.loc[(indiv[tf] >= tmin) & (indiv[tf] <= tmax)]
        w_host = host.loc[(host[tf] >= tmin) & (host[tf] <= tmax)]
        ticks = np.sort(w_host[tf].unique())
        return w_indiv, w_host, ticks

    @staticmethod
    def _container_ts(w_indiv: pd.DataFrame, ticks: np.ndarray, inst, container_id) -> pd.Series:
        """Per-tick load of ``container_id`` (summed per tick), aligned on ``ticks`` (missing -> 0.0)."""
        cfg = inst.config.connector.parameters
        tf = cfg.get("tick_field")
        nf = cfg.get("individual_field")
        metric = cfg.get("metrics")[0]
        return (
            w_indiv[w_indiv[nf] == container_id]
            .groupby(tf, sort=True)[metric]
            .sum()
            .reindex(ticks, fill_value=0.0)
        )

    @staticmethod
    def _candidate_nodes(w_indiv: pd.DataFrame, inst, old_id) -> list:
        """Hosts appearing in ``w_indiv`` (in order of appearance), excluding ``old_id``."""
        hf = inst.config.connector.parameters.get("host_field")
        return [n for n in w_indiv[hf].unique() if n != old_id]

    @staticmethod
    def _capacity_map(inst) -> dict:
        """Map host -> capacity of the first metric, read from ``inst.df_meta``."""
        hf = inst.config.connector.parameters.get("host_field")
        metric = inst.config.connector.parameters.get("metrics")[0]
        # capacity per node assumed scalar here
        return inst.df_meta.set_index(hf)[metric].to_dict()

    @staticmethod
    def _node_ts(w_host: pd.DataFrame, inst, node, ticks: np.ndarray) -> pd.Series:
        """Per-tick load of host ``node`` (summed per tick), aligned on ``ticks`` (missing -> 0.0)."""
        cfg = inst.config.connector.parameters
        tf = cfg.get("tick_field")
        hf = cfg.get("host_field")
        metric = cfg.get("metrics")[0]
        return (
            w_host[w_host[hf] == node]
            .groupby(tf, sort=True)[metric]
            .sum()
            .reindex(ticks, fill_value=0.0)
        )

    def _choose_best_existing_node(
        self,
        candidates: list,
        w_host: pd.DataFrame,
        ticks: np.ndarray,
        caps: dict,
        c_ts: pd.Series,
        inst,
        load_offsets: dict[Any, pd.Series] | None = None,
    ):
        """Return the feasible candidate minimizing the variance of node + container load.

        A candidate is feasible when it has a known capacity and, at every tick,
        its load plus ``c_ts`` does not exceed that capacity. ``load_offsets``
        optionally adjusts a node's observed load per tick, e.g. for moves
        decided earlier in the same batch. Returns None if nothing is feasible.
        """
        best_node, best_var = None, np.inf
        for node in candidates:
            cap_node = caps.get(node)
            if cap_node is None:
                continue
            node_ts = self._node_ts(w_host, inst, node, ticks)
            if load_offsets and node in load_offsets:
                offset = load_offsets[node].reindex(ticks, fill_value=0.0).to_numpy()
                node_ts = node_ts + offset
            combined = node_ts + c_ts
            # feasibility check
            if np.any(combined.values > cap_node):
                continue
            v = float(combined.var(ddof=0))
            if v < best_var:
                best_var, best_node = v, node
        return best_node

    @staticmethod
    def _pick_new_node(inst, used_nodes: set, fallback):
        """First host of ``inst.df_meta`` not in ``used_nodes``; ``fallback`` if all are used."""
        hf = inst.config.connector.parameters.get("host_field")
        for nid in inst.df_meta[hf].unique():
            if nid not in used_nodes:
                return nid
        return fallback

    # --- simplified public method ---------------------------------------------

    def move_container(
        self,
        inst,
        container_id,
        tmin: int,
        tmax: int,
        old_id,
        working_df: pd.DataFrame,
        *,
        load_offsets: dict[Any, pd.Series] | None = None,
    ):
        """
        Move `container_id` to the best node in [tmin, tmax], choosing a feasible
        candidate (capacity per tick) that minimizes the variance of
        (node_ts + container_ts). Falls back to opening a new node if needed.

        :param load_offsets: optional per-node, per-tick load adjustments added
            to the host load read from ``inst.df_host`` (used by
            :meth:`move_list_containers` to account for freed capacity and for
            earlier moves of the same batch).
        :return: ``{"container_name", "old_host", "new_host"}`` or None when the
            window is empty or the container stays on ``old_id``.
        """
        hf = inst.config.connector.parameters.get("host_field")
        logging.info("Moving container: %s", container_id)
        w_indiv, w_host, ticks = self._window_frames(inst, working_df, tmin, tmax)
        if ticks.size == 0:
            return None  # no data in window
        c_ts = self._container_ts(w_indiv, ticks, inst, container_id)
        candidates = self._candidate_nodes(w_indiv, inst, old_id)
        caps = self._capacity_map(inst)
        best_node = self._choose_best_existing_node(
            candidates=candidates,
            w_host=w_host,
            ticks=ticks,
            caps=caps,
            c_ts=c_ts,
            inst=inst,
            load_offsets=load_offsets,
        )
        if best_node is None:
            logging.info(
                "Impossible to move %s on existing nodes. We need to open a new node.",
                container_id,
            )
            in_use = set(w_indiv[hf].unique().tolist())
            best_node = self._pick_new_node(inst, in_use, fallback=old_id)
            if best_node == old_id:
                logging.info("Impossible to open a new node: we keep the old node.")
        logging.info("He can go on %s (old is %s)", best_node, old_id)
        if best_node == old_id:
            return None
        return {
            "container_name": container_id,
            "old_host": old_id,
            "new_host": best_node,
        }

    def _free_moving_capacity(
        self,
        instance: Any,
        working_df: pd.DataFrame,
        moving: Iterable[Any],
        old_ids: dict[Any, Any],
        tmin: int,
        tmax: int,
    ) -> tuple[dict[Any, pd.Series], dict[Any, pd.Series]]:
        """Remove the moving containers' window load from their old hosts.

        :return: ``(node -> load offset, container -> per-tick load)``; both
            empty when the window holds no host data.
        """
        w_indiv, _w_host, ticks = self._window_frames(instance, working_df, tmin, tmax)
        offsets: dict[Any, pd.Series] = {}
        cont_ts: dict[Any, pd.Series] = {}
        if ticks.size == 0:
            return offsets, cont_ts
        for cont in moving:
            ts = self._container_ts(w_indiv, ticks, instance, cont)
            cont_ts[cont] = ts
            old = old_ids[cont]
            offsets[old] = offsets.get(old, 0.0) - ts
        return offsets, cont_ts

    def move_list_containers(
        self, instance: Any, working_df, moving: list[int], tmin: int, tmax: int, order: str = "max"
    ) -> list[dict[str, Any]]:
        """Move the list of containers to move.

        Process:
            1. Record each moving container's current host (first matching
               row of ``working_df``).
            2. Free the capacity they hold: their per-tick window load is
               subtracted from their old host while candidates are evaluated.
            3. Order containers by consumption (``'max'`` or ``'mean'`` of their
               metric in ``working_df``; any other value gives every container
               the same key). Ties are broken by container id, descending.
            4. Reassign each container with :meth:`move_container`, charging
               its load to the host it ends up on so later containers see it.

        :param instance: Problem instance
        :param working_df: container rows (individual, host, tick, metric)
        :param moving: List of container ids to move
        :param tmin: Minimum window time
        :param tmax: Maximum window time
        :param order: Order to consider ('max' or 'mean'), defaults to 'max'
        :return: List of move dicts
        """
        f = self._f
        moves_list: list[dict[str, Any]] = []
        logging.info("List of moving containers (placement):")
        logging.info(moving)
        # Step 1: store old hosts.
        old_ids: dict[Any, Any] = {
            cont: _safe_first(working_df.loc[working_df[f.indiv] == cont, f.host].to_numpy())
            for cont in moving
        }
        # Step 2: free the capacity held by the moving containers.
        load_offsets, cont_ts = self._free_moving_capacity(
            instance, working_df, old_ids, old_ids, tmin, tmax
        )
        # Step 3: order containers by consumption (empty history ranks as 0.0).
        reducers = {"max": np.max, "mean": np.mean}
        reducer = reducers.get(order)
        keyed: list[tuple[float, Any]] = []
        for cont in old_ids:
            cons = working_df.loc[working_df[f.indiv] == cont, f.metric].to_numpy()
            key = float(reducer(cons)) if reducer is not None and cons.size else 0.0
            keyed.append((key, cont))
        # Step 4: move each container to its best destination.
        for _key, cont in sorted(keyed, reverse=True):
            move = self.move_container(
                instance,
                cont,
                tmin,
                tmax,
                old_ids[cont],
                working_df,
                load_offsets=load_offsets,
            )
            if move is not None:
                moves_list.append(move)
                dest = move["new_host"]
            else:
                dest = old_ids[cont]  # stays put: its load goes back on the old host
            ts = cont_ts.get(cont)
            if ts is not None:
                load_offsets[dest] = load_offsets.get(dest, 0.0) + ts
        return moves_list

    # ---------- time-series helpers ------------------------------------------

    def _time_horizon(
        self, df_indiv: pd.DataFrame, df_host_meta: pd.DataFrame, time_horizon: int | None
    ) -> int:
        """Infer the working time horizon length (number of ticks).

        Uses ``time_horizon`` if given. Otherwise it uses the length of the first
        container metric value if it is sequence-like, then the first host
        metric value, and finally falls back to 1.
        """
        if time_horizon is not None:
            return int(time_horizon)
        f = self._f
        seq_types = (list, tuple, np.ndarray, pd.Series)
        c_first = _safe_first(df_indiv[f.metric].to_numpy())
        if isinstance(c_first, seq_types):
            return len(np.asarray(c_first))
        h_first = _safe_first(df_host_meta[f.metric].to_numpy())
        if isinstance(h_first, seq_types):
            return len(np.asarray(h_first))
        return 1

    def _capacity_for_host(self, host_row: pd.Series, t: int) -> np.ndarray:
        """Host capacity time-series (length t)."""
        return _get_ts(host_row[self._f.metric], t)

    def _demand_for_container(self, indiv_row: pd.Series, t: int) -> np.ndarray:
        """Container demand time-series (length t)."""
        return _get_ts(indiv_row[self._f.metric], t)

    # ---------- compact state builders ----------------------------------------

    def _init_remaining(self, df_host_meta: pd.DataFrame, t: int) -> list[dict[str, Any]]:
        """List of ``{"id", "remaining"}`` dicts holding each host's full capacity timeline."""
        f = self._f
        return [
            {"id": h[f.host], "remaining": self._capacity_for_host(h, t).copy()}
            for _, h in df_host_meta.iterrows()
        ]

    def _current_host_map(self) -> dict[Any, Any]:
        """Map container -> current host (from instance.df_indiv)."""
        f = self._f
        return {r[f.indiv]: r[f.host] for _, r in self.instance.df_indiv.iterrows()}

    def _compute_indiv_demands(self, df_indiv: pd.DataFrame, t: int) -> dict[Any, np.ndarray]:
        """Map container -> demand timeline."""
        f = self._f
        return {r[f.indiv]: self._demand_for_container(r, t) for _, r in df_indiv.iterrows()}

    def _compute_host_remaining(
        self,
        df_host_meta: pd.DataFrame,
        indiv_dem: dict[Any, np.ndarray],
        t: int,
    ) -> dict[Any, np.ndarray]:
        """Map host -> remaining capacity timeline after subtracting current placements.

        Demands come from ``indiv_dem`` when present, otherwise from the
        container row of ``instance.df_indiv``.
        """
        f = self._f
        remaining: dict[Any, np.ndarray] = {
            h[f.host]: self._capacity_for_host(h, t) for _, h in df_host_meta.iterrows()
        }
        for _, r in self.instance.df_indiv.iterrows():
            hid = r[f.host]
            # Explicit None check: ``array or ...`` raises for multi-tick arrays.
            d = indiv_dem.get(r[f.indiv])
            if d is None:
                d = self._demand_for_container(r, t)
            remaining[hid] = remaining[hid] - d
        return remaining

    def _hosts_by_tightness(self, remaining: dict[Any, np.ndarray]) -> list[tuple[Any, np.ndarray]]:
        """Hosts sorted by increasing minimum remaining capacity (most constrained first)."""
        return sorted(remaining.items(), key=lambda kv: float(np.min(kv[1])))

    def _heaviest_containers_on(self, host_id: Any, t: int) -> list[tuple[Any, float]]:
        """Containers on a host (per instance.df_indiv), by descending peak demand over horizon."""
        f = self._f
        rows = self.instance.df_indiv[self.instance.df_indiv[f.host] == host_id]
        if rows.empty:
            return []
        peaks = rows[f.metric].map(lambda a: float(np.max(_get_ts(a, t))))
        ordered = rows.assign(_peak=peaks).sort_values("_peak", ascending=False)
        return [
            (r[f.indiv], float(p))
            for (_, r), p in zip(ordered.iterrows(), ordered["_peak"], strict=True)
        ]

    def _best_destination(
        self, remaining: dict[Any, np.ndarray], src_host: Any, demand: np.ndarray
    ) -> Any | None:
        """Feasible destination host (not ``src_host``) with maximum slack after placing ``demand``."""
        candidate = None
        best_slack = None
        for dst, rdst in remaining.items():
            if dst == src_host or not _fits(rdst, demand):
                continue
            slack = float(np.min(rdst - demand))
            if best_slack is None or slack > best_slack:
                best_slack = slack
                candidate = dst
        return candidate

    def _load_at_tick(self, series_or_array: Any, tick: int) -> float:
        """Scalar load at ``tick`` from a scalar or time-series-like value.

        Scalars are returned as ``float``. For sequences, ``tick`` is clamped
        into the valid index range; an empty sequence yields ``0.0``.
        """
        if not isinstance(series_or_array, (list, tuple, np.ndarray, pd.Series)):
            return float(series_or_array)
        ts = np.atleast_1d(np.asarray(series_or_array, dtype=float))
        if ts.size == 0:
            return 0.0
        return float(ts[min(max(tick, 0), len(ts) - 1)])

    def _host_remaining_over_horizon(
        self, df_host_meta: pd.DataFrame, df_indiv: pd.DataFrame
    ) -> dict[Any, np.ndarray]:
        """Remaining capacity time-series per host, after subtracting instance.df_indiv demands."""
        f = self._f
        t = self._time_horizon(df_indiv, df_host_meta, None)
        remaining: dict[Any, np.ndarray] = {
            h[f.host]: self._capacity_for_host(h, t) for _, h in df_host_meta.iterrows()
        }
        for _, r in self.instance.df_indiv.iterrows():
            hid = r[f.host]
            remaining[hid] = remaining[hid] - self._demand_for_container(r, t)
        return remaining

    def _host_remaining_at_tick(
        self, df_host_meta: pd.DataFrame, df_indiv: pd.DataFrame, tick: int
    ) -> dict[Any, float]:
        """Remaining capacity per host at a given tick (clamped into the horizon)."""
        t = self._time_horizon(df_indiv, df_host_meta, None)
        rem_ts = self._host_remaining_over_horizon(df_host_meta, df_indiv)
        idx = min(max(tick, 0), t - 1)
        return {h: float(rem[idx]) for h, rem in rem_ts.items()}

    # ---------- allocation strategies ----------------------------------------

    def allocation_ffd(
        self,
        df_indiv: pd.DataFrame,
        df_host_meta: pd.DataFrame,
        time_horizon: int | None = None,
    ) -> list[dict[str, Any]]:
        """First-Fit Decreasing across time-series capacity.

        Sort containers by descending peak demand (then total demand); assign
        each to the first host whose remaining timeline supports the entire
        demand. Containers that fit nowhere are left out of the result.
        """
        f = self._f
        t = self._time_horizon(df_indiv, df_host_meta, time_horizon)
        hosts = self._init_remaining(df_host_meta, t)
        current_host = self._current_host_map()
        items: list[tuple[Any, np.ndarray]] = [
            (r[f.indiv], self._demand_for_container(r, t)) for _, r in df_indiv.iterrows()
        ]
        items.sort(key=lambda x: (float(np.max(x[1])), float(np.sum(x[1]))), reverse=True)
        moves: list[dict[str, Any]] = []
        for cid, demand in items:
            for h in hosts:
                if not _fits(h["remaining"], demand):
                    continue
                src = current_host.get(cid, None)
                dst = h["id"]
                if src != dst:
                    moves.append({"container": cid, "src": src, "dst": dst})
                h["remaining"] = h["remaining"] - demand
                break  # placed
        return moves

    def allocation_spread(
        self,
        df_indiv: pd.DataFrame,
        df_host_meta: pd.DataFrame,
        labels: pd.Series | None = None,
    ) -> list[dict[str, Any]]:
        """Greedy spread over the full horizon.

        Move one heaviest container off each most-constrained host to the feasible
        host that maximizes slack after placement. One move per source host.
        ``labels`` is accepted for interface compatibility and not used.
        """
        t = self._time_horizon(df_indiv, df_host_meta, None)
        indiv_dem = self._compute_indiv_demands(df_indiv, t)
        host_remaining = self._compute_host_remaining(df_host_meta, indiv_dem, t)
        moves: list[dict[str, Any]] = []
        for worst_host, _ in self._hosts_by_tightness(host_remaining):
            for cid, _ in self._heaviest_containers_on(worst_host, t):
                d = indiv_dem[cid]
                dst = self._best_destination(host_remaining, worst_host, d)
                if dst is None:
                    continue
                moves.append({"container": cid, "src": worst_host, "dst": dst})
                host_remaining[worst_host] = host_remaining[worst_host] + d
                host_remaining[dst] = host_remaining[dst] - d
                break  # one move per (worst) host per pass
        return moves

    def allocation_distant_pairwise(
        self,
        df_indiv: pd.DataFrame,
        df_host_meta: pd.DataFrame,
        distance_mat: np.ndarray,
        labels: Sequence[Any],
        lower_bound: float | None = None,
    ) -> list[dict[str, Any]]:
        """Place containers by repeatedly pairing the **most distant** clusters
        and assigning their items together when possible.

        ``distance_mat[a, b]`` is read with ``a``/``b`` being positions of
        labels in first-appearance order (see
        ``_group_by_labels_series_or_list``). Pairs below ``lower_bound`` (or
        NaN) are ignored; when no pair is left, all remaining clusters are
        spread container by container.

        NOTE(review): ``initial`` passes a matrix derived from the per-container
        feature matrix; confirm its rows/columns correspond to clusters.
        """
        t = self._time_horizon(df_indiv, df_host_meta, None)
        hosts = self._init_remaining(df_host_meta, t)
        current_host = self._current_host_map()
        cid_to_demand = self._compute_indiv_demands(df_indiv, t)
        label_to_conts, label_order = self._group_by_labels_series_or_list(labels, df_indiv)
        remaining = set(range(len(label_order)))
        moves: list[dict[str, Any]] = []
        while remaining:
            if len(remaining) == 1:
                self._spread_last_cluster_pairwise(
                    remaining,
                    label_order,
                    label_to_conts,
                    cid_to_demand,
                    hosts,
                    current_host,
                    moves,
                )
                break
            pair = self._select_best_distant_pair(remaining, distance_mat, lower_bound)
            if pair is None:
                self._spread_all_remaining_pairwise(
                    remaining,
                    label_order,
                    label_to_conts,
                    cid_to_demand,
                    hosts,
                    current_host,
                    moves,
                )
                break
            self._process_cluster_pair(
                pair, label_order, label_to_conts, cid_to_demand, hosts, current_host, moves
            )
            remaining.discard(pair[0])
            remaining.discard(pair[1])
        return moves

    def _try_place_single_pairwise(
        self, cid: Any, cid_to_demand: dict, hosts: list, current_host: dict, moves: list
    ) -> None:
        """Place ``cid`` on the first host that fits it; do nothing if none fits."""
        d = cid_to_demand[cid]
        for host in hosts:
            if _fits(host["remaining"], d):
                src = current_host.get(cid, None)
                dst = host["id"]
                if src != dst:
                    moves.append({"container": cid, "src": src, "dst": dst})
                host["remaining"] = host["remaining"] - d
                return

    def _find_host_for_pair_pairwise(
        self, d_i: np.ndarray, d_j: np.ndarray, hosts: list
    ) -> int | None:
        """Index of the first host that can fit both containers, or None."""
        pair_demand = d_i + d_j
        for idx, host in enumerate(hosts):
            if _fits(host["remaining"], pair_demand):
                return idx
        return None

    def _place_pair_pairwise(
        self, c_i: Any, c_j: Any, cid_to_demand: dict, hosts: list, current_host: dict, moves: list
    ) -> None:
        """Place a pair of containers on the same host if possible, else individually."""
        di, dj = cid_to_demand[c_i], cid_to_demand[c_j]
        idx = self._find_host_for_pair_pairwise(di, dj, hosts)
        if idx is None:
            self._try_place_single_pairwise(c_i, cid_to_demand, hosts, current_host, moves)
            self._try_place_single_pairwise(c_j, cid_to_demand, hosts, current_host, moves)
            return
        dst = hosts[idx]["id"]
        for cid in (c_i, c_j):
            src = current_host.get(cid, None)
            if src != dst:
                moves.append({"container": cid, "src": src, "dst": dst})
        hosts[idx]["remaining"] = hosts[idx]["remaining"] - (di + dj)

    def _select_best_distant_pair(
        self, rem_set: set, distance_mat: np.ndarray, lower_bound: float | None
    ) -> tuple[int, int] | None:
        """Select the pair of clusters with the largest (non-NaN, >= lower_bound) distance."""
        best, best_val = None, -np.inf
        rem_list = list(rem_set)
        for i, a in enumerate(rem_list):
            for b in rem_list[i + 1:]:
                val = float(distance_mat[a, b])
                if np.isnan(val) or (lower_bound is not None and val < lower_bound):
                    continue
                if val > best_val:
                    best_val, best = val, (a, b)
        return best

    def _get_sorted_containers_pairwise(
        self, label: Any, label_to_conts: dict, cid_to_demand: dict
    ) -> list:
        """Get containers for a label, sorted by peak demand (descending)."""
        conts = list(label_to_conts.get(label, []))
        conts.sort(key=lambda c: float(np.max(cid_to_demand[c])), reverse=True)
        return conts

    def _spread_last_cluster_pairwise(
        self,
        remaining: set,
        label_order: list,
        label_to_conts: dict,
        cid_to_demand: dict,
        hosts: list,
        current_host: dict,
        moves: list,
    ) -> None:
        """Spread the last remaining cluster across hosts, heaviest container first."""
        last_idx = next(iter(remaining))
        lab = label_order[last_idx]
        for cid in self._get_sorted_containers_pairwise(lab, label_to_conts, cid_to_demand):
            self._try_place_single_pairwise(cid, cid_to_demand, hosts, current_host, moves)

    def _spread_all_remaining_pairwise(
        self,
        remaining: set,
        label_order: list,
        label_to_conts: dict,
        cid_to_demand: dict,
        hosts: list,
        current_host: dict,
        moves: list,
    ) -> None:
        """Spread all remaining clusters when no valid pairs exist."""
        for idx in list(remaining):
            lab = label_order[idx]
            for cid in self._get_sorted_containers_pairwise(lab, label_to_conts, cid_to_demand):
                self._try_place_single_pairwise(cid, cid_to_demand, hosts, current_host, moves)

    def _process_cluster_pair(
        self,
        pair: tuple[int, int],
        label_order: list,
        label_to_conts: dict,
        cid_to_demand: dict,
        hosts: list,
        current_host: dict,
        moves: list,
    ) -> None:
        """Place the k-th members of both clusters together, then the leftovers alone."""
        i_idx, j_idx = pair
        li, lj = label_order[i_idx], label_order[j_idx]
        list_i = list(label_to_conts.get(li, []))
        list_j = list(label_to_conts.get(lj, []))
        n_pairs = min(len(list_i), len(list_j))
        # Place pairs
        for c_i, c_j in zip(list_i[:n_pairs], list_j[:n_pairs]):
            self._place_pair_pairwise(c_i, c_j, cid_to_demand, hosts, current_host, moves)
        # Place remaining containers
        self._place_remaining_containers_pairwise(
            list_i[n_pairs:], cid_to_demand, hosts, current_host, moves
        )
        self._place_remaining_containers_pairwise(
            list_j[n_pairs:], cid_to_demand, hosts, current_host, moves
        )

    def _place_remaining_containers_pairwise(
        self,
        container_list: list,
        cid_to_demand: dict,
        hosts: list,
        current_host: dict,
        moves: list,
    ) -> None:
        """Place remaining containers one by one, by descending peak demand."""
        sorted_conts = sorted(
            container_list, key=lambda c: float(np.max(cid_to_demand[c])), reverse=True
        )
        for cid in sorted_conts:
            self._try_place_single_pairwise(cid, cid_to_demand, hosts, current_host, moves)

    def spread_containers(
        self, df_indiv: pd.DataFrame, tick: int, *, single_move: bool = False
    ) -> list[dict[str, Any]]:
        """Greedy spread **at a single tick**.

        Repeatedly moves one container off the host with the least remaining
        capacity at ``tick``, as long as the move strictly raises that host
        pair's minimum remaining capacity (which guarantees termination).
        ``df_indiv`` is only used to infer the horizon length. Each accepted
        move rewrites the host column of ``self.instance.df_indiv``.
        """
        df = self.instance.df_indiv
        df_host_meta = getattr(self.instance, "df_host_meta", None)
        if df_host_meta is None or df.empty:
            return []
        moves: list[dict[str, Any]] = []
        rem = self._host_remaining_at_tick(df_host_meta, df_indiv, tick)
        while True:
            moved = self._try_spread_one_container(df, rem, tick, moves)
            if not moved or single_move:
                break
        return moves

    def _try_spread_one_container(
        self, df: pd.DataFrame, rem: dict[Any, float], tick: int, moves: list
    ) -> bool:
        """Try to spread one container from the most loaded host. Returns True if moved."""
        f = self._f
        if not rem:
            return False
        worst_host = min(rem.items(), key=lambda kv: kv[1])[0]
        worst_rem = rem[worst_host]
        subset = df[df[f.host] == worst_host]
        if subset.empty:
            return False
        subset = self._add_load_column(subset, tick)
        for _, r in subset.iterrows():
            cid = r[f.indiv]
            load = float(r["_load"])
            if not load > 0.0:
                continue  # a zero/NaN load cannot relieve the source host
            dst = self._find_best_destination_spread(load, worst_host, rem)
            # Require a strict improvement of the pair minimum; otherwise a
            # container could bounce between two hosts forever.
            if dst is None or rem[dst] - load <= worst_rem:
                continue
            moves.append({"container": cid, "src": worst_host, "dst": dst})
            rem[worst_host] += load
            rem[dst] -= load
            df.loc[df[f.indiv] == cid, f.host] = dst
            return True
        return False

    def _add_load_column(self, subset: pd.DataFrame, tick: int) -> pd.DataFrame:
        """Add a ``_load`` column (load at ``tick``) and sort by descending load."""
        f = self._f
        subset = subset.assign(_load=subset[f.metric].map(lambda a: self._load_at_tick(a, tick)))
        return subset.sort_values("_load", ascending=False)

    def _find_best_destination_spread(
        self, load: float, src_host: Any, rem: dict[Any, float]
    ) -> Any | None:
        """Host (not ``src_host``) with room for ``load`` and maximum slack after placing it."""
        dst, best_slack = None, None
        for h, slack in rem.items():
            if h == src_host or slack < load:
                continue
            s = slack - load
            if best_slack is None or s > best_slack:
                best_slack, dst = s, h
        return dst

    # Backward-compat thin wrapper (kept to avoid breaking callers)
    def spread_containers_new(self, df_indiv: pd.DataFrame, tick: int) -> list[dict[str, Any]]:
        """Single step of `spread_containers` (kept for compatibility)."""
        return self.spread_containers(df_indiv, tick, single_move=True)

    # ----- Clustering colocalization (merged / simplified) --------------------

    def _group_by_labels_series_or_list(
        self, labels: Sequence[Any] | pd.Series, df_indiv: pd.DataFrame
    ) -> tuple[dict[Any, list[Any]], list[Any]]:
        """Return (label -> [container_id]), and ordered list of unique labels.

        Works if `labels` is:
        - Series indexed by container ids,
        - Series aligned with df_indiv rows,
        - list/ndarray aligned with df_indiv rows.
        Unique labels are listed in first-appearance order.
        """
        f = self._f
        label_to_conts: dict[Any, list[Any]] = {}
        if isinstance(labels, pd.Series):
            if set(labels.index) >= set(df_indiv[f.indiv]):
                for _, r in df_indiv.iterrows():
                    cid = r[f.indiv]
                    label_to_conts.setdefault(labels.loc[cid], []).append(cid)
            else:
                for cid, lab in zip(df_indiv[f.indiv], labels, strict=True):
                    label_to_conts.setdefault(lab, []).append(cid)
        else:
            labs = np.asarray(labels)
            for cid, lab in zip(df_indiv[f.indiv].tolist(), labs, strict=True):
                label_to_conts.setdefault(lab, []).append(cid)
        return label_to_conts, list(label_to_conts.keys())

    def colocalize_clusters(
        self, df_indiv: pd.DataFrame, df_host_meta: pd.DataFrame, labels: pd.Series, n_clusters: int
    ) -> list[dict[str, Any]]:
        """Colocalize cluster members using **majority-host** heuristic.

        For each cluster:
        1) Identify the majority host (host with most members of that cluster).
        2) Move minority members to that host if they fit over the full horizon.
        Containers are processed in descending peak demand to increase feasibility.
        Moves are written back to ``self.instance.df_indiv``. ``n_clusters`` is
        accepted for interface compatibility and not used.
        """
        f = self._f
        t = self._time_horizon(df_indiv, df_host_meta, None)
        remaining = self._host_remaining_over_horizon(df_host_meta, df_indiv)
        df = self.instance.df_indiv
        cluster_of: dict[Any, int] = labels.to_dict()
        # cluster -> host -> number of members (unlabelled containers count as -1)
        cluster_counts: dict[int, dict[Any, int]] = {}
        for h in remaining:
            for cid in df[df[f.host] == h][f.indiv].tolist():
                counts = cluster_counts.setdefault(cluster_of.get(cid, -1), {})
                counts[h] = counts.get(h, 0) + 1
        majority_host: dict[int, Any] = {
            c: max(counts.items(), key=lambda kv: kv[1])[0]
            for c, counts in cluster_counts.items()
            if counts
        }
        # precompute demand & peaks
        demand = {r[f.indiv]: self._demand_for_container(r, t) for _, r in df.iterrows()}
        peak = {cid: float(np.max(d)) for cid, d in demand.items()}
        # minority members -> try to move to their cluster majority host
        candidates: list[tuple[Any, Any]] = []
        for cid, cl in cluster_of.items():
            tgt = majority_host.get(cl)
            if tgt is None:
                continue
            cur_vals = df.loc[df[f.indiv] == cid, f.host].values
            if len(cur_vals) and cur_vals[0] != tgt:
                candidates.append((cid, tgt))
        candidates.sort(key=lambda x: peak.get(x[0], 0.0), reverse=True)
        moves: list[dict[str, Any]] = []
        for cid, tgt in candidates:
            # Re-read the host: earlier moves in this loop may have changed df.
            cur_vals = df.loc[df[f.indiv] == cid, f.host].values
            if len(cur_vals) == 0:
                continue
            src = cur_vals[0]
            d = demand[cid]
            if _fits(remaining[tgt], d):
                moves.append({"container": cid, "src": src, "dst": tgt})
                remaining[src] = remaining[src] + d
                remaining[tgt] = remaining[tgt] - d
                df.loc[df[f.indiv] == cid, f.host] = tgt
        return moves

    # Backward-compat thin wrapper (kept to avoid breaking callers)
    def colocalize_clusters_new(
        self, df_indiv: pd.DataFrame, df_host_meta: pd.DataFrame, labels: pd.Series, n_clusters: int
    ) -> list[dict[str, Any]]:
        """Compatibility wrapper for the merged `colocalize_clusters`."""
        return self.colocalize_clusters(df_indiv, df_host_meta, labels, n_clusters)

    def place_opposite_clusters(
        self,
        df_indiv: pd.DataFrame,
        df_host_meta: pd.DataFrame,
        labels: pd.Series,
        clusters_to_separate: Sequence[int],
    ) -> list[dict[str, Any]]:
        """Enforce strong separation for a set of 'opposite' clusters.

        Each target cluster present on some host gets an anchor (its host with
        the most members). Members are pulled to their anchor, then members of
        other target clusters are pushed off each anchor to non-anchor hosts.
        Moves are written back to ``self.instance.df_indiv``.
        """
        t = self._time_horizon(df_indiv, df_host_meta, None)
        remaining = self._host_remaining_over_horizon(df_host_meta, df_indiv)
        df = self.instance.df_indiv
        f = self._f
        cl_of: dict[Any, int] = labels.to_dict()
        target_set = set(clusters_to_separate)
        host_members = {h: df[df[f.host] == h][f.indiv].tolist() for h in remaining}
        anchors = self._find_cluster_anchors(host_members, cl_of, target_set)
        demand = {r[f.indiv]: self._demand_for_container(r, t) for _, r in df.iterrows()}
        peak = {cid: float(np.max(d)) for cid, d in demand.items()}
        moves: list[dict[str, Any]] = []
        self._pull_members_to_anchors(
            anchors, cl_of, df, remaining, host_members, demand, peak, moves
        )
        self._push_foreign_from_anchors(
            anchors, target_set, cl_of, host_members, df, remaining, demand, peak, moves
        )
        return moves

    def _find_cluster_anchors(
        self, host_members: dict[Any, list[Any]], cl_of: dict[Any, int], target_set: set
    ) -> dict[int, Any]:
        """Find anchor host for each target cluster (host with most members).

        Clusters with no member on any host get no anchor (previously the first
        host was chosen, which then repelled other clusters for no reason).
        """
        anchors: dict[int, Any] = {}
        for c in target_set:
            best_h, best_cnt = None, 0
            for h, members in host_members.items():
                cnt = sum(1 for cid in members if cl_of.get(cid, -1) == c)
                if cnt > best_cnt:
                    best_cnt, best_h = cnt, h
            if best_h is not None:
                anchors[c] = best_h
        return anchors

    def _pull_members_to_anchors(
        self,
        anchors: dict[int, Any],
        cl_of: dict[Any, int],
        df: pd.DataFrame,
        remaining: dict[Any, np.ndarray],
        host_members: dict[Any, list[Any]],
        demand: dict[Any, np.ndarray],
        peak: dict[Any, float],
        moves: list,
    ) -> None:
        """Pull minority members to their cluster anchor when they fit."""
        f = self._f
        for c, anchor in anchors.items():
            members_c = [cid for cid, lab in cl_of.items() if lab == c]
            members_c.sort(key=lambda x: peak.get(x, 0.0), reverse=True)
            for cid in members_c:
                cur = _safe_first(df.loc[df[f.indiv] == cid, f.host].to_numpy())
                # Skip containers absent from df or sitting on an unknown host.
                if cur is None or cur == anchor or cur not in remaining:
                    continue
                d = demand[cid]
                if _fits(remaining[anchor], d):
                    moves.append({"container": cid, "src": cur, "dst": anchor})
                    remaining[cur] = remaining[cur] + d
                    remaining[anchor] = remaining[anchor] - d
                    host_members[cur].remove(cid)
                    host_members[anchor].append(cid)
                    df.loc[df[f.indiv] == cid, f.host] = anchor

    def _push_foreign_from_anchors(
        self,
        anchors: dict[int, Any],
        target_set: set,
        cl_of: dict[Any, int],
        host_members: dict[Any, list[Any]],
        df: pd.DataFrame,
        remaining: dict[Any, np.ndarray],
        demand: dict[Any, np.ndarray],
        peak: dict[Any, float],
        moves: list,
    ) -> None:
        """Push members of other target clusters out of each anchor to non-anchor hosts."""
        f = self._f
        anchor_values = set(anchors.values())
        for c, anchor in anchors.items():
            foreign = self._find_foreign_members(anchor, c, host_members, cl_of, target_set, peak)
            for cid in foreign:
                d = demand[cid]
                best_dst = self._find_best_destination_opposite(anchor, anchor_values, remaining, d)
                if best_dst is None:
                    continue
                moves.append({"container": cid, "src": anchor, "dst": best_dst})
                remaining[anchor] = remaining[anchor] + d
                remaining[best_dst] = remaining[best_dst] - d
                host_members[anchor].remove(cid)
                host_members[best_dst].append(cid)
                df.loc[df[f.indiv] == cid, f.host] = best_dst

    def _find_foreign_members(
        self,
        anchor: Any,
        cluster: int,
        host_members: dict[Any, list[Any]],
        cl_of: dict[Any, int],
        target_set: set,
        peak: dict[Any, float],
    ) -> list:
        """Members of other target clusters on ``anchor``, by descending peak demand."""
        foreign = [
            cid
            for cid in list(host_members[anchor])
            if cl_of.get(cid, -1) in target_set and cl_of.get(cid, -1) != cluster
        ]
        foreign.sort(key=lambda x: peak.get(x, 0.0), reverse=True)
        return foreign

    def _find_best_destination_opposite(
        self, anchor: Any, anchor_values: set, remaining: dict[Any, np.ndarray], d: np.ndarray
    ) -> Any | None:
        """Non-anchor host that fits ``d`` with maximum slack after placement, or None."""
        best_dst, best_slack = None, None
        for h, rem in remaining.items():
            if h == anchor or h in anchor_values or not _fits(rem, d):
                continue
            slack = float(np.min(rem - d))
            if best_slack is None or slack > best_slack:
                best_slack, best_dst = slack, h
        return best_dst

    def find_substitution(self, *args, **kwargs) -> list[dict[str, Any]]:
        """Find a beneficial two-way swap between hosts.

        Frames are taken from ``kwargs`` (``df_indiv``, ``df_host_meta``) or
        from the instance; positional arguments are ignored. Returns the two
        moves of the swap that most raises the worst slack of the two involved
        hosts, or ``[]`` when no swap improves it.
        """
        df_indiv = kwargs.get("df_indiv", getattr(self.instance, "df_indiv", None))
        df_host_meta = kwargs.get("df_host_meta", getattr(self.instance, "df_host_meta", None))
        if df_indiv is None or df_host_meta is None:
            return []
        t = self._time_horizon(df_indiv, df_host_meta, None)
        remaining = self._host_remaining_over_horizon(df_host_meta, df_indiv)
        f = self._f
        indiv_dem = {r[f.indiv]: self._demand_for_container(r, t) for _, r in df_indiv.iterrows()}
        by_host = {
            h: self.instance.df_indiv[self.instance.df_indiv[f.host] == h] for h in remaining
        }
        best_swap = self._find_best_swap(by_host, remaining, indiv_dem, f)
        if not best_swap:
            return []
        return self._create_swap_moves(best_swap)

    def _find_best_swap(
        self, by_host: dict, remaining: dict[Any, np.ndarray], indiv_dem: dict, f: _Fields
    ) -> tuple[Any, Any, Any, Any] | None:
        """Find the best container swap that improves worst slack."""
        best_gain, best_swap = 0.0, None
        hosts = list(by_host.items())
        # The gain is symmetric in (ha, hb), so each unordered pair is evaluated once.
        for pos, (ha, rows_a) in enumerate(hosts):
            if rows_a.empty:
                continue
            for hb, rows_b in hosts[pos + 1:]:
                if rows_b.empty:
                    continue
                base_slack = self._worst_slack(ha, hb, remaining)
                swap = self._find_best_pair_swap(
                    rows_a, rows_b, ha, hb, indiv_dem, remaining, base_slack, f
                )
                if swap and swap[0] > best_gain:
                    best_gain, best_swap = swap[0], swap[1:]
        return best_swap

    def _find_best_pair_swap(
        self,
        rows_a: pd.DataFrame,
        rows_b: pd.DataFrame,
        ha: Any,
        hb: Any,
        indiv_dem: dict,
        remaining: dict[Any, np.ndarray],
        base_slack: float,
        f: _Fields,
    ) -> tuple[float, Any, Any, Any, Any] | None:
        """Best ``(gain, a, ha, b, hb)`` swap between two hosts, or None if no positive gain."""
        best_gain, best = 0.0, None
        for _, ra in rows_a.iterrows():
            da = indiv_dem[ra[f.indiv]]
            for _, rb in rows_b.iterrows():
                db = indiv_dem[rb[f.indiv]]
                gain = self._evaluate_swap(ha, hb, da, db, remaining, base_slack)
                if gain > best_gain:
                    best_gain = gain
                    best = (best_gain, ra[f.indiv], ha, rb[f.indiv], hb)
        return best

    def _evaluate_swap(
        self,
        ha: Any,
        hb: Any,
        da: np.ndarray,
        db: np.ndarray,
        remaining: dict[Any, np.ndarray],
        base_slack: float,
    ) -> float:
        """Gain in worst slack from swapping two containers (0.0 if infeasible)."""
        raft_a = remaining[ha] + da - db
        raft_b = remaining[hb] + db - da
        if np.any(raft_a < 0) or np.any(raft_b < 0):
            return 0.0
        return float(min(np.min(raft_a), np.min(raft_b))) - base_slack

    def _worst_slack(self, a: Any, b: Any, rem: dict[Any, np.ndarray]) -> float:
        """Calculate worst slack between two hosts."""
        return float(min(np.min(rem[a]), np.min(rem[b])))

    def _create_swap_moves(self, swap: tuple[Any, Any, Any, Any]) -> list[dict[str, Any]]:
        """Create move list from swap tuple."""
        a_c, ha, b_c, hb = swap
        return [
            {"container": a_c, "src": ha, "dst": hb},
            {"container": b_c, "src": hb, "dst": ha},
        ]

    # ----- Graph helpers ------------------------------------------------------

    def build_place_adj_matrix(self, df_indiv: pd.DataFrame, id_map: dict[Any, int]) -> np.ndarray:
        """Build adjacency matrix where A[i,j] = 1 if containers i and j share host.

        :param df_indiv: frame with container and host columns; the first row
            of each container determines its host.
        :param id_map: mapping container id -> matrix index in
            ``range(len(id_map))``.
        :return: symmetric ``(n, n)`` int matrix with zero diagonal; containers
            not found in ``df_indiv`` are never adjacent.
        """
        f = self._f
        # One pass over the frame instead of one boolean filter per container.
        first_rows = df_indiv.drop_duplicates(subset=f.indiv, keep="first")
        host_of = dict(zip(first_rows[f.indiv], first_rows[f.host]))
        hosts = {i: host_of.get(cid) for cid, i in id_map.items()}
        n = len(id_map)
        a = np.zeros((n, n), dtype=int)
        for i in range(n):
            hi = hosts.get(i)
            if hi is None:
                continue
            for j in range(i + 1, n):
                if hosts.get(j) == hi:
                    a[i, j] = a[j, i] = 1
        return a