# Source code for library.phases.runners.modelling.utils.states.modelling_runner_states_in


from library.phases.runners.modelling.utils.states.modelling_runner_states_base import ModellingRunnerStates
from library.pipeline.pipeline_manager import PipelineManager
from skopt.space import Real

from sklearn.ensemble import StackingClassifier
from sklearn.linear_model import LogisticRegression


class InTuningRunner(ModellingRunnerStates):
    """Runner for the in-tuning phase of modelling.

    Fits every non-baseline model with Bayesian hyperparameter search,
    optionally builds a stacking ensemble on top of the tuned models, and
    produces the standard cross/intra-model analysis plots.
    """

    def __init__(self, pipeline_manager: PipelineManager, save_plots: bool = False, save_path: str = None):
        super().__init__(pipeline_manager, save_plots, save_path)

    # --- internal helpers -------------------------------------------------

    def _runner_vars(self) -> dict:
        """Shortcut to the "modelling_runner" section of the shared variables."""
        return self.pipeline_manager.variables["modelling_runner"]

    def _model_grid(self, model_key: str, param_names: list) -> dict:
        """Build the param grid for *model_key* from the configured grid space.

        Args:
            model_key: key inside variables["modelling_runner"]["hyperparameters"]["grid_space"].
            param_names: hyperparameter names to pull, in order.

        Returns:
            dict mapping each name in *param_names* to its configured space.
        """
        grid_space = self._runner_vars()["hyperparameters"]["grid_space"][model_key]
        return {name: grid_space[name] for name in param_names}

    # --- phase steps ------------------------------------------------------

    def _general_analysis(self):
        """Evaluate and store all non-baseline models, then plot the analysis.

        Returns:
            tuple: (metrics dataframe as a dict, residuals, confusion matrices).
        """
        analysis = self.pipeline_manager.pipelines_analysis
        # NOTE: "model_assesment" spelling matches the config key — do not "fix" it here.
        assesment_vars = self._runner_vars()["model_assesment"]

        # Evaluating and storing models
        self.pipeline_manager.all_pipelines_execute(
            methodName="modelling.evaluate_and_store_models",
            exclude_category="baseline",
            comments=assesment_vars["comments"],
            current_phase=self.pipeline_manager.pipeline_state,
        )

        # Cross model comparison
        analysis.plot_cross_model_comparison(save_plots=self.save_plots, save_path=self.save_path)

        # Time based model performance
        metrics_df = analysis.plot_results_df(
            metrics=assesment_vars["results_df_metrics"],
            save_plots=self.save_plots,
            save_path=self.save_path,
        )

        # Results summary
        analysis.plot_results_summary(
            training_metric=assesment_vars["results_summary"]["training_metric"],
            performance_metric=assesment_vars["results_summary"]["performance_metric"],
            save_plots=self.save_plots,
            save_path=self.save_path,
        )

        # Intra model comparison
        analysis.plot_intra_model_comparison(save_plots=self.save_plots, save_path=self.save_path)

        # Residual analysis
        residuals, confusion_matrices = analysis.plot_confusion_matrix(
            save_plots=self.save_plots, save_path=self.save_path
        )

        # Plot residuals
        analysis.plot_residuals(save_plots=self.save_plots, save_path=self.save_path)

        return metrics_df.to_dict(), residuals, confusion_matrices

    def _get_grid_space(self):
        """Assemble the hyperparameter search space for every tunable model.

        Returns:
            tuple of dicts: (gradient_boosting, random_forest, decision_tree,
            naive_bayes, stacking) parameter grids.
        """
        # Ensembled models
        gradient_boosting_grid = self._model_grid("gradient_boosting", [
            'learning_rate', 'subsample', 'n_estimators',
            'max_depth', 'min_samples_split', 'min_samples_leaf',
        ])
        random_forest_grid = self._model_grid("random_forest", [
            'n_estimators', 'max_depth', 'min_samples_split', 'min_samples_leaf',
        ])

        # Tree-based models
        decision_tree_grid = self._model_grid("decision_tree", [
            'criterion', 'max_depth', 'min_samples_split',
            'min_samples_leaf', 'max_features', 'ccp_alpha',
        ])

        # Support Vector Machines are skipped: too slow and they underperform.

        # Naive Bayes model — hard-coded because the skopt Real datatype is not
        # supported in the serialized variables file.
        naive_bayes_grid = {
            'var_smoothing': Real(1e-12, 1e-6, prior='log-uniform')
        }

        # Feed Forward Neural Network grid is hard-coded inside its optimizer.

        # Stacking meta-estimator
        stacking_grid = self._model_grid("stacking", [
            'final_estimator__C', 'final_estimator__penalty',
            'final_estimator__solver', 'passthrough',
        ])

        return gradient_boosting_grid, random_forest_grid, decision_tree_grid, naive_bayes_grid, stacking_grid

    def _get_grid_search_params(self):
        """Map each model name to its optimizer configuration.

        Returns:
            tuple: (modelNameToOptimizer for the individually tuned models,
            modelNameToOptimizerStacking for the stacking ensemble).
        """
        (gradient_boosting_grid, random_forest_grid, decision_tree_grid,
         naive_bayes_grid, stacking_grid) = self._get_grid_space()

        tuner_params = self._runner_vars()["hyperparameters"]["tuner_params"]
        max_iter = tuner_params["max_iter"]

        modelNameToOptimizer = {
            "Gradient Boosting": {
                "optimizer_type": "bayes",
                "param_grid": gradient_boosting_grid,
                "max_iter": max_iter,
            },
            "Random Forest": {
                "optimizer_type": "bayes",
                "param_grid": random_forest_grid,
                "max_iter": max_iter,
            },
            "Decision Tree": {
                "optimizer_type": "bayes",
                "param_grid": decision_tree_grid,
                "max_iter": max_iter,
            },
            "Naive Bayes": {
                "optimizer_type": "bayes",
                "param_grid": naive_bayes_grid,
                "max_iter": max_iter,
            },
            "Feed Forward Neural Network": {
                "optimizer_type": "bayes_nn",
                "param_grid": None,  # grid is hard-coded inside the NN optimizer
                "max_iter": max_iter,
                "epochs": tuner_params["epochs"],
            },
        }

        modelNameToOptimizerStacking = {
            "Stacking": {
                "optimizer_type": "bayes",
                "param_grid": stacking_grid,
                "max_iter": max_iter,
            },
        }

        return modelNameToOptimizer, modelNameToOptimizerStacking

    def _set_up_stacking_model(self, optimized_models: dict, modelNameToOptimizerStacking: dict):
        """Build the stacking classifier from the tuned base estimators and fit it.

        Args:
            optimized_models: per-pipeline fit results; maps pipeline name to a
                dict of {model name: fitted model}, or None for pipelines that
                had nothing to tune in this phase.
            modelNameToOptimizerStacking: optimizer configuration covering only
                the "Stacking" model.
        """
        # Collect the tuned base estimators. If a model was in pre-tuning but
        # not in in-tuning, the result for its pipeline is None, not a dict.
        estimators = []
        for pipelineName, results in optimized_models["not_baseline"].items():
            if isinstance(results, dict):
                for modelName, modelObject in results.items():
                    estimators.append((modelName, modelObject))

        # Stacking model
        stackingModel = StackingClassifier(
            estimators=estimators,
            final_estimator=LogisticRegression(),
            cv=5,
            verbose=3
        )

        # Register the stacking model for both the "in" and "post" tuning states.
        stacking_entry = (self.pipeline_manager.pipelines["not_baseline"]["stacking"]
                          .modelling.list_of_models["Stacking"])
        for state in ("in", "post"):
            stacking_entry.tuning_states[state].assesment["model_sklearn"] = stackingModel
            stacking_entry.tuning_states[state].model_sklearn = stackingModel

        # Fit only the stacking pipeline. BUGFIX: this exclusion list used to be
        # built but never passed to all_pipelines_execute, so every non-baseline
        # pipeline was re-fitted here instead of just "stacking".
        all_pipelines_to_exclude = [
            pipelineName
            for pipelineName in self.pipeline_manager.pipelines["not_baseline"]
            if pipelineName != "stacking"
        ]
        self.pipeline_manager.all_pipelines_execute(
            methodName="modelling.fit_models",
            exclude_pipeline_names=all_pipelines_to_exclude,
            current_phase=self.pipeline_manager.pipeline_state,
            modelNameToOptimizer=modelNameToOptimizerStacking,
        )

    def run(self):
        """Execute the in-tuning phase end to end.

        Returns:
            The general-analysis results: (metrics dict, residuals,
            confusion matrices).
        """
        self.pipeline_manager.pipeline_state = "in"
        print("In tuning runner")

        # Tune every model except the stacking ensemble
        modelNameToOptimizer, modelNameToOptimizerStacking = self._get_grid_search_params()
        optimized_models = self.pipeline_manager.all_pipelines_execute(
            methodName="modelling.fit_models",
            exclude_pipeline_names=["stacking"],
            current_phase=self.pipeline_manager.pipeline_state,
            modelNameToOptimizer=modelNameToOptimizer,
        )

        # Only build the stacking model when it is not excluded by config
        if len(self._runner_vars()["models_to_exclude"]["stacking"]) == 0:
            self._set_up_stacking_model(optimized_models, modelNameToOptimizerStacking)

        return self._general_analysis()