Source code for hots.main

"""
Entry point of hots module through ``hots path [OPTIONS]``.

    - path is the folder where we find the files
        x container_usage.csv : describes container resource consumption
        x node_meta.csv : describes nodes capacities
        (x node_usage.csv : describes nodes resource consumption)
    - type 'hots --help' for options description

The entire methodology is called from here (initialization, clustering,
allocation, evaluation, access to optimization model...).
"""

import logging
import math
import time

import click

from clusopt_core.cluster import Streamkm

from matplotlib import pyplot as plt

import numpy as np

import pandas as pd

# Personnal imports
# from . import allocation as alloc
from . import clustering as clt
from . import container as ctnr
from . import init as it
from . import model as mdl
from . import node
from . import placement as place
from . import plot
from .instance import Instance


@click.command()
@click.argument('path', required=True, type=click.Path(exists=True))
@click.option('-k', required=False, type=int, help='Number of clusters')
@click.option('-t', '--tau', required=False, type=int, help='Time window size')
@click.option('-m', '--method', required=False, type=str, default='loop', help='Method used')
@click.option('-c', '--cluster_method', required=False, type=str, default='loop-cluster',
              help='Method used for updating clustering')
@click.option('-p', '--param', required=False, type=str, help='Use a specific parameter file')
@click.option('-o', '--output', required=False, type=str,
              help='Use a specific directory for output')
@click.option('-ec', '--tolclust', required=False, type=str,
              help='Use specific value for epsilonC (clustering conflict threshold)')
@click.option('-ea', '--tolplace', required=False, type=str,
              help='Use specific value for epsilonA (placement conflict threshold)')
def main(path, k, tau, method, cluster_method, param, output, tolclust, tolplace):
    """Use method to propose a placement solution for micro-services adjusted in time.

    :param path: path to folder with data and parameters file
    :type path: click.Path
    :param k: number of clusters to use for clustering
    :type k: int
    :param tau: time window size for the loop
    :type tau: int
    :param method: method to use to solve initial problem
    :type method: str
    :param cluster_method: method to use to update clustering
    :type cluster_method: str
    :param param: parameter file to use
    :type param: str
    :param output: output folder to use
    :type output: str
    :param tolclust: threshold to use for clustering conflict
    :type tolclust: str
    :param tolplace: threshold to use for placement conflict
    :type tolplace: str
    """
    # Initialization part
    main_time = time.time()

    if not path[-1] == '/':
        path += '/'

    start = time.time()
    (config, output_path, my_instance) = preprocess(
        path, k, tau, method, cluster_method, param, output, tolclust, tolplace
    )
    add_time(-1, 'preprocess', (time.time() - start))

    # Plot initial data
    if False:
        indivs_cons = ctnr.plot_all_data_all_containers(
            my_instance.df_indiv, sep_time=my_instance.sep_time)
        indivs_cons.savefig(path + '/indivs_cons.svg')
        node_evo_fig = plot.plot_containers_groupby_nodes(
            my_instance.df_indiv,
            my_instance.df_host_meta[it.metrics[0]].max(),
            my_instance.sep_time,
            title='Initial Node consumption')
        node_evo_fig.savefig(path + '/init_node_plot.svg')
    total_method_time = time.time()

    # Analysis period
    start = time.time()
    (my_instance, df_host_evo,
     df_indiv_clust, labels_) = analysis_period(
        my_instance, config, method
    )
    add_time(-1, 'total_t_obs', (time.time() - start))

    # Run period
    start = time.time()
    (df_host_evo, nb_overloads) = run_period(
        my_instance, df_host_evo,
        df_indiv_clust, labels_,
        config, output_path, method, cluster_method
    )
    add_time(-1, 'total_t_run', (time.time() - start))
    total_method_time = time.time() - total_method_time

    # Print objectives of evaluation part
    (obj_nodes, obj_delta) = mdl.get_obj_value_host(df_host_evo)
    it.results_file.write('Number of nodes : %d, Ampli max : %f\n' % (
        obj_nodes, obj_delta))
    it.results_file.write('Number of overloads : %d\n' % nb_overloads)
    it.results_file.write('Total execution time : %f s\n\n' % (total_method_time))

    it.clustering_file.write('\nFinal k : %d' % my_instance.nb_clusters)

    # Save evaluation results in files
    node_results = node.get_nodes_load_info(
        df_host_evo, my_instance.df_host_meta)
    it.global_results = pd.DataFrame([{
        'c1': obj_nodes,
        'c2': obj_delta,
        'c3': node_results['load_var'].mean(),
        'c4': nb_overloads
    }])

    # Plot nodes consumption
    # TODO plot from df_host_evo
    # node_evo_fig = plot.plot_containers_groupby_nodes(
    #     my_instance.df_indiv,
    #     my_instance.df_host_meta[it.metrics[0]].max(),
    #     my_instance.sep_time,
    #     title='Initial Node consumption')
    # node_evo_fig.savefig(output_path + '/init_node_plot.svg')

    # Plot clustering & allocation for 1st part
    # plot_before_loop = False
    # if plot_before_loop:
    #     spec_containers = False
    #     if spec_containers:
    #         ctnr.show_specific_containers(working_df_indiv, df_indiv_clust,
    #                                       my_instance, labels_)
    #     show_clustering = True
    #     if show_clustering:
    #         working_df_indiv = my_instance.df_indiv.loc[
    #             my_instance.df_indiv[it.tick_field] <= my_instance.sep_time
    #         ]
    #         clust_node_fig = plot.plot_clustering_containers_by_node(
    #             working_df_indiv, my_instance.dict_id_c,
    #             labels_, filter_big=True)
    #         clust_node_fig.savefig(output_path + '/clust_node_plot.svg')
    #         first_clust_fig = plot.plot_clustering(df_indiv_clust, my_instance.dict_id_c,
    #                                                title='Clustering on first half part')
    #         first_clust_fig.savefig(output_path + '/first_clust_plot.svg')

    # Test allocation use case
    # TODO specific window not taken into account
    # if config['allocation']['enable']:
    #     logging.info('Performing allocation ... \n')
    #     print(alloc.check_constraints(
    #         my_instance, working_df_indiv, config['allocation']))
    # else:
    #     logging.info('We do not perform allocation \n')

    main_time = time.time() - main_time
    add_time(-1, 'total_time', main_time)
    node.plot_data_all_nodes(
        df_host_evo, it.metrics[0],
        my_instance.df_host_meta[it.metrics[0]].max(),
        my_instance.sep_time).savefig(
        output_path + '/node_usage_evo.svg')
    df_host_evo.to_csv(output_path + '/node_usage_evo.csv', index=False)
    it.global_results.to_csv(output_path + '/global_results.csv', index=False)
    node_results.to_csv(output_path + '/node_results.csv')
    it.loop_results.to_csv(output_path + '/loop_results.csv', index=False)
    it.times_df.to_csv(output_path + '/times.csv', index=False)
    it.results_file.write('\nTotal computing time : %f s' % (main_time))
    close_files()


