"""Source code for library.phases.phases_implementation.data_preprocessing.outliers_bounds."""

from library.phases.phases_implementation.data_preprocessing.bounds_config import BOUNDS
from library.phases.phases_implementation.dataset.dataset import Dataset
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns


from library.utils.miscellaneous.save_or_store_plot import save_or_store_plot


[docs] class OutliersBounds: def __init__(self, dataset: Dataset) -> None: self.dataset = dataset
[docs] def bound_checking(self) -> None: """ Apply numeric *BOUNDS* to *dataset.df* and remove rare violators. The global constant :data:`BOUNDS` must map column names to (min, max) tuples. For each column, the helper will: - Drop rows that lie outside the interval when they represent < 0.5% of the total dataset - Keep (but record) them for manual analysis otherwise Returns ------- None """ # --- Check dataset validity --- if not hasattr(self.dataset, "df"): raise AttributeError("The dataset does not contain an attribute named 'df'.") if not isinstance(self.dataset.df, pd.DataFrame): raise TypeError("self.dataset.df must be a pandas DataFrame.") # --- Validate BOUNDS constant --- if not isinstance(BOUNDS, dict): raise TypeError("BOUNDS must be a dictionary mapping column names to (min, max) tuples.") if not all(isinstance(v, tuple) and len(v) == 2 for v in BOUNDS.values()): raise ValueError("Each value in BOUNDS must be a (min, max) tuple.") self.bound_cols, self.bound_limits = zip(*BOUNDS.items()) # --- Check all bound columns exist --- missing_cols = [col for col in self.bound_cols if col not in self.dataset.df.columns] if missing_cols: raise ValueError(f"The following columns in BOUNDS are missing from the dataset: {missing_cols}") # --- Delegate to helper --- try: self.outliers_dict = self._bound_checking_helper( columnsToCheck=list(self.bound_cols), bounds=list(self.bound_limits) ) except Exception as e: raise RuntimeError(f"An error occurred during bound checking: {e}") return None
def _bound_checking_helper(self, columnsToCheck: list[str] = [], bounds: list[tuple] = []) -> dict[str, pd.DataFrame]: """ Low-level helper that implements the actual bound filtering. Parameters ---------- columnsToCheck : list[str] Column names to validate. bounds : list[tuple[float, float]] Sequence of (min, max) intervals for each column. Returns ------- dict[str, pd.DataFrame] Mapping of column name ➟ offending rows (if any). """ # --- Input validation --- if not columnsToCheck or not all(isinstance(col, str) for col in columnsToCheck): raise ValueError("Parameter 'columnsToCheck' must be a non-empty list of strings.") if not bounds or not all(isinstance(b, tuple) and len(b) == 2 for b in bounds): raise ValueError("Parameter 'bounds' must be a non-empty list of (min, max) tuples.") if len(columnsToCheck) != len(bounds): raise ValueError("Number of columns and bounds must match.") if not hasattr(self.dataset, "df"): raise AttributeError("The dataset does not contain a 'df' attribute.") if not isinstance(self.dataset.df, pd.DataFrame): raise TypeError("self.dataset.df must be a pandas DataFrame.") out_of_bounds = {} for i, column in enumerate(columnsToCheck): print(f"\n--- {i + 1}. Checking column '{column}'") min_val, max_val = bounds[i] if column not in self.dataset.df.columns: print(f"⚠️ Warning: Column '{column}' not found in dataset. Skipping.") continue # Identify out-of-bounds rows try: out_of_range = self.dataset.df[ (self.dataset.df[column] < min_val) | (self.dataset.df[column] > max_val) ] except Exception as e: print(f"❌ Error checking bounds for column '{column}': {e}") continue if not out_of_range.empty: percentage = len(out_of_range) / len(self.dataset.df) * 100 out_of_bounds[column] = out_of_range print(f"Found {len(out_of_range)} values outside bounds [{min_val}, {max_val}]") print(f"Percentage: {percentage:.4f}% of data") if percentage < 0.5: print("→ Less than 0.5%. 
Deleting these rows...") try: self.dataset.df.drop(index=out_of_range.index, inplace=True) self.dataset.df.reset_index(drop=True, inplace=True) except Exception as e: print(f"❌ Error deleting rows for column '{column}': {e}") else: print("→ More than 0.5%. Keeping them for manual review.") else: print(f"✅ All values in column '{column}' are within bounds [{min_val}, {max_val}]") return out_of_bounds
[docs] def compare_distributions_grid( self, original_df: pd.DataFrame, cleaned_df: pd.DataFrame, columns: list[str] | None = None, bins: int = 50, max_features: int = 20 ) -> None: """ Side-by-side histograms to compare original vs. cleaned features. Parameters ---------- original_df, cleaned_df : pandas.DataFrame Pre and post-processing datasets. columns : list[str] | None Subset of columns to display. Defaults to first *max_features* numeric columns. bins : int Number of histogram bins. max_features : int Maximum number of features to plot. Returns ------- None """ # --- Validations --- if not isinstance(original_df, pd.DataFrame) or not isinstance(cleaned_df, pd.DataFrame): raise TypeError("Both original_df and cleaned_df must be pandas DataFrames.") if columns is not None and not all(isinstance(c, str) for c in columns): raise TypeError("Parameter 'columns' must be a list of strings or None.") if not isinstance(bins, int) or bins <= 0: raise ValueError("Parameter 'bins' must be a positive integer.") if not isinstance(max_features, int) or max_features <= 0: raise ValueError("Parameter 'max_features' must be a positive integer.") numeric_cols = original_df.select_dtypes(include=np.number).columns.tolist() if columns is None: columns = numeric_cols[:max_features] else: columns = [col for col in columns if col in original_df.columns and pd.api.types.is_numeric_dtype(original_df[col])] if not columns: print("⚠️ No numeric columns to plot.") return n = len(columns) cols = 2 rows = int(np.ceil(n / cols)) fig, axes = plt.subplots(rows, cols, figsize=(12, 4 * rows)) axes = axes.flatten() for i, col in enumerate(columns): try: axes[i].hist(original_df[col].dropna(), bins=bins, alpha=0.5, label='Original', color='red') axes[i].hist(cleaned_df[col].dropna(), bins=bins, alpha=0.5, label='Cleaned', color='green') axes[i].set_title(col) axes[i].legend() except Exception as e: axes[i].text(0.5, 0.5, f"Error plotting {col}\n{e}", ha='center') axes[i].set_title(col) for j in 
range(i + 1, len(axes)): fig.delaxes(axes[j]) plt.tight_layout() plt.show()
[docs] def get_outliers( self, detection_type: str = "iqr", threshold: float = 1.5, save_plots: bool = False, save_path: str = None ) -> str: """ Detects outliers, removes them from X_train, and returns a summary. Parameters ---------- detection_type : str Method used to detect outliers ('iqr' or 'percentile'). plot : bool Whether to show distribution plots of the outlier features. threshold : float Multiplier for IQR used to define outlier bounds. Returns ------- str Summary of the outlier detection operation. """ # --- Validations --- if detection_type not in ("iqr", "percentile"): raise ValueError("detection_type must be 'iqr' or 'percentile'.") if not isinstance(save_plots, bool): raise TypeError("save_plots must be a boolean.") if not isinstance(threshold, (int, float)) or threshold <= 0: raise ValueError("threshold must be a positive number.") # --- Get numeric columns --- if not hasattr(self.dataset, "X_train"): raise AttributeError("The dataset is missing 'X_train'.") if not isinstance(self.dataset.X_train, pd.DataFrame): raise TypeError("self.dataset.X_train must be a pandas DataFrame.") columns = self.dataset.X_train.select_dtypes(include=["number"]).columns.tolist() outlier_rows = [] for feature in columns: if feature not in self.dataset.X_train.columns: print(f"⚠️ Skipping missing column '{feature}'") continue original_values = self.dataset.X_train[feature] if original_values.nunique() < 2: continue lower_bound, upper_bound = None, None if detection_type == "iqr": q1 = original_values.quantile(0.25) q3 = original_values.quantile(0.75) iqr = q3 - q1 lower_bound = q1 - threshold * iqr upper_bound = q3 + threshold * iqr elif detection_type == "percentile": upper_bound = original_values.quantile(0.99) lower_bound = original_values.min() # Keep lower end untouched # Identify outliers outlier_mask = (original_values < lower_bound) | (original_values > upper_bound) outliersDataset = original_values[outlier_mask] outliers_count = outlier_mask.sum() if 
outliers_count > 0: outlier_rows.append({ "feature": feature, "outlierCount": outliers_count, "percentageOfOutliers": outliers_count / len(original_values) * 100, "descriptiveStatistics": original_values.describe(), "outliersValues": outliersDataset.values }) if save_plots: fig, ax = plt.subplots(figsize=(15, 4)) ax.set_title(f"Distribution of '{feature}'") sns.histplot(original_values, kde=True, ax=ax) save_or_store_plot(fig, save_plots, save_path + "/outliers_bounds/outliers", f"{feature}.png") if detection_type == "iqr": self.dataset.X_train = self.dataset.X_train[~outlier_mask] elif detection_type == "percentile": self.dataset.X_train[feature] = original_values.clip(upper=upper_bound) self.dataset.X_train.reset_index(drop=True, inplace=True) outlier_df = pd.DataFrame(outlier_rows) return ( f"There are {len(outlier_df)} features with outliers out of " f"{len(columns)} numerical features " f"({len(outlier_df) / len(columns) * 100:.2f}%)" f"New X_train shape: {self.dataset.X_train.shape} and y_train shape: {self.dataset.y_train.shape}" )