# Source code for library.phases.runners.dataPreprocessing_runner
from library.utils.phase_runner_definition.phase_runner import PhaseRunner
from library.pipeline.pipeline_manager import PipelineManager
from library.phases.phases_implementation.data_preprocessing.data_preprocessing import Preprocessing
import yaml
from pathlib import Path
# [docs]
class DataPreprocessingRunner(PhaseRunner):
    """Phase runner for the data-preprocessing stage.

    ``run`` encodes the configured categorical features, creates the
    pipeline divergences, and then runs the preprocessing steps on every
    pipeline held by the pipeline manager.

    NOTE(review): ``self.pipeline_manager``, ``self.include_plots`` and
    ``self.save_path`` are read here but never assigned in this class; they
    are presumably set by ``PhaseRunner.__init__`` -- confirm.
    """

    # (category, pipelineName) pairs handed to create_pipeline_divergence,
    # in the order the divergences are created.
    _PIPELINE_DIVERGENCES = (
        ("not_baseline", "ensembled"),
        ("not_baseline", "tree_based"),
        ("not_baseline", "support_vector_machine"),
        ("not_baseline", "naive_bayes"),
        ("not_baseline", "feed_forward_neural_network"),
        ("not_baseline", "stacking"),
        ("baseline", "baselines"),
    )

    def __init__(self, pipeline_manager: PipelineManager, include_plots: bool = False, save_path: str = "") -> None:
        """
        Initialise the runner and keep a shortcut to the pipeline variables.

        Parameters
        ----------
        pipeline_manager: PipelineManager
            Manager whose pipelines are preprocessed; its ``variables``
            mapping is stored on ``self.variables``.
        include_plots: bool
            Forwarded to ``PhaseRunner.__init__``.
        save_path: str
            Forwarded to ``PhaseRunner.__init__``.
        """
        super().__init__(pipeline_manager, include_plots, save_path)
        self.variables = pipeline_manager.variables

    def _feature_encoding_helper(self) -> None:
        """
        Encode the configured categorical features on every pipeline.

        Calls ``get_categorical_features_encoded`` through
        ``all_pipelines_execute`` with the features listed under
        ``variables["feature_analysis_runner"]["features_to_encode"]`` and
        ``encode_y=True``. The map returned for the
        ``not_baseline`` / ``ensembled`` pipeline is stored on
        ``pipelines_analysis.encoded_map``.
        """
        features_to_encode = self.pipeline_manager.variables["feature_analysis_runner"]["features_to_encode"]
        encoded_maps_per_pipeline = self.pipeline_manager.all_pipelines_execute(
            methodName="feature_analysis.feature_transformation.get_categorical_features_encoded",
            verbose=True,
            features=features_to_encode,
            encode_y=True,
        )
        print(f"ENCODED MAP PIPELINS IS: {encoded_maps_per_pipeline}")
        self.pipeline_manager.pipelines_analysis.encoded_map = encoded_maps_per_pipeline["not_baseline"]["ensembled"]

    def _create_pipelines_divergences(self) -> None:
        """Create one pipeline divergence per entry of ``_PIPELINE_DIVERGENCES``."""
        for category, pipeline_name in self._PIPELINE_DIVERGENCES:
            self.pipeline_manager.create_pipeline_divergence(category=category, pipelineName=pipeline_name)
        print(f"Pipelines AFTER divergences: {self.pipeline_manager.pipelines}")

    def _execute_preprocessing(self, preprocessing: Preprocessing, pipeline_name: str) -> str:
        """
        Apply missing-value handling, duplicate analysis, outlier bounding,
        feature scaling, and class-imbalance correction based on config,
        returning a composed summary of operations/results.

        Parameters
        ----------
        preprocessing: Preprocessing
            The preprocessing object contains all the methods
        pipeline_name: str
            The name of the pipeline to use. It selects the per-pipeline
            scaler and imbalancer and names the output sub-directory.

        Returns
        -------
        str
            A composed summary of operations/results, one entry per step,
            joined with "; ".
        """
        config = self.variables["data_preprocessing_runner"]
        messages = []
        # Path-based join: plain "+" concatenation yielded a filesystem-root
        # path ("/<pipeline_name>") when save_path is the default "".
        save_path = str(Path(self.save_path) / pipeline_name)

        # 1) Missing values & Duplicate analysis
        print(f"\nPreprocessing --- Missing Values & Duplicates - {pipeline_name}\n")
        missing_res = preprocessing.uncomplete_data_obj.get_missing_values(
            placeholders=config["placeholders"],
            save_plots=self.include_plots,
            save_path=save_path,
        )
        messages.append(f"Handled missing values : {missing_res}")
        dup_res = preprocessing.uncomplete_data_obj.analyze_duplicates(
            save_plots=self.include_plots,
            save_path=save_path,
        )
        messages.append(f"Duplicates analyzed : {dup_res}")

        # 2) Outlier detection & bounding
        print(f"\nPreprocessing --- Bounds & Outliers - {pipeline_name}\n")
        # NOTE(review): an earlier comment said this step was disabled because
        # of an unexplained error and an incomplete implementation; the calls
        # below are active -- confirm the step is meant to run.
        detection_type = config["outliers"]["detection_type"]
        out_res = preprocessing.outliers_bounds_obj.get_outliers(
            detection_type=detection_type,
            save_plots=False,  # TODO: hardcoded for debugging; use self.include_plots in production.
            save_path=save_path,
        )
        preprocessing.outliers_bounds_obj.bound_checking()
        # Report the actual detection result (previously the literal None).
        messages.append(f"Outliers detected by {detection_type} : {out_res}")

        # 3) Feature scaling
        print(f"\nPreprocessing --- Feature Scaling - {pipeline_name}\n")
        scaler = config["pipeline_specific_configurations"]["scaler"][pipeline_name]
        if scaler == "no_scaler":
            scale_res = "No scaling performed"
        else:
            # Only the numeric columns of the training features are scaled.
            numeric_columns = preprocessing.dataset.X_train.select_dtypes(include=["number"]).columns
            scale_res = preprocessing.feature_scaling_obj.scale_features(
                scaler=scaler,
                columnsToScale=numeric_columns,
                save_plots=self.include_plots,
                save_path=save_path,
            )
        messages.append(f"Features scaled with {scaler} : {scale_res}")

        # 4) Class imbalance correction
        # NOTE(review): an earlier comment flagged this step as too slow and
        # described it as disabled; it is currently active.
        print(f"\nPreprocessing --- Class Imbalance - {pipeline_name}\n")
        imbalancer = config["pipeline_specific_configurations"]["imbalancer"][pipeline_name]
        if imbalancer == "no_imbalancer":
            imb_res = "No imbalancer performed"
        else:
            imb_res = preprocessing.class_imbalance_obj.class_imbalance(
                method=imbalancer,
                save_plots=self.include_plots,
                save_path=save_path,
            )
        messages.append(f"Class imbalance: {imb_res}")

        print(f"Messages: {messages}")
        return "; ".join(messages)

    def run(self) -> dict:
        """
        Run feature encoding, pipeline divergence and per-pipeline preprocessing.

        Returns
        -------
        dict
            Nested mapping ``{category_name: {pipeline_name: summary}}`` where
            each summary is the string returned by ``_execute_preprocessing``.
        """
        self._feature_encoding_helper()
        self._create_pipelines_divergences()

        separator = "-" * 30
        print(separator)
        print("STARTING PREPROCESSING")
        print(separator)

        results = {}
        for category_name, pipelines in self.pipeline_manager.pipelines.items():
            category_results = results[category_name] = {}
            for pipeline_name, pipeline in pipelines.items():
                print(f"--> Running preprocessing on pipeline: {category_name} / {pipeline_name}")
                print(separator)
                summary = self._execute_preprocessing(preprocessing=pipeline.preprocessing, pipeline_name=pipeline_name)
                print(summary)
                category_results[pipeline_name] = summary
        return results