Source code for hots.init

"""
Provide stuff for initialization step (load DataFrames,
global variables)
"""

import json
from pathlib import Path

import pandas as pd

# Functions definitions #


[docs]def build_df_from_containers(df_indiv): """Build the `df_host` from containers df. :param df_indiv: _description_ :type df_indiv: pd.DataFrame :return: _description_ :rtype: pd.DataFrame """ dict_agg = {} for metric in metrics: dict_agg[metric] = 'sum' df_host = df_indiv.groupby( [tick_field, host_field], as_index=False).agg(dict_agg) return df_host
[docs]def df_from_csv(file): """Load DataFrame from CSV file. :param file: _description_ :type file: Path :return: _description_ :rtype: pd.DataFrame """ return pd.read_csv( file, index_col=False)
# TODO check if files exist ?
[docs]def init_dfs(data): """Perform CSV files reading in data folder. :param data: _description_ :type data: str :return: _description_ :rtype: Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame] """ p_data = Path(data) if Path(p_data / 'node_usage.csv').is_file(): return (df_from_csv(p_data / 'container_usage.csv'), df_from_csv(p_data / 'node_usage.csv'), df_from_csv(p_data / 'node_meta.csv')) else: print('We need to build node usage from containers ...') df_indiv = df_from_csv(p_data / 'container_usage.csv') df_host = build_df_from_containers(df_indiv) df_host.to_csv(p_data / 'node_usage.csv', index=False) return (df_indiv, df_host, df_from_csv(p_data / 'node_meta.csv'))
[docs]def read_params( path, k, tau, method, cluster_method, param, output_path ): """Get parameters from file and build the Dict config object. :param path: _description_ :type path: str :param k: _description_ :type k: int :param tau: _description_ :type tau: int :param method: _description_ :type method: str :param cluster_method: _description_ :type cluster_method: str :param param: _description_ :type param: str :param output_path: _description_ :type output_path: str :raises ValueError: _description_ :raises ValueError: _description_ :return: _description_ :rtype: Dict """ p_path = Path(path) if param is not None: config_path = Path(param) elif Path(p_path / 'params.json').exists(): config_path = p_path / 'params.json' else: config_path = 'tests/params_default.json' with open(config_path, 'r') as f: config = json.load(f) if k is not None: config['clustering']['nb_clusters'] = k if tau is not None: config['analysis']['window_duration'] = tau config['loop']['tick'] = tau else: tau = config['analysis']['window_duration'] if output_path is not None: output_path = Path(output_path) else: output_path = Path( '%sk%d_tau%d_%s_%s' % ( path, config['clustering']['nb_clusters'], int(tau), method, cluster_method )) output_path.mkdir(parents=True, exist_ok=True) define_globals(output_path, config) if method not in methods: raise ValueError('Method %s is not accepted' % method) if cluster_method not in cluster_methods: raise ValueError('Updating clustering method %s is not accepted' % cluster_method) return (config, str(output_path))
[docs]def set_loop_results(): """Create the dataframe for loop results. :return: _description_ :rtype: pd.DataFrame """ return pd.DataFrame(columns=[ 'num_loop', 'init_silhouette', 'init_delta', 'clust_conf_nodes', 'clust_conf_edges', 'clust_max_deg', 'clust_mean_deg', 'clust_changes', 'place_conf_nodes', 'place_conf_edges', 'place_max_deg', 'place_mean_deg', 'place_changes', 'end_silhouette', 'end_delta', 'loop_time' ])
[docs]def set_times_df(): """Create the dataframe for times info. :return: _description_ :rtype: pd.DataFrame """ return pd.DataFrame(columns=[ 'num_loop', 'action', 'time' ])
[docs]def define_globals(p_path, config): """Define the fields, as global variables, from config. :param p_path: _description_ :type p_path: Path :param config: _description_ :type config: Dict """ global indiv_field global host_field global tick_field global metrics global methods global cluster_methods global global_results global loop_results global times_df # global node_results global results_file global additional_results_file global optim_file global clustering_file global dict_agg_metrics global renderer global streamkm_model indiv_field = config['data']['individual_field'] host_field = config['data']['host_field'] tick_field = config['data']['tick_field'] metrics = config['data']['metrics'] methods = ['init', 'spread', 'iter-consol', 'heur', 'loop'] cluster_methods = ['loop-cluster', 'kmeans-scratch', 'stream-km'] loop_results = set_loop_results() times_df = set_times_df() results_file = open(p_path / 'results.log', 'w') # additional_results_file = open(p_path / 'results.log', 'w') optim_file = open(p_path / 'optim_logs.log', 'w') clustering_file = open(p_path / 'clustering_logs.log', 'w') dict_agg_metrics = {} for metric in metrics: dict_agg_metrics[metric] = 'sum' renderer = config['plot']['renderer']