[docs]def preprocess( path, k, tau, method, cluster_method, param, output, tolclust, tolplace ): """Load configuration, data and initialize needed objects. :param path: initial folder path :type path: str :param k: number of clusters to use for clustering :type k: int :param tau: time window size for loop :type tau: int :param method: method to use to solve initial problem :type method: str :param cluster_method: method to use to update clustering :type cluster_method: str :param param: parameter file to use :type param: str :param output: output folder to use :type output: str :param tolclust: threshold to use for clustering conflict :type tolclust: float :param tolplace: threshold to use for placement conflict :type tolplace: float :return: tuple with needed parameters, path to output and Instance object :rtype: Tuple[Dict, str, Instance] """ # TODO what if we want tick < tau ? print('Preprocessing ...') (config, output_path) = it.read_params(path, k, tau, method, cluster_method, param, output) config = spec_params(config, [tolclust, tolplace]) logging.basicConfig(filename=output_path + '/logs.log', filemode='w', format='%(message)s', level=logging.INFO) plt.style.use('bmh') # Init containers & nodes data, then Instance logging.info('Loading data and creating Instance (Instance information are in results file)\n') instance = Instance(path, config) it.results_file.write('Method used : %s\n' % method) instance.print_times(config['loop']['tick']) return (config, output_path, instance)
# TODO use Dict instead of List for genericity ?
[docs]def spec_params(config, list_params): """Define specific parameters. :param config: initial parameters set :type config: Dict :param list_params: parameters list for threshold :type list_params: List :return: final parameters set :rtype: Dict """ if list_params[0] is not None: config['loop']['tol_dual_clust'] = float(list_params[0]) if list_params[1] is not None: config['loop']['tol_dual_place'] = float(list_params[1]) return config
[docs]def analysis_period(my_instance, config, method): """Perform all needed process during analysis period (T_init). :param my_instance: Instance object :type my_instance: Instance :param config: parameters set :type config: Dict :param method: method used to solve initial problem :type method: str :return: tuple with Instance object, nodes data with new placement, clustering data and clustering labels :rtype: Tuple[Instance, pd.DataFrame, pd.DataFrame, List] """ df_host_evo = pd.DataFrame(columns=my_instance.df_host.columns) df_indiv_clust = pd.DataFrame() labels_ = [] if method == 'init': return (my_instance, my_instance.df_host, df_indiv_clust, labels_) elif method in ['heur', 'loop']: # Compute starting point n_iter = math.floor(( my_instance.sep_time + 1 - my_instance.df_host[it.tick_field].min() ) / my_instance.window_duration) start_point = ( my_instance.sep_time - n_iter * my_instance.window_duration ) + 1 end_point = ( start_point + my_instance.window_duration - 1 ) working_df_indiv = my_instance.df_indiv[ (my_instance. df_indiv[it.tick_field] >= start_point) & ( my_instance.df_indiv[it.tick_field] <= end_point) ] # First clustering part logging.info('Starting first clustering ...') print('Starting first clustering ...') start = time.time() (df_indiv_clust, my_instance.dict_id_c) = clt.build_matrix_indiv_attr( working_df_indiv) labels_ = clt.perform_clustering( df_indiv_clust, config['clustering']['algo'], my_instance.nb_clusters) df_indiv_clust['cluster'] = labels_ my_instance.nb_clusters = labels_.max() + 1 # TODO improve this part (distance...) cluster_profiles = clt.get_cluster_mean_profile( df_indiv_clust) cluster_vars = clt.get_cluster_variance(cluster_profiles) cluster_var_matrix = clt.get_sum_cluster_variance( cluster_profiles, cluster_vars) it.results_file.write('\nClustering computing time : %f s\n\n' % (time.time() - start)) print('\nClustering computing time : %f s\n\n' % (time.time() - start)) add_time(-1, 'clustering_0', (time.time() - start)) n_iter = n_iter - 1 # Loop for incrementing clustering (during analysis) # TODO finish # while n_iter > 0: # start_point = end_point + 1 # end_point = ( # start_point + my_instance.window_duration - 1 # ) # working_df_indiv = my_instance.df_indiv[ # (my_instance. # df_indiv[it.tick_field] >= start_point) & ( # my_instance.df_indiv[it.tick_field] <= end_point)] # evaluate clustering # (dv, nb_clust_changes_loop, # clustering_dual_values) = eval_clustering( # my_instance, working_df_indiv, # w, u, clustering_dual_values, constraints_dual, # tol_clust, tol_move_clust, # df_clust, cluster_profiles, labels_) # n_iter = n_iter - 1 # First placement part if config['placement']['enable']: logging.info('Performing placement ... \n') start = time.time() place.allocation_distant_pairwise( my_instance, cluster_var_matrix, labels_) print('Placement heuristic time : %f s' % (time.time() - start)) add_time(-1, 'placement_0', (time.time() - start)) else: logging.info('We do not perform placement \n') elif method == 'spread': place.allocation_spread(my_instance, my_instance.nb_nodes) elif method == 'iter-consol': place.allocation_spread(my_instance) df_host_evo = my_instance.df_host.loc[ my_instance.df_host[it.tick_field] <= my_instance.sep_time ] return (my_instance, df_host_evo, df_indiv_clust, labels_)
[docs]def run_period( my_instance, df_host_evo, df_indiv_clust, labels_, config, output_path, method, cluster_method ): """Perform all needed process during evaluation period. :param my_instance: Instance object :type my_instance: Instance :param df_host_evo: evolving node data :type df_host_evo: pd.DataFrame :param df_indiv_clust: clustering individuals data :type df_indiv_clust: pd.DataFrame :param labels_: clustering labels :type labels_: List :param config: set parameters :type config: Dict :param output_path: output folder path :type output_path: str :param method: method to use to solve problem :type method: str :param cluster_method: method to use for clustering :type cluster_method: str :return: evolving node data and number of nodes overloads :rtype: Tuple[pd.DataFrame, int] """ nb_overloads = 0 # Loops for evaluation if method in ['loop']: # loop 'streaming' progress it.results_file.write('\n### Loop process ###\n') (fig_node, fig_clust, fig_mean_clust, df_host_evo, nb_overloads) = streaming_eval( my_instance, df_indiv_clust, labels_, config['loop']['mode'], config['loop']['tick'], config['loop']['constraints_dual'], config['loop']['tol_dual_clust'], config['loop']['tol_move_clust'], config['loop']['tol_open_clust'], config['loop']['tol_dual_place'], config['loop']['tol_move_place'], config['loop']['tol_step'], cluster_method, config['optimization']['solver'], df_host_evo) fig_node.savefig(output_path + '/node_evo_plot.svg') fig_clust.savefig(output_path + '/clust_evo_plot.svg') fig_mean_clust.savefig(output_path + '/mean_clust_evo_plot.svg') elif method in ['heur', 'spread', 'iter-consol']: # TODO adapt after 'progress_time_noloop' changed (temp_df_host, nb_overloads, _, _, _) = progress_time_noloop( my_instance, 'local', my_instance.sep_time, my_instance.df_indiv[it.tick_field].max(), labels_, 0, config['loop']['constraints_dual'], {}, {}, config['loop']['tol_dual_clust'], config['loop']['tol_move_clust'], config['loop']['tol_dual_place'], config['loop']['tol_move_place'] ) df_host_evo = pd.concat([ df_host_evo, temp_df_host[~temp_df_host[it.tick_field].isin( df_host_evo[it.tick_field].unique())] ]) return (df_host_evo, nb_overloads)
[docs]def streaming_eval( my_instance, df_indiv_clust, labels_, mode, tick, constraints_dual, tol_clust, tol_move_clust, tol_open_clust, tol_place, tol_move_place, tol_step, cluster_method, solver, df_host_evo ): """Define the streaming process for evaluation. :param my_instance: Instance object :type my_instance: Instance :param df_indiv_clust: clustering individual data :type df_indiv_clust: pd.DataFrame :param labels_: clustering labels :type labels_: List :param mode: mode used to trigger the loop :type mode: str :param tick: number of new datapoints before new loop :type tick: int :param constraints_dual: constraints type compared to trigger conflicts :type constraints_dual: List :param tol_clust: threshold used for clustering conflicts :type tol_clust: float :param tol_move_clust: threshold used for number of clustering moves :type tol_move_clust: float :param tol_open_clust: threshold used for opening new cluster :type tol_open_clust: float :param tol_place: threshold used for placement conflicts :type tol_place: float :param tol_move_place: threshold used for number of placement moves :type tol_move_place: float :param tol_step: step to increment previous threshold :type tol_step: float :param cluster_method: method used to update clustering :type cluster_method: str :param solver: solver used for pyomo :type solver: str :param df_host_evo: evolving node data :type df_host_evo: pd.DataFrame :return: figures to plot to see clustering and nodes evolution + evolving node data + number of node overloads :rtype: Tuple[plt.Figure, plt.Figure, plt.Figure, List, pd.DataFrame, int] """ fig_node, ax_node = plot.init_nodes_plot( my_instance.df_indiv, my_instance.dict_id_n, my_instance.sep_time, my_instance.df_host_meta[it.metrics[0]].max() ) fig_clust, ax_clust = plot.init_plot_clustering( df_indiv_clust, my_instance.dict_id_c) cluster_profiles = clt.get_cluster_mean_profile(df_indiv_clust) fig_mean_clust, ax_mean_clust = plot.init_plot_cluster_profiles( cluster_profiles ) tmin = my_instance.sep_time - (my_instance.window_duration - 1) tmax = my_instance.sep_time total_loop_time = 0.0 loop_nb = 1 nb_clust_changes = 0 nb_place_changes = 0 total_nb_overload = 0 end = False it.results_file.write('Loop mode : %s\n' % mode) logging.info('Beginning the loop process ...\n') (working_df_indiv, df_clust, w, u, v) = build_matrices( my_instance, tmin, tmax, labels_ ) start = time.time() (clust_model, place_model, clustering_dual_values, placement_dual_values) = pre_loop( my_instance, working_df_indiv, df_clust, w, u, constraints_dual, v, cluster_method, solver ) add_time(0, 'total_loop', (time.time() - start)) if mode == 'event': tmin = my_instance.sep_time tmax = my_instance.df_indiv[it.tick_field].max() else: tmax += tick tmin = tmax - (my_instance.window_duration - 1) # TODO improve model builds while not end: loop_time = time.time() logging.info('\n # Enter loop number %d #\n' % loop_nb) it.results_file.write('\n # Loop number %d #\n' % loop_nb) it.optim_file.write('\n # Enter loop number %d #\n' % loop_nb) print('\n # Enter loop number %d #\n' % loop_nb) # TODO not fully tested (replace containers) (temp_df_host, nb_overload, loop_nb, nb_clust_changes_loop, nb_place_changes_loop) = progress_time_noloop( my_instance, 'local', tmin, tmax, labels_, loop_nb, constraints_dual, clustering_dual_values, placement_dual_values, tol_clust, tol_move_clust, tol_place, tol_move_place) df_host_evo = pd.concat([ df_host_evo, temp_df_host[~temp_df_host[it.tick_field].isin( df_host_evo[it.tick_field].unique())] ]) total_nb_overload += nb_overload nb_clust_changes += nb_clust_changes_loop nb_place_changes += nb_place_changes_loop if mode == 'event': (temp_df_host, nb_overload, loop_nb, nb_clust_changes, nb_place_changes) = progress_time_noloop( my_instance, 'loop', tmin, tmax, labels_, loop_nb, constraints_dual, clustering_dual_values, placement_dual_values, tol_clust, tol_move_clust, tol_place, tol_move_place) df_host_evo = pd.concat([ df_host_evo, temp_df_host[~temp_df_host[it.tick_field].isin( df_host_evo[it.tick_field].unique())] ]) total_nb_overload += nb_overload start = time.time() (working_df_indiv, df_clust, w, u, v) = build_matrices( my_instance, tmin, tmax, labels_ ) add_time(loop_nb, 'build_matrices', (time.time() - start)) nb_clust_changes_loop = 0 nb_place_changes_loop = 0 (init_loop_obj_nodes, init_loop_obj_delta) = mdl.get_obj_value_indivs( working_df_indiv) # TODO not very practical # plot.plot_clustering_containers_by_node( # working_df_indiv, my_instance.dict_id_c, labels_) (nb_clust_changes_loop, nb_place_changes_loop, init_loop_silhouette, end_loop_silhouette, clust_conf_nodes, clust_conf_edges, clust_max_deg, clust_mean_deg, place_conf_nodes, place_conf_edges, place_max_deg, place_mean_deg, clust_model, place_model, clustering_dual_values, placement_dual_values, df_clust, cluster_profiles, labels_) = eval_sols( my_instance, working_df_indiv, cluster_method, w, u, v, clust_model, place_model, constraints_dual, clustering_dual_values, placement_dual_values, tol_clust, tol_move_clust, tol_open_clust, tol_place, tol_move_place, df_clust, cluster_profiles, labels_, loop_nb, solver ) it.results_file.write('Number of changes in clustering : %d\n' % nb_clust_changes_loop) it.results_file.write('Number of changes in placement : %d\n' % nb_place_changes_loop) nb_clust_changes += nb_clust_changes_loop nb_place_changes += nb_place_changes_loop # update clustering & node consumption plot plot.update_clustering_plot( fig_clust, ax_clust, df_clust, my_instance.dict_id_c) plot.update_cluster_profiles(fig_mean_clust, ax_mean_clust, cluster_profiles, sorted(working_df_indiv[it.tick_field].unique())) plot.update_nodes_plot(fig_node, ax_node, working_df_indiv, my_instance.dict_id_n) working_df_indiv = my_instance.df_indiv[ (my_instance. df_indiv[it.tick_field] >= tmin) & ( my_instance.df_indiv[it.tick_field] <= tmax)] working_df_host = working_df_indiv.groupby( [working_df_indiv[it.tick_field], it.host_field], as_index=False).agg(it.dict_agg_metrics) if loop_nb > 1: df_host_evo = pd.concat( [ df_host_evo, working_df_host[~working_df_host[it.tick_field].isin( df_host_evo[it.tick_field].unique())] ] ) loop_time = (time.time() - loop_time) (end_loop_obj_nodes, end_loop_obj_delta) = mdl.get_obj_value_host( working_df_host) it.results_file.write('Loop delta before changes : %f\n' % init_loop_obj_delta) it.results_file.write('Loop delta after changes : %f\n' % end_loop_obj_delta) it.results_file.write('Loop time : %f s\n' % loop_time) add_time(loop_nb, 'total_loop', loop_time) total_loop_time += loop_time # Save loop indicators in df it.loop_results = pd.concat( [ it.loop_results, pd.DataFrame.from_records([{ 'num_loop': int(loop_nb), 'init_silhouette': init_loop_silhouette, 'init_delta': init_loop_obj_delta, 'clust_conf_nodes': clust_conf_nodes, 'clust_conf_edges': clust_conf_edges, 'clust_max_deg': clust_max_deg, 'clust_mean_deg': clust_mean_deg, 'clust_changes': int(nb_clust_changes_loop), 'place_conf_nodes': place_conf_nodes, 'place_conf_edges': place_conf_edges, 'place_max_deg': place_max_deg, 'place_mean_deg': place_mean_deg, 'place_changes': int(nb_place_changes_loop), 'end_silhouette': end_loop_silhouette, 'end_delta': end_loop_obj_delta, 'loop_time': loop_time }]) ] ) # input('\nPress any key to progress in time ...\n') tmax += tick tmin = tmax - (my_instance.window_duration - 1) if tol_clust < 1.0: tol_clust += tol_step if tol_place < 1.0: tol_place += tol_step if tmax >= my_instance.time: end = True else: loop_nb += 1 working_df_indiv = my_instance.df_indiv[ (my_instance. df_indiv[it.tick_field] >= tmin)] if tmin < working_df_indiv[it.tick_field].max(): # update clustering & node consumption plot # TODO not same size issue with clustering # print(cluster_profiles) # plot.update_clustering_plot( # fig_clust, ax_clust, df_clust, my_instance.dict_id_c) # plot.update_cluster_profiles(fig_mean_clust, ax_mean_clust, cluster_profiles, # sorted(working_df_indiv[it.tick_field].unique())) plot.update_nodes_plot(fig_node, ax_node, working_df_indiv, my_instance.dict_id_n) df_host_evo = end_loop(working_df_indiv, tmin, nb_clust_changes, nb_place_changes, total_nb_overload, total_loop_time, loop_nb, df_host_evo) return (fig_node, fig_clust, fig_mean_clust, df_host_evo, total_nb_overload)
# return (fig_node, fig_clust, fig_mean_clust, # [nb_clust_changes, nb_place_changes, total_loop_time, total_loop_time / loop_nb], # df_host_evo, total_nb_overload)
[docs]def progress_time_noloop( instance, fixing, tmin, tmax, labels_, loop_nb, constraints_dual, clustering_dual_values, placement_dual_values, tol_clust, tol_move_clust, tol_place, tol_move_place ): """We progress in time without performing the loop, checking node capacities. :param instance: Instance object :type instance: Instance :param fixing: method used to fix node assignment if overload :type fixing: str :param tmin: starting time of current window :type tmin: int :param tmax: final time of current window :type tmax: int :param labels_: clustering labels :type labels_: List :param loop_nb: current loop number :type loop_nb: int :param constraints_dual: constraints type compared to trigger conflicts :type constraints_dual: List :param clustering_dual_values: previous loop dual values for clustering :type clustering_dual_values: Dict :param placement_dual_values: previous loop dual values for placement :type placement_dual_values: Dict :param tol_clust: threshold used for clustering conflicts :type tol_clust: float :param tol_move_clust: threshold used for number of clustering moves :type tol_move_clust: float :param tol_place: threshold used for placement conflicts :type tol_place: float :param tol_move_place: threshold used for number of placement moves :type tol_move_place: float :return: evolving node data + number of overloads + loop number + clustering and placement changes :rtype: Tuple[pd.DataFrame, int, int, int, int] """ df_host_evo = pd.DataFrame(columns=instance.df_host.columns) nb_overload = 0 nb_clust_changes = 0 nb_place_changes = 0 for tick in range(tmin, tmax + 1): df_host_tick = instance.df_indiv.loc[ instance.df_indiv[it.tick_field] == tick ].groupby( [instance.df_indiv[it.tick_field], it.host_field], as_index=False).agg(it.dict_agg_metrics) host_overload = node.check_capacities(df_host_tick, instance.df_host_meta) df_host_tick[it.tick_field] = tick df_host_evo = pd.concat([ df_host_evo, df_host_tick ]) if len(host_overload) > 0: print('Overload : We must move containers') nb_overload += len(host_overload) if fixing == 'local': place.free_full_nodes(instance, host_overload, tick) elif fixing == 'loop': working_df_indiv = instance.df_indiv[ (instance. df_indiv[it.tick_field] >= tick - instance.window_duration) & ( instance.df_indiv[it.tick_field] <= tick)] (df_clust, instance.dict_id_c) = clt.build_matrix_indiv_attr( working_df_indiv) w = clt.build_similarity_matrix(df_clust) df_clust['cluster'] = labels_ u = clt.build_adjacency_matrix(labels_) v = place.build_placement_adj_matrix( working_df_indiv, instance.dict_id_c) cluster_profiles = clt.get_cluster_mean_profile( df_clust) cluster_vars = clt.get_cluster_variance(cluster_profiles) cluster_var_matrix = clt.get_sum_cluster_variance( cluster_profiles, cluster_vars) dv = ctnr.build_var_delta_matrix_cluster( df_clust, cluster_var_matrix, instance.dict_id_c) (nb_clust_changes_loop, nb_place_changes_loop, clustering_dual_values, placement_dual_values, df_clust, cluster_profiles, labels_) = eval_sols( instance, working_df_indiv, w, u, v, dv, constraints_dual, clustering_dual_values, placement_dual_values, tol_clust, tol_move_clust, tol_place, tol_move_place, df_clust, cluster_profiles, labels_ ) if nb_place_changes_loop < 1: place.free_full_nodes(instance, host_overload, tick) loop_nb += 1 nb_clust_changes += nb_clust_changes_loop nb_place_changes += nb_place_changes_loop return (df_host_evo, nb_overload, loop_nb, nb_clust_changes, nb_place_changes)
[docs]def pre_loop( my_instance, working_df_indiv, df_clust, w, u, constraints_dual, v, cluster_method, solver ): """Build optimization problems and solve them with T_init solutions. :param my_instance: Instance object :type my_instance: Instance :param working_df_indiv: current loop data :type working_df_indiv: pd.DataFrame :param df_clust: clustering related data :type df_clust: pd.DataFrame :param w: dissimilarity matrix :type w: np.array :param u: clustering adjacency matrix :type u: np.array :param constraints_dual: constraints type compared to trigger conflicts :type constraints_dual: List :param v: placement adjacency matrix :type v: np.array :param cluster_method: method used to update clustering :type cluster_method: str :param solver: solver used for pyomo :type solver: str :return: optimization models and associated dual values :rtype: Tuple[mdl.Model, mdl.Model, Dict, Dict] """ logging.info('Evaluation of problems with initial solutions') print('Building clustering model ...') start = time.time() clust_model = mdl.Model( 1, working_df_indiv, my_instance.dict_id_c, nb_clusters=my_instance.nb_clusters, w=w, sol_u=u ) add_time(0, 'build_clustering_model', (time.time() - start)) start = time.time() print('Solving first clustering ...') logging.info('# Clustering evaluation #') logging.info('Solving linear relaxation ...') add_time(0, 'solve_clustering_model', (time.time() - start)) logging.info('Clustering problem not evaluated yet\n') print('\n ## Pyomo solve ## \n\n') clust_model.solve(solver) clustering_dual_values = mdl.fill_dual_values(clust_model) if cluster_method == 'stream-km': it.streamkm_model = Streamkm( coresetsize=my_instance.nb_containers, length=my_instance.time * my_instance.nb_containers, seed=my_instance.nb_clusters, ) it.streamkm_model.partial_fit(df_clust.drop('cluster', axis=1)) _, labels_ = it.streamkm_model.get_final_clusters( my_instance.nb_clusters, seed=my_instance.nb_clusters) df_clust['cluster'] = labels_ # TODO improve this part (distance...) cluster_profiles = clt.get_cluster_mean_profile( df_clust) cluster_vars = clt.get_cluster_variance(cluster_profiles) cluster_var_matrix = clt.get_sum_cluster_variance( cluster_profiles, cluster_vars) dv = ctnr.build_var_delta_matrix_cluster( df_clust, cluster_var_matrix, my_instance.dict_id_c) # evaluate placement logging.info('# Placement evaluation #') print('Building placement model ...') start = time.time() place_model = mdl.Model( 2, working_df_indiv, my_instance.dict_id_c, my_instance.dict_id_n, my_instance.df_host_meta, dv=dv, sol_u=u, sol_v=v ) add_time(0, 'build_placement_model', (time.time() - start)) start = time.time() print('Solving first placement ...') logging.info('Placement problem not evaluated yet\n') add_time(0, 'solve_placement_model', (time.time() - start)) print('\n ## Pyomo solve ## \n\n') place_model.solve(solver) placement_dual_values = mdl.fill_dual_values(place_model) return (clust_model, place_model, clustering_dual_values, placement_dual_values)
[docs]def build_matrices(my_instance, tmin, tmax, labels_): """Build period dataframe and matrices to be used. :param my_instance: Instance object :type my_instance: Instance :param tmin: starting time of current window :type tmin: int :param tmax: final time of current window :type tmax: int :param labels_: clustering labels :type labels_: List :return: current loop data, clustering data, dissimilarity matrix and adjacency matrices :rtype: Tuple[pd.DataFrame, pd.DataFrame, np.array, np.array, np.array] """ working_df_indiv = my_instance.df_indiv[ (my_instance. df_indiv[it.tick_field] >= tmin) & ( my_instance.df_indiv[it.tick_field] <= tmax)] (df_clust, my_instance.dict_id_c) = clt.build_matrix_indiv_attr( working_df_indiv) w = clt.build_similarity_matrix(df_clust) df_clust['cluster'] = labels_ u = clt.build_adjacency_matrix(labels_) v = place.build_placement_adj_matrix( working_df_indiv, my_instance.dict_id_c) return (working_df_indiv, df_clust, w, u, v)
[docs]def eval_sols( my_instance, working_df_indiv, cluster_method, w, u, v, clust_model, place_model, constraints_dual, clustering_dual_values, placement_dual_values, tol_clust, tol_move_clust, tol_open_clust, tol_place, tol_move_place, df_clust, cluster_profiles, labels_, loop_nb, solver ): """Evaluate clustering and placement solutions. :param my_instance: Instance object :type my_instance: Instance :param working_df_indiv: current loop data :type working_df_indiv: pd.DataFrame :param cluster_method: method used to update clustering :type cluster_method: str :param w: dissimilarity matrix :type w: np.array :param u: clustering adjacency matrix :type u: np.array :param v: placement adjacency matrix :type v: np.array :param clust_model: clustering optimization model :type clust_model: mdl.Model :param place_model: placement optimization model :type place_model: mdl.Model :param constraints_dual: constraints type compared to trigger conflicts :type constraints_dual: List :param clustering_dual_values: previous loop dual values for clustering :type clustering_dual_values: Dict :param placement_dual_values: previous loop dual values for placement :type placement_dual_values: Dict :param tol_clust: threshold used for clustering conflicts :type tol_clust: float :param tol_move_clust: threshold used for number of clustering moves :type tol_move_clust: float :param tol_open_clust: threshold used for opening new cluster :type tol_open_clust: float :param tol_place: threshold used for placement conflicts :type tol_place: float :param tol_move_place: threshold used for number of placement moves :type tol_move_place: float :param df_clust: clustering related data :type df_clust: pd.DataFrame :param cluster_profiles: computed clusters mean profiles :type cluster_profiles: np.array :param labels_: cluster labels :type labels_: List :param loop_nb: current loop number :type loop_nb: int :param solver: solver used for pyomo :type solver: str :return: number of changes in clustering and placement, silhouette score before and after loop, number of nodes and edges in conflict graph + max and mean degree (clustering and placement), optimization models and associated dual values, clustering related data, clusters mean profiles and cluster labels :rtype: Tuple[ int, int, float, float, int, int, float, float, int, int, float, float, mdl.Model, mdl.Model, Dict, Dict, pd.DataFrame, np.array, List ] """ # evaluate clustering start = time.time() it.clustering_file.write( 'labels before change\n' ) it.clustering_file.write( np.array2string(labels_, separator=',') ) it.clustering_file.write('\n') if cluster_method == 'loop-cluster': (dv, nb_clust_changes_loop, init_loop_silhouette, end_loop_silhouette, clustering_dual_values, clust_model, clust_conf_nodes, clust_conf_edges, clust_max_deg, clust_mean_deg) = eval_clustering( my_instance, w, u, clust_model, clustering_dual_values, constraints_dual, tol_clust, tol_move_clust, tol_open_clust, df_clust, cluster_profiles, labels_, loop_nb, solver) elif cluster_method == 'kmeans-scratch': (dv, nb_clust_changes_loop, init_loop_silhouette, end_loop_silhouette, clustering_dual_values, clust_model, clust_conf_nodes, clust_conf_edges, clust_max_deg, clust_mean_deg) = loop_kmeans( my_instance, df_clust, labels_ ) elif cluster_method == 'stream-km': (dv, nb_clust_changes_loop, init_loop_silhouette, end_loop_silhouette, clustering_dual_values, clust_model, clust_conf_nodes, clust_conf_edges, clust_max_deg, clust_mean_deg) = stream_km( my_instance, df_clust, labels_ ) it.clustering_file.write( 'labels after change\n' ) it.clustering_file.write( np.array2string(labels_, separator=',') ) it.clustering_file.write('\n') add_time(loop_nb, 'loop-clustering', (time.time() - start)) it.clustering_file.write( 'Loop clustering time : %f s\n' % (time.time() - start)) # evaluate placement start = time.time() (nb_place_changes_loop, placement_dual_values, place_model, place_conf_nodes, place_conf_edges, place_max_deg, place_mean_deg) = eval_placement( my_instance, working_df_indiv, w, u, v, dv, placement_dual_values, constraints_dual, place_model, tol_place, tol_move_place, nb_clust_changes_loop, loop_nb, solver ) add_time(loop_nb, 'loop_placement', (time.time() - start)) return ( nb_clust_changes_loop, nb_place_changes_loop, init_loop_silhouette, end_loop_silhouette, clust_conf_nodes, clust_conf_edges, clust_max_deg, clust_mean_deg, place_conf_nodes, place_conf_edges, place_max_deg, place_mean_deg, clust_model, place_model, clustering_dual_values, placement_dual_values, df_clust, cluster_profiles, labels_ )
# TODO update return type
[docs]def eval_clustering( my_instance, w, u, clust_model, clustering_dual_values, constraints_dual, tol_clust, tol_move_clust, tol_open_clust, df_clust, cluster_profiles, labels_, loop_nb, solver ): """Evaluate current clustering solution and update it if needed. :param my_instance: Instance object :type my_instance: Instance :param w: dissimilarity matrix :type w: np.array :param u: clustering adjacency matrix :type u: np.array :param clust_model: clustering optimization model :type clust_model: mdl.Model :param clustering_dual_values: previous loop dual values for clustering :type clustering_dual_values: Dict :param constraints_dual: constraints type compared to trigger conflicts :type constraints_dual: List :param tol_clust: threshold used for clustering conflicts :type tol_clust: float :param tol_move_clust: threshold used for number of clustering moves :type tol_move_clust: float :param tol_open_clust: threshold used for opening new cluster :type tol_open_clust: float :param df_clust: clustering related data :type df_clust: pd.DataFrame :param cluster_profiles: computed clusters mean profiles :type cluster_profiles: np.array :param labels_: cluster labels :type labels_: List :param loop_nb: current loop number :type loop_nb: int :param solver: solver used for pyomo :type solver: str :return: clustering variance matrix, number of changes in clustering, silhouette score before and after the loop, clustering dual values, clustering optimization model, number of nodes and edges in conflict graph + max and mean degree (clustering) :rtype: Tuple[ np.array, int, float, float, Dict, mdl.Model, int, int, float, float ] """ nb_clust_changes_loop = 0 logging.info('# Clustering evaluation #') init_loop_silhouette = clt.get_silhouette(df_clust, labels_) start = time.time() clust_model.update_adjacency_clust_constraints(u) clust_model.update_obj_clustering(w) add_time(loop_nb, 'update_clustering_model', (time.time() - start)) logging.info('Solving clustering linear relaxation ...') start = time.time() clust_model.solve(solver) add_time(loop_nb, 'solve_clustering', (time.time() - start)) # print('Init clustering lp solution : ', clust_model.relax_mdl.objective_value) logging.info('Checking for changes in clustering dual values ...') start = time.time() (moving_containers, clust_conflict_nodes, clust_conflict_edges, clust_max_deg, clust_mean_deg) = mdl.get_moving_containers_clust( clust_model, clustering_dual_values, tol_clust, tol_move_clust, my_instance.nb_containers, my_instance.dict_id_c, df_clust, cluster_profiles) add_time(loop_nb, 'get_moves_clustering', (time.time() - start)) if len(moving_containers) > 0: logging.info('Changing clustering ...') start = time.time() ics = 0.0 (df_clust, labels_, nb_clust_changes_loop) = clt.change_clustering( moving_containers, df_clust, labels_, my_instance.dict_id_c, tol_open_clust * ics) u = clt.build_adjacency_matrix(labels_) add_time(loop_nb, 'move_clustering', (time.time() - start)) clust_model.update_adjacency_clust_constraints(u) logging.info('Solving linear relaxation after changes ...') start = time.time() clust_model.solve(solver) clustering_dual_values = mdl.fill_dual_values(clust_model) add_time(loop_nb, 'solve_new_clustering', (time.time() - start)) else: logging.info('Clustering seems still right ...') it.results_file.write('Clustering seems still right ...') # TODO improve this part (distance...) # TODO factorize # Compute new clusters profiles cluster_profiles = clt.get_cluster_mean_profile( df_clust) cluster_vars = clt.get_cluster_variance(cluster_profiles) cluster_var_matrix = clt.get_sum_cluster_variance( cluster_profiles, cluster_vars) dv = ctnr.build_var_delta_matrix_cluster( df_clust, cluster_var_matrix, my_instance.dict_id_c) end_loop_silhouette = clt.get_silhouette(df_clust, labels_) return (dv, nb_clust_changes_loop, init_loop_silhouette, end_loop_silhouette, clustering_dual_values, clust_model, clust_conflict_nodes, clust_conflict_edges, clust_max_deg, clust_mean_deg)
[docs]def eval_placement( my_instance, working_df_indiv, w, u, v, dv, placement_dual_values, constraints_dual, place_model, tol_place, tol_move_place, nb_clust_changes_loop, loop_nb, solver ): """Evaluate current clustering solution and update it if needed. :param my_instance: Instance object :type my_instance: Instance :param working_df_indiv: current loop data :type working_df_indiv: pd.DataFrame :param w: dissimilarity matrix :type w: np.array :param u: clustering adjacency matrix :type u: np.array :param v: placement adjacency matrix :type v: np.array :param dv: clustering variance matrix :type dv: np.array :param placement_dual_values: previous loop dual values for placement :type placement_dual_values: Dict :param constraints_dual: constraints type compared to trigger conflicts :type constraints_dual: List :param place_model: placement optimization model :type place_model: mdl.Model :param tol_place: threshold used for placement conflicts :type tol_place: float :param tol_move_place: threshold used for number of placement moves :type tol_move_place: float :param nb_clust_changes_loop: number of changes in clustering in current loop :type nb_clust_changes_loop: int :param loop_nb: current loop number :type loop_nb: int :param solver: solver used for pyomo :type solver: str :return: number of placement changes, dual values related to current placement, placement optimization model, number of nodes and edges in conflict graph + max and mean degree :rtype: Tuple[int, Dict, mdl.Model, int, int, float, float] """ logging.info('# Placement evaluation #') start = time.time() # TODO update without re-creating from scratch ? Study place_model = mdl.Model( 2, working_df_indiv, my_instance.dict_id_c, my_instance.dict_id_n, my_instance.df_host_meta, dv=dv, sol_u=u, sol_v=v ) add_time(loop_nb, 'update_placement_model', (time.time() - start)) it.optim_file.write('solve without any change\n') start = time.time() place_model.solve(solver) add_time(loop_nb, 'solve_placement', (time.time() - start)) # print('Init placement lp solution : ', place_model.relax_mdl.objective_value) moving_containers = [] nb_place_changes_loop = 0 place_conf_nodes = 0 place_conf_edges = 0 place_max_deg = 0 place_mean_deg = 0 if nb_clust_changes_loop > 0: logging.info('Checking for changes in placement dual values ...') start = time.time() (moving_containers, place_conf_nodes, place_conf_edges, place_max_deg, place_mean_deg) = mdl.get_moving_containers_place( place_model, placement_dual_values, tol_place, tol_move_place, my_instance.nb_containers, working_df_indiv, my_instance.dict_id_c) add_time(loop_nb, 'get_moves_placement', (time.time() - start)) if len(moving_containers) > 0: nb_place_changes_loop = len(moving_containers) start = time.time() place.move_list_containers(moving_containers, my_instance, working_df_indiv[it.tick_field].min(), working_df_indiv[it.tick_field].max()) add_time(loop_nb, 'move_placement', (time.time() - start)) v = place.build_placement_adj_matrix( working_df_indiv, my_instance.dict_id_c) start = time.time() place_model.update_adjacency_place_constraints(v) place_model.update_obj_place(dv) place_model.solve(solver) # print('After changes placement lp solution : ', place_model.relax_mdl.objective_value) placement_dual_values = mdl.fill_dual_values(place_model) add_time(loop_nb, 'solve_new_placement', (time.time() - start)) else: logging.info('No container to move : we do nothing ...\n') return (nb_place_changes_loop, placement_dual_values, place_model, place_conf_nodes, place_conf_edges, place_max_deg, place_mean_deg)
[docs]def loop_kmeans(my_instance, df_clust, labels_): """Update clustering via kmeans from scratch. :param my_instance: Instance object :type my_instance: Instance :param df_clust: clustering related data :type df_clust: pd.DataFrame :param labels_: clustering labels :type labels_: List :return: clustering variance matrix, number of changes in clustering, silhouette score before and after loop, null objects to match other return objects :rtype: Tuple[ np.array, int, float, float, Dict, mdl.Model, int, int, float, float ] """ logging.info('# Clustering via k-means from scratch #') init_loop_silhouette = clt.get_silhouette(df_clust, labels_) new_labels_ = clt.perform_clustering( df_clust.drop('cluster', axis=1), 'kmeans', my_instance.nb_clusters) nb_clust_changes_loop = len( [i for i, j in zip(labels_, new_labels_) if i != j]) labels_ = new_labels_ df_clust['cluster'] = labels_ # TODO factorize # Compute new clusters profiles cluster_profiles = clt.get_cluster_mean_profile( df_clust) cluster_vars = clt.get_cluster_variance(cluster_profiles) cluster_var_matrix = clt.get_sum_cluster_variance( cluster_profiles, cluster_vars) dv = ctnr.build_var_delta_matrix_cluster( df_clust, cluster_var_matrix, my_instance.dict_id_c) end_loop_silhouette = clt.get_silhouette(df_clust, labels_) return (dv, nb_clust_changes_loop, init_loop_silhouette, end_loop_silhouette, {}, None, 0, 0, 0, 0)
[docs]def stream_km(my_instance, df_clust, labels_): """Update clustering via kmeans from scratch. :param my_instance: Instance object :type my_instance: Instance :param df_clust: clustering related data :type df_clust: pd.DataFrame :param labels_: clustering labels :type labels_: List :return: clustering variance matrix, number of changes in clustering, silhouette score before and after loop, null objects to match other return objects :rtype: Tuple[ np.array, int, float, float, Dict, mdl.Model, int, int, float, float ] """ logging.info('# Clustering via streamkm #') init_loop_silhouette = clt.get_silhouette(df_clust, labels_) it.streamkm_model.partial_fit(df_clust.drop('cluster', axis=1)) _, new_labels_ = it.streamkm_model.get_final_clusters( my_instance.nb_clusters, seed=my_instance.nb_clusters) nb_clust_changes_loop = len( [i for i, j in zip(labels_, new_labels_) if i != j]) labels_ = new_labels_ df_clust['cluster'] = labels_ # TODO factorize # Compute new clusters profiles cluster_profiles = clt.get_cluster_mean_profile( df_clust) cluster_vars = clt.get_cluster_variance(cluster_profiles) cluster_var_matrix = clt.get_sum_cluster_variance( cluster_profiles, cluster_vars) dv = ctnr.build_var_delta_matrix_cluster( df_clust, cluster_var_matrix, my_instance.dict_id_c) end_loop_silhouette = clt.get_silhouette(df_clust, labels_) return (dv, nb_clust_changes_loop, init_loop_silhouette, end_loop_silhouette, {}, None, 0, 0, 0, 0)
[docs]def end_loop( working_df_indiv, tmin, nb_clust_changes, nb_place_changes, nb_overload, total_loop_time, loop_nb, df_host_evo ): """Perform all stuffs after last loop. :param working_df_indiv: current loop data :type working_df_indiv: pd.DataFrame :param tmin: starting time of current window :type tmin: int :param nb_clust_changes: number of changes in clustering :type nb_clust_changes: int :param nb_place_changes: number of changes in placement :type nb_place_changes: int :param nb_overload: number of node overload :type nb_overload: int :param total_loop_time: total running time for all loops :type total_loop_time: float :param loop_nb: final loop number :type loop_nb: int :param df_host_evo: evolving node data :type df_host_evo: pd.DataFrame :return: evolving node data :rtype: pd.DataFrame """ working_df_host = working_df_indiv.groupby( [working_df_indiv[it.tick_field], it.host_field], as_index=False).agg(it.dict_agg_metrics) (end_loop_obj_nodes, end_loop_obj_delta) = mdl.get_obj_value_host( working_df_host) it.results_file.write('Final loop delta : %f\n' % end_loop_obj_delta) it.results_file.write('\n### Results of loops ###\n') it.results_file.write('Total number of changes in clustering : %d\n' % nb_clust_changes) it.results_file.write('Total number of changes in placement : %d\n' % nb_place_changes) it.results_file.write('Total number of overload : %d\n' % nb_overload) it.results_file.write('Average loop time : %f s\n' % (total_loop_time / loop_nb)) if loop_nb <= 1: df_host_evo = working_df_indiv.groupby( [working_df_indiv[it.tick_field], it.host_field], as_index=False).agg(it.dict_agg_metrics) else: df_host_evo = pd.concat( [ df_host_evo, working_df_host[~working_df_host[it.tick_field].isin( df_host_evo[it.tick_field].unique())] ] ) return df_host_evo
[docs]def add_time(loop_nb, action, time): """Add an action time in times dataframe. :param loop_nb: current loop number :type loop_nb: int :param action: action / method we add time for :type action: str :param time: running time of the action :type time: float """ it.times_df = pd.concat( [ it.times_df, pd.DataFrame.from_records([{ 'num_loop': loop_nb, 'action': action, 'time': time }]) ] )
[docs]def close_files(): """Write the final files and close all open files.""" it.results_file.close()
if __name__ == '__main__': main()