from library.phases.phases_implementation.modelling.results_analysis.results_df import ResultsDF
from library.phases.phases_implementation.modelling.shallow.model_definition.model_types.classifier import Classifier
from library.phases.phases_implementation.modelling.shallow.model_definition.model_types.regressor import Regressor
from library.phases.phases_implementation.modelling.shallow.model_definition.model_base import Model
from library.phases.phases_implementation.dataset.dataset import Dataset
from library.phases.phases_implementation.modelling.results_analysis.result_analysis import PreTuningResultAnalysis, InTuningResultAnalysis, PostTuningResultAnalysis
import concurrent.futures
import pandas as pd
import time
# NOTE: a stray Sphinx "[docs]" source-link marker was removed from this line.
class Modelling:
    """
    Orchestrates the models attached to a dataset across the tuning phases.

    The class keeps a registry of models (``list_of_models``), fits or
    optimizes them for the "pre", "in" and "post" phases, evaluates them and
    forwards the outcome to ``results_df`` for storage.

    Models whose ``optimizer_type`` is "bayes_nn" are always processed
    sequentially in the calling process; every other model is dispatched to a
    ``concurrent.futures.ProcessPoolExecutor``.
    """

    # Accepted values for the ``model_type`` argument of ``add_model``.
    _MODEL_TYPES = ("classical", "neural_network", "stacking")
    # Accepted values for every ``current_phase`` argument.
    _PHASES = ("pre", "in", "post")

    def __init__(self, dataset: Dataset, model_results_path: str):
        """
        Parameters
        ----------
        dataset : Dataset
            The dataset every registered model is built on.
        model_results_path : str
            Path handed to ``ResultsDF`` together with the dataset.
        """
        self.results_df = ResultsDF(model_results_path, dataset)
        # NOTE: despite its name this is a dict mapping model name -> model object.
        self.list_of_models = {}
        self.dataset = dataset
        self._models_to_exclude = []
        # One analysis object per phase; each starts with an empty frame that is
        # replaced by ``evaluate_and_store_models``.
        self.results_analysis = {
            "pre": PreTuningResultAnalysis(phase_results_df=pd.DataFrame()),
            "in": InTuningResultAnalysis(phase_results_df=pd.DataFrame()),
            "post": PostTuningResultAnalysis(phase_results_df=pd.DataFrame()),
        }

    # 0) Attributes logic
    @property
    def models_to_exclude(self):
        """Names of the registered models that fitting/evaluation must skip."""
        return self._models_to_exclude

    @models_to_exclude.setter
    def models_to_exclude(self, value: list[str]):
        """
        Replace the exclusion list.

        Raises
        ------
        AssertionError
            If any name in ``value`` is not a registered model.
        """
        for modelName in value:
            assert modelName in self.list_of_models, f"Model {modelName} not found in list of models"
        self._models_to_exclude = value

    # 1) Adding models
    def add_model(self, model_name: str, model_sklearn: object, model_type: str = "classical"):
        """
        Adds a new model to the list of models.

        Parameters
        ----------
        model_name : str
            The name to assign to the new model. An existing entry with the
            same name is overwritten.
        model_sklearn : object
            The sklearn model object to be added.
        model_type : str, optional (default="classical")
            The type of model being added. Must be one of:
            - "classical"
            - "neural_network"
            - "stacking"

        Notes
        -----
        Once a model is added, the dataset cannot be modified (statement taken
        from the original documentation; it is not enforced by this class).

        Raises
        ------
        AssertionError
            If `model_type` is not one of the accepted values.
        ValueError
            If ``dataset.modelTask`` is neither "classification" nor
            "regression". Previously ``None`` was silently registered in that
            case and only failed later, when the entry was used as a model.
        """
        assert model_type in self._MODEL_TYPES
        task = self.dataset.modelTask
        if task == "classification":
            model_cls = Classifier
        elif task == "regression":
            model_cls = Regressor
        else:
            raise ValueError(
                f"Unsupported model task {task!r}; expected 'classification' or 'regression'"
            )
        self.list_of_models[model_name] = model_cls(
            model_name,
            model_sklearn,
            model_type=model_type,
            results_header=self.results_df.header,
            dataset=self.dataset,
        )

    # 2) Fitting, predicting and optimizing models
    @staticmethod
    def _uses_bayes_nn(model) -> bool:
        """Return True when ``model`` carries ``optimizer_type == "bayes_nn"``."""
        # getattr with a default: models that were never optimized may not have
        # the attribute at all.
        return getattr(model, "optimizer_type", None) == "bayes_nn"

    @staticmethod
    def _post_phase_names(best_model_name, baseline_model_name) -> list:
        """Return the truthy, de-duplicated post-phase model names (best first)."""
        assert (best_model_name is not None) or (baseline_model_name is not None), \
            "You must provide at least one of the best or baseline model"
        # dict.fromkeys keeps insertion order while dropping a repeated name, so
        # the same model is never processed twice.
        return list(dict.fromkeys(name for name in (best_model_name, baseline_model_name) if name))

    def _run_in_pool(self, worker, jobs):
        """
        Run ``worker(*job)`` for every job in a process pool.

        Yields each worker's return value in completion order. No pool is
        created when ``jobs`` is empty.
        """
        if not jobs:
            return
        # NOTE: ``worker`` is a bound method, so pickling it for the worker
        # process also pickles ``self``.
        with concurrent.futures.ProcessPoolExecutor() as executor:
            futures = [executor.submit(worker, *job) for job in jobs]
            for future in concurrent.futures.as_completed(futures):
                yield future.result()

    def _fit_and_predict(self, modelName, modelObject: Model, current_phase: str):
        """
        Fits the model and generates predictions for the specified phase.

        Parameters
        ----------
        modelName : str
            The name of the model being fitted and predicted.
        modelObject : Model
            The model object to fit and predict.
        current_phase : str
            The current phase of the workflow (e.g., "pre", "in", or "post").

        Returns
        -------
        tuple
            A tuple containing the model name and the fitted model object.
        """
        modelObject.fit(modelName=modelName, current_phase=current_phase)
        modelObject.predict(modelName=modelName, current_phase=current_phase)
        print(f"Fitted and predicted model {modelName}")
        return modelName, modelObject

    def _optimize_model(self,
                        modelName: str,
                        modelObject: Model,
                        current_phase: str,
                        optimization_params: dict):
        """
        Optimizes the specified model during the 'in' phase using provided parameters.

        Parameters
        ----------
        modelName : str
            The name of the model to optimize.
        modelObject : Model
            The model object to be optimized.
        current_phase : str
            The current phase of the workflow; must be "in" for optimization.
        optimization_params : dict
            A dictionary containing optimization parameters:
            - "optimizer_type": type of optimizer to use (required).
            - "param_grid": parameter grid for hyperparameter tuning (required).
            - "max_iter": maximum iterations for the optimizer (required).
            - "epochs" (optional): number of epochs, forwarded only when
              ``modelObject.model_type`` is "neural_network".

        Returns
        -------
        tuple
            A tuple containing the model name and the optimized model object.

        Raises
        ------
        AssertionError
            If ``current_phase`` is not "in".
        KeyError
            If one of the required keys is missing from ``optimization_params``.
        """
        assert current_phase == "in", "Optimize model can only be used in the 'in' phase"
        modelObject.optimizer_type = optimization_params["optimizer_type"]

        fit_kwargs = {
            "param_grid": optimization_params["param_grid"],
            "max_iter": optimization_params["max_iter"],
            "optimizer_type": optimization_params["optimizer_type"],
            "model_object": modelObject,
        }
        if modelObject.model_type == "neural_network":
            # Only neural networks receive ``epochs`` (None when not supplied).
            fit_kwargs["epochs"] = optimization_params.get("epochs", None)

        modelObject.fit(modelName=modelName, current_phase=current_phase, **fit_kwargs)
        modelObject.predict(modelName=modelName, current_phase=current_phase)
        print(f"Optimized model {modelName}")
        # Setting the final model to be the tuned one
        modelObject.tuning_states["post"].assesment["model_sklearn"] = \
            modelObject.tuning_states["in"].assesment["model_sklearn"]
        return modelName, modelObject

    def fit_models(self, current_phase: str, **kwargs) -> dict | None:
        """
        Fits or optimizes models depending on the current phase.

        Parameters
        ----------
        current_phase : str
            Phase of operation: "pre", "in", or "post".
        kwargs : dict
            Phase-specific arguments:
            - "in": ``modelNameToOptimizer`` (required), a dict mapping model
              name to its optimization parameters (see ``_optimize_model``).
              Names that are not registered or that are excluded are skipped.
            - "post": ``best_model_name`` and/or ``baseline_model_name``
              (at least one is required).

        Returns
        -------
        dict or None
            During the "in" phase, a dict mapping model name to the tuned
            ``model_sklearn`` object; otherwise None.

        Raises
        ------
        AssertionError
            If the phase is invalid or a required keyword argument is missing.

        Notes
        -----
        - Models run in parallel in a process pool, except "bayes_nn" models
          which run sequentially in the calling process after the pool work
          has completed.
        """
        assert current_phase in self._PHASES, "Current phase must be one of the tuning states"
        print(f"Gonna start fitting models in {current_phase} phase")

        if current_phase == "pre":
            jobs = [
                (modelName, modelObject, current_phase)
                for modelName, modelObject in self.list_of_models.items()
                if modelName not in self.models_to_exclude
            ]
            for modelName, modelObject in self._run_in_pool(self._fit_and_predict, jobs):
                self.list_of_models[modelName] = modelObject  # update results
            return None

        if current_phase == "in":
            modelNameToOptimizer = kwargs.get("modelNameToOptimizer", None)
            assert modelNameToOptimizer is not None, "modelNameToOptimizer must be provided"

            # Separate models: bayes_nn optimizers must not go through the pool.
            bayes_nn_models = []
            other_models = []
            for modelName, optimization_params in modelNameToOptimizer.items():
                if modelName not in self.list_of_models:
                    continue
                if modelName in self.models_to_exclude:
                    continue
                if optimization_params.get("optimizer_type") == "bayes_nn":
                    bayes_nn_models.append((modelName, optimization_params))
                else:
                    other_models.append((modelName, optimization_params))

            optimized_models = {}

            # Run non-bayes_nn models in process pool
            jobs = []
            for modelName, optimization_params in other_models:
                print(f"Optimizing model {modelName}")
                jobs.append((modelName, self.list_of_models[modelName], current_phase, optimization_params))
            for modelName, modelObject in self._run_in_pool(self._optimize_model, jobs):
                self.list_of_models[modelName] = modelObject
                optimized_models[modelName] = modelObject.tuning_states["in"].assesment["model_sklearn"]

            # Run bayes_nn models sequentially (direct call, not via executor)
            for modelName, optimization_params in bayes_nn_models:
                print(f"Optimizing bayes_nn model {modelName}")
                modelName, modelObject = self._optimize_model(
                    modelName, self.list_of_models[modelName], current_phase, optimization_params
                )
                self.list_of_models[modelName] = modelObject
                optimized_models[modelName] = modelObject.tuning_states["in"].assesment["model_sklearn"]
            return optimized_models

        # current_phase == "post": only the best and/or baseline model is refitted.
        names = self._post_phase_names(kwargs.get("best_model_name", None),
                                       kwargs.get("baseline_model_name", None))
        # Both models are routed by optimizer type, so a bayes_nn baseline is
        # kept out of the process pool just like a bayes_nn best model.
        sequential = [name for name in names if self._uses_bayes_nn(self.list_of_models[name])]
        jobs = [
            (name, self.list_of_models[name], current_phase)
            for name in names
            if name not in sequential
        ]
        for modelName, modelObject in self._run_in_pool(self._fit_and_predict, jobs):
            self.list_of_models[modelName] = modelObject
        for name in sequential:
            modelName, modelObject = self._fit_and_predict(name, self.list_of_models[name], current_phase)
            self.list_of_models[modelName] = modelObject
        return None

    # 3) Evaluating and store model results
    def _evaluate_model(self, modelName, modelObject, current_phase: str):
        """Evaluate one model for ``current_phase`` and return ``(modelName, modelObject)``."""
        print(f"Evaluating model {modelName}")
        modelObject.evaluate(modelName=modelName, current_phase=current_phase)
        return modelName, modelObject

    def evaluate_and_store_models(self, current_phase: str, **kwargs) -> pd.DataFrame | None:
        """
        Assesses each model and stores the results in the results_df.

        Parameters
        ----------
        current_phase : str
            The current phase of the modelling: "pre", "in", or "post".
        kwargs : dict
            Phase-specific parameters. In the "post" phase at least one of
            ``best_model_name`` / ``baseline_model_name`` must be given and only
            those models are evaluated; in the other phases every registered,
            non-excluded model is evaluated.

        Returns
        -------
        pd.DataFrame or None
            The logs returned by ``results_df.store_results`` wrapped in a
            DataFrame (also assigned to
            ``results_analysis[current_phase].phase_results_df``), or None when
            ``store_results`` returned None.

        Raises
        ------
        AssertionError
            If the phase is invalid, or if neither model name is supplied in
            the "post" phase.
        """
        assert current_phase in self._PHASES, "Current phase must be one of the tuning states"

        if current_phase != "post":
            candidates = [
                (modelName, modelObject)
                for modelName, modelObject in self.list_of_models.items()
                if modelName not in self.models_to_exclude
            ]
        else:
            # Handle post-tuning phase
            names = self._post_phase_names(kwargs.get("best_model_name"),
                                           kwargs.get("baseline_model_name"))
            candidates = [(name, self.list_of_models[name]) for name in names]

        # Separate "bayes_nn" models from others. This is because bayes_nn cant
        # use parallel processing (for some keras-specific reasons)
        bayes_nn_models = [pair for pair in candidates if self._uses_bayes_nn(pair[1])]
        other_models = [pair for pair in candidates if not self._uses_bayes_nn(pair[1])]

        # Process non-bayes_nn models in parallel
        jobs = [(modelName, modelObject, current_phase) for modelName, modelObject in other_models]
        for modelName, modelObject in self._run_in_pool(self._evaluate_model, jobs):
            self.list_of_models[modelName] = modelObject

        # Process bayes_nn models sequentially
        for modelName, modelObject in bayes_nn_models:
            modelName, modelObject = self._evaluate_model(modelName, modelObject, current_phase)
            self.list_of_models[modelName] = modelObject

        # Store results and update analysis
        model_logs = self.results_df.store_results(
            list_of_models=self.list_of_models,
            current_phase=current_phase,
            models_to_exclude=self.models_to_exclude,
        )
        if model_logs is None:
            print(f"NO MODEL LOGS TO STORE FOR {current_phase} PHASE")
            return None

        model_logs = pd.DataFrame(model_logs)
        self.results_analysis[current_phase].phase_results_df = model_logs
        return model_logs