# hots/core/app.py
"""HOTS – application main process."""
import importlib
import logging
import time
from pathlib import Path
import numpy as np
import pandas as pd
from hots.config.loader import AppConfig
from hots.core.instance import Instance
from hots.evaluation.evaluator import EvalSnapshot, eval_solutions
from hots.plugins import (
ClusteringFactory,
ConnectorFactory,
OptimizationFactory,
)
from hots.plugins.clustering.builder import (
build_post_clust_matrices,
build_pre_clust_matrices,
)
from hots.utils.signals import setup_signal_handlers
from hots.visualization.plot import plot_containers_groupby_nodes
class App:
    """Application entry point for HOTS.

    Wires together the connector, clustering, optimization and problem
    plugins selected by the configuration, then drives a run: an initial
    clustering / optional placement phase, a first build+solve of the
    optimization models, and a streaming loop that evaluates and applies
    moves window by window until the connector has no more data or the
    configured time limit is reached.
    """

    def __init__(self, config: AppConfig):
        """Initialize the App with a given configuration.

        Builds the :class:`Instance` that is passed to every plugin, creates
        the connector / clustering / optimization plugins via their
        factories, loads the initial data through the connector, imports the
        problem plugin named by ``config.problem.type`` and registers
        :meth:`shutdown` with ``setup_signal_handlers``.

        Args:
            config: Application configuration.
        """
        self.config = config
        self.instance = Instance(config)
        self.run_id = time.strftime("%Y%m%d-%H%M%S")

        # Factories
        self.connector = ConnectorFactory.create(config.connector, self.instance)
        self.instance._load_initial_data(self.connector)
        self.clustering = ClusteringFactory.create(config.clustering, self.instance)
        self.clust_opt = OptimizationFactory.create(
            config.optimization, self.instance, pb_number=1
        )
        self.problem_opt = OptimizationFactory.create(
            config.optimization, self.instance, pb_number=2
        )

        # Dynamically load the problem plugin: type "placement" resolves to
        # hots.plugins.problem.placement.PlacementPlugin.
        problem_type = config.problem.type.lower()
        module_path = f"hots.plugins.problem.{problem_type}"
        cls_name = f"{problem_type.title()}Plugin"
        mod = importlib.import_module(module_path)
        problem_cls = getattr(mod, cls_name)
        self.problem = problem_cls(self.instance)

        # State that persists across iterations
        self.prev_snapshot: EvalSnapshot | None = None
        # Coerce to Path so a plain string from the config also supports
        # .parent / .with_suffix() / .with_name() when persisting.
        self.results_file: Path = Path(config.reporting.metrics_file)

        setup_signal_handlers(self.shutdown)

    # -------------------------
    # Main run loop
    # -------------------------

    def run(self):
        """Run the initial phase, then the streaming update loop.

        The streaming loop stops when the connector returns no more data or
        once ``config.time_limit`` seconds (when set) have elapsed since the
        start of this call. Per-step metrics and run-level KPIs are written
        to disk at the end.
        """
        # Monotonic clock: the deadline must not shift if the wall clock jumps.
        t_start = time.monotonic()
        time_limit = self.config.time_limit
        end_time = float("inf") if time_limit is None else t_start + time_limit

        logging.info("Starting HOTS run – preprocessing")
        if self.config.connector.type.lower() == "kafka":
            # Clear any residual offsets/state (Kafka connectors only).
            self.instance.clear_kafka_topics()

        # Initial window
        n_clusters, n_initial_moves = self._initial_phase()
        self.pre_loop()
        # Record a minimal "initial" metric row.
        self.instance.metrics_history.append(
            {
                "initial_clusters": n_clusters,
                "initial_moves": n_initial_moves,
            }
        )

        # Streaming loop
        self._streaming_loop(end_time=end_time)

        self._finalize_run_kpis()
        self._persist_metrics()
        t_total = time.monotonic() - t_start
        logging.info("Finished HOTS run in %.3f seconds", t_total)

    def _initial_phase(self) -> tuple[int, int]:
        """Run initial clustering and the optional first placement.

        Kafka topic clearing is done by the caller (:meth:`run`), only for
        Kafka connectors, so it is not repeated here.

        Returns:
            ``(n_clusters, n_initial_moves)``: the number of distinct labels
            produced by clustering ``instance.df_indiv``, and the number of
            moves applied by the first placement (0 when the problem
            parameter ``initial_placement`` is false).
        """
        logging.info("Analysis period (initial window)")
        labels = np.asarray(self.clustering.fit(self.instance.df_indiv))
        n_clusters = int(len(np.unique(labels)))
        logging.info("Initial clustering produced %d clusters", n_clusters)

        sep_time = self.config.connector.parameters.get("sep_time")
        if self.config.problem.parameters.get("initial_placement", True):
            logging.info("Running first placement heuristic")
            initial_moves = self.problem.initial(
                labels,
                self.instance.df_indiv,
                self.instance.df_host,
            )
            self.connector.apply_moves(initial_moves, sep_time)
            logging.info("Applied initial placement moves")
            n_initial_moves = len(initial_moves)
            logging.info("First placement produced %d moves", n_initial_moves)
        else:
            logging.info("Skipping first placement (keeping existing)")
            n_initial_moves = 0

        _, _ = plot_containers_groupby_nodes(
            self.instance.df_indiv,
            viz=self.config.viz,
            max_cap=self.instance.df_meta["cpu"].max(),
            sep_time=sep_time,
            fields=self.instance.fields,
        )
        return n_clusters, n_initial_moves

    def pre_loop(self):
        """Build and solve optimization models before performing streaming loop.

        Builds the clustering model from the clustering plugin's ``u_mat`` /
        ``w_mat`` and the problem model from ``u_mat``, the placement
        adjacency matrix and the post-clustering matrix; each model is
        solved and its dual values filled.
        """
        logging.info("Building first optimization models...")
        # NOTE(review): reads instance.config while the rest of the class
        # reads self.config — presumably the same object; confirm.
        cfg = self.instance.config.connector.parameters

        # Clustering model
        build_pre_clust_matrices(
            self.instance.df_indiv,
            cfg.get("tick_field"),
            cfg.get("individual_field"),
            cfg.get("metrics"),
            self.instance.get_id_map(),
            self.clustering,
        )
        self.clust_opt.build(u_mat=self.clustering.u_mat, w_mat=self.clustering.w_mat)
        self.clust_opt.solve()
        self.clust_opt.fill_dual_values()

        # Problem model
        v_mat = self.problem.build_place_adj_matrix(
            self.instance.df_indiv, self.instance.get_id_map()
        )
        dv_mat = build_post_clust_matrices(self.clustering.clust_mat)
        self.problem_opt.build(u_mat=self.clustering.u_mat, v_mat=v_mat, dv_mat=dv_mat)
        self.problem_opt.solve()
        self.problem_opt.fill_dual_values()

    def _streaming_loop(self, *, end_time: float) -> None:
        """Streaming update loop.

        Each iteration pulls the next batch from the connector, updates the
        instance, evaluates a new solution on the ``[tmin, tmax]`` window,
        records exactly one metrics row, applies the resulting moves and
        slides the window forward by ``connector.tick_increment``.

        Args:
            end_time: Deadline on the :func:`time.monotonic` clock; no new
                iteration starts at or after it.
        """
        params = self.config.connector.parameters
        tick_field = params.get("tick_field")
        window_duration = params.get("window_duration")

        # Initial streaming bounds are derived from the last tick in the initial df.
        tmax = self.instance.df_indiv[tick_field].max() + self.connector.tick_increment
        tmin = tmax - (window_duration - 1)

        loop_nb = 1
        while time.monotonic() < end_time:
            logging.info("Starting loop #%d", loop_nb)
            df_new = self.connector.load_next()
            if df_new is None:
                break

            # Update ingestion state
            self.instance.update_data(df_new)
            working_df = self.instance.get_working_df(tmin, tmax)

            # Evaluate new solution + metrics
            moves, metrics, self.prev_snapshot = eval_solutions(
                self.instance,
                self.clustering,
                self.clust_opt,
                self.problem_opt,
                self.problem,
                working_df,
                prev_snapshot=self.prev_snapshot,
            )
            metrics.update(
                {
                    "run_id": self.run_id,
                    "loop_nb": loop_nb,
                    "tick_start": tmin,
                    "tick_end": tmax,
                    "window_duration": window_duration,
                }
            )
            # One history row per step; a second append would duplicate the
            # row in the persisted metrics and double-count the KPIs.
            self.instance.metrics_history.append(metrics)
            logging.info(
                "Loop %d moves: %d containers",
                loop_nb,
                len(metrics["moving_containers"]),
            )
            self.connector.apply_moves(moves, tmax)

            # Advance window
            tmax += self.connector.tick_increment
            tmin = tmax - (window_duration - 1)
            loop_nb += 1

    def _finalize_run_kpis(self) -> None:
        """Aggregate the metrics history into run-level KPIs.

        Stores the result on ``self.instance.run_kpis``; does nothing when
        no metrics were recorded. A KPI whose source column is absent is
        reported as 0.0. ``total_steps`` counts every history row, including
        the initial-phase record appended by :meth:`run`.
        """
        df = pd.DataFrame(self.instance.metrics_history)
        if df.empty:
            return

        cols = df.columns
        total_moves = (
            float(df["moves__moves_count"].fillna(0).sum())
            if "moves__moves_count" in cols
            else 0.0
        )
        avg_moved_ratio = (
            float(df["plc__moved_ratio"].mean()) if "plc__moved_ratio" in cols else 0.0
        )
        max_conf_density = (
            float(df["place_conf__density"].max())
            if "place_conf__density" in cols
            else 0.0
        )

        self.instance.run_kpis = {
            "run_id": self.run_id,
            "total_steps": int(len(df)),
            "total_moves": total_moves,
            "avg_moved_ratio": avg_moved_ratio,
            "max_conf_density": max_conf_density,
        }

    def _persist_metrics(self):
        """Write the metrics history to CSV, plus Parquet when possible.

        The CSV goes to ``results_file``; a Parquet copy with a ``.parquet``
        suffix is attempted next to it on a best-effort basis. Run-level
        KPIs, when present, are written to ``<stem>_run.json`` in the same
        directory. Does nothing when no metrics were recorded.
        """
        out = pd.DataFrame(self.instance.metrics_history)
        if out.empty:
            return

        results_file = Path(self.results_file)
        results_file.parent.mkdir(parents=True, exist_ok=True)
        out.to_csv(results_file, index=False)
        logging.info("Step metrics written to %s", results_file)

        # Parquet is optional: the CSV is already on disk, so a missing engine
        # or an unconvertible column must not abort the run or the shutdown.
        parquet_path = results_file.with_suffix(".parquet")
        try:
            out.to_parquet(parquet_path, index=False)
        except ImportError as e:
            logging.info(
                "Parquet support not available (%s). Skipping parquet export, CSV written to %s",
                e.__class__.__name__,
                results_file,
            )
        except (ValueError, TypeError) as e:
            # e.g. pyarrow's ArrowInvalid / ArrowTypeError on mixed-type object
            # columns — TODO confirm which metric columns can trigger this.
            logging.warning(
                "Parquet export failed (%s: %s). CSV written to %s",
                e.__class__.__name__,
                e,
                results_file,
            )
        else:
            logging.info("Step metrics written to %s", parquet_path)

        # Run-level KPIs. getattr: this also runs from the signal handler,
        # possibly before _finalize_run_kpis has ever set the attribute.
        run_kpis = getattr(self.instance, "run_kpis", None)
        if run_kpis:
            run_path = results_file.with_name(results_file.stem + "_run.json")
            pd.Series(run_kpis).to_json(run_path)
            logging.info("Run KPIs written to %s", run_path)

    def shutdown(self, signum=None, frame=None):
        """Handle shutdown signals gracefully.

        Registered via ``setup_signal_handlers`` in ``__init__``. Persists
        the metrics collected so far, then exits with status 0.

        Args:
            signum: Signal number (unused).
            frame: Interrupted stack frame (unused).

        Raises:
            SystemExit: Always, with code 0.
        """
        logging.info("Shutting down...")
        try:
            self._persist_metrics()
        except Exception:
            # Raising SystemExit from a `finally` would hide this failure
            # behind a clean exit; log it so lost metrics are visible.
            logging.exception("Failed to persist metrics during shutdown")
        raise SystemExit(0)