import pandas as pd
import os
from abc import ABC, abstractmethod
from statsmodels.stats.outliers_influence import variance_inflation_factor
from sklearn.feature_selection import mutual_info_regression
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
from library.phases.phases_implementation.dataset.dataset import Dataset
from library.phases.phases_implementation.EDA.EDA import EDA
from library.utils.miscellaneous.save_or_store_plot import save_or_store_plot
[docs]
class ManualFeatureSelection:
    """
    Entry point that dispatches to one of the manual feature selection strategies.

    The strategy class is looked up by name in ``self.options`` and is
    instantiated with the dataset that was given to this object.

    Parameters
    ----------
    dataset : Dataset
        The dataset handed to whichever strategy is selected in :meth:`fit`.
    """

    def __init__(self, dataset: Dataset):
        self.dataset = dataset
        # Strategy name -> strategy class. Each class is called with the dataset
        # and then asked to ``fit(threshold, delete_features, save_plots, save_path)``.
        # NOTE(review): MutualInformationFeatureReduction is not defined in the part
        # of the file reviewed here - confirm it exists, otherwise building this
        # dictionary fails with a NameError.
        self.options = {
            "VIF": VIFElimination,
            "LowVariances": LowVariancesFeatureReduction,
            "MutualInformation": MutualInformationFeatureReduction,
            "PCA": PCAFeatureReduction,
        }

    def fit(self, type: str, threshold: float, delete_features: bool = True, save_plots: bool = False, save_path: str = ""):
        """
        Runs the feature selection strategy registered under ``type``.

        Parameters
        ----------
        type : str
            Key of ``self.options`` selecting the strategy.
        threshold : float
            Strategy-specific threshold, forwarded unchanged.
        delete_features : bool
            Forwarded unchanged to the strategy. Defaults to True, like the
            strategies' own ``fit`` methods.
        save_plots : bool
            Forwarded unchanged to the strategy. Defaults to False.
        save_path : str
            Forwarded unchanged to the strategy. Defaults to "".

        Returns
        -------
        object
            Whatever the selected strategy's ``fit`` returns.

        Raises
        ------
        KeyError
            If ``type`` is not a key of ``self.options``. The message lists
            the available keys.
        """
        # Validate before announcing the run, so an unknown name fails with a
        # message that tells the caller what is accepted.
        try:
            strategy_class = self.options[type]
        except KeyError:
            available = ", ".join(repr(name) for name in self.options)
            raise KeyError(
                f"Unknown feature selection type {type!r}. Available options: {available}"
            ) from None

        print(f"Running {type} feature selection")
        return strategy_class(self.dataset).fit(threshold, delete_features, save_plots, save_path)
[docs]
class ManualFeatureSelectionFactory(ABC):
    """
    Abstract base class for the manual feature selection strategies.

    It only stores the dataset and obliges subclasses to implement ``fit``.

    Parameters
    ----------
    dataset : Dataset
        The dataset the strategy works on. The reference is stored as given
        (no copy is made here).
    """

    def __init__(self, dataset: Dataset):
        self.dataset = dataset

    @abstractmethod
    def fit(self, threshold: float, delete_features: bool, save_plots: bool, save_path: str):
        """
        Runs the strategy. Must be overridden by every concrete subclass.

        Parameters
        ----------
        threshold : float
            Strategy-specific threshold.
        delete_features : bool
            Flag handed to the strategy; its meaning is defined by the subclass.
        save_plots : bool
            Flag handed to the strategy; its meaning is defined by the subclass.
        save_path : str
            Path handed to the strategy; its meaning is defined by the subclass.

        Returns
        -------
        None
            The base implementation does nothing.
        """
[docs]
class VIFElimination(ManualFeatureSelectionFactory):
    """
    Iteratively removes the numerical feature with the highest variance
    inflation factor (VIF) until every remaining VIF is below a threshold.

    The VIF is computed on the numerical columns of ``dataset.X_train``; the
    selected columns are dropped from ``X_train``, ``X_val`` and ``X_test``.
    """

    def __init__(self, dataset: Dataset):
        super().__init__(dataset)

    def __calculate_vif(self):
        """
        Calculates the VIF of the numerical features of ``X_train``.

        Returns
        -------
        pd.DataFrame
            A dataframe with the columns "Feature" and "VIF", one row per
            numerical column of ``X_train``.
        """
        only_numerical_features = self.dataset.X_train.select_dtypes(include=["number"])
        # Build the array once instead of once per column.
        feature_matrix = only_numerical_features.values
        return pd.DataFrame({
            "Feature": only_numerical_features.columns,
            "VIF": [
                variance_inflation_factor(feature_matrix, column_position)
                for column_position in range(feature_matrix.shape[1])
            ],
        })

    def fit(self, threshold: float = 10, delete_features: bool = True, save_plots: bool = False, save_path: str = ""):
        """
        Starts the VIF elimination process. Eliminates in all sets.
        Note: this is computationally expensive for high-feature datasets,
        because every VIF is recomputed after each removal.

        Parameters
        ----------
        threshold : float
            The threshold for the VIF. The loop stops once the highest VIF is
            below this value.
        delete_features : bool
            If True, the feature with the highest VIF is dropped from
            ``X_train``, ``X_val`` and ``X_test`` and the process repeats.
            If False, only the feature with the highest VIF is reported and
            nothing is dropped.
        save_plots : bool
            If True, a correlation matrix of the numerical features of
            ``X_train`` is plotted through ``EDA.plot_correlation_matrix``
            before and after the elimination.
        save_path : str
            Forwarded to ``EDA.plot_correlation_matrix``.

        Returns
        -------
        None
        """
        number_of_iterations = 0
        if save_plots:
            # NOTE(review): this figure is never referenced again in this method;
            # it looks unused, but it is kept because it changes pyplot's current
            # figure and EDA.plot_correlation_matrix is not visible here - confirm
            # and remove if EDA creates its own figure.
            fig, axes = plt.subplots(1, 2, figsize=(12, 6))
            eda = EDA(self.dataset)
            eda.plot_correlation_matrix(size="l", numerical_df=self.dataset.X_train.select_dtypes(include=["number"]), title="Prior-Elimination", save_plots=save_plots, save_path=save_path)

        while True:
            # With fewer than two numerical features there is no other feature to
            # be collinear with. Without this guard an empty VIF table reaches
            # idxmax() below and raises instead of finishing cleanly.
            remaining_features = self.dataset.X_train.select_dtypes(include=["number"]).shape[1]
            if remaining_features < 2:
                print(f"Stopping VIF elimination: {remaining_features} numerical feature(s) left")
                break

            number_of_iterations += 1
            vif_data = self.__calculate_vif()
            print(f"VIF computed for iteration {number_of_iterations}:")

            max_vif = vif_data["VIF"].max()
            # Written as "not >=" so that a NaN maximum (every VIF undefined) also
            # stops the loop; for any other value this equals "max_vif < threshold".
            if not max_vif >= threshold:
                break

            feature_to_drop = vif_data.loc[vif_data["VIF"].idxmax(), "Feature"]
            if delete_features:
                self.dataset.X_train.drop(columns=[feature_to_drop], inplace=True)
                self.dataset.X_val.drop(columns=[feature_to_drop], inplace=True)
                self.dataset.X_test.drop(columns=[feature_to_drop], inplace=True)
                print(f"\tDropped: '{feature_to_drop}' with a VIF of {max_vif}")
            else:
                # Report-only mode: nothing changes, so a second iteration would
                # find the same feature again.
                print(f"Feature with highest VIF: '{feature_to_drop}' with VIF: {max_vif}")
                break

        if save_plots:
            eda.plot_correlation_matrix(size="l", numerical_df=self.dataset.X_train.select_dtypes(include=["number"]), title="Post-Elimination", save_plots=save_plots, save_path=save_path)
[docs]
class LowVariancesFeatureReduction(ManualFeatureSelectionFactory):
    """
    Removes features whose standard deviation is below a threshold.

    Constant features are looked for in ``dataset.df``; low-variance features
    are determined on ``dataset.X_train`` and dropped from ``X_train``,
    ``X_val`` and ``X_test``.
    """

    def __init__(self, dataset: Dataset):
        super().__init__(dataset)

    def constant_features_reduction(self, delete_features: bool = True):
        """
        Removes constant features from the dataset.

        Only the numerical columns of ``dataset.df`` are inspected and only
        ``dataset.df`` is modified; the train/validation/test splits are not
        touched by this method.

        Parameters
        ----------
        delete_features : bool
            If True (default), the zero-variance columns are dropped from
            ``dataset.df`` in place. If False, they are only reported.

        Returns
        -------
        None
        """
        original_number_of_features = self.dataset.df.shape[1]
        numerical_std = self.dataset.df.select_dtypes(include='number').std()
        constant_columns = numerical_std[numerical_std == 0].index

        if constant_columns.empty:
            print("No zero-variance features found.")
            return

        print("Zero-variance features found:")
        print(constant_columns)
        if delete_features:
            self.dataset.df.drop(columns=constant_columns, inplace=True)
            print(f"Removed {original_number_of_features - self.dataset.df.shape[1]} features with zero variance")

    def fit(self, threshold: float = 0.01, delete_features: bool = True, save_plots: bool = False, save_path: str = ""):
        """
        Removes the features with low variance.

        Parameters
        ----------
        threshold : float
            Features of ``X_train`` whose standard deviation is strictly below
            this value are selected for removal.
        delete_features : bool
            If True, the selected features are dropped from ``X_train``,
            ``X_val`` and ``X_test``, and constant features are dropped from
            ``dataset.df``. If False, nothing is modified.
        save_plots : bool
            If True, a histogram of the standard deviations is built and
            handed to ``save_or_store_plot``.
        save_path : str
            Base path; "/feature_selection/manual/low_variances" is appended
            to it for the histogram.

        Returns
        -------
        None
        """
        # Pass the flag on, so that delete_features=False really leaves the
        # dataset untouched.
        self.constant_features_reduction(delete_features=delete_features)

        # Restrict to numerical/boolean columns: taking the standard deviation of
        # e.g. string columns fails, and pairing the per-column result with *all*
        # column names would misalign the table.
        measurable_features = self.dataset.X_train.select_dtypes(include=["number", "bool"])
        standard_deviations = measurable_features.std()

        # Create dataframe with feature and standard deviation (spreadness).
        # Names and values come from the same Series, so they always line up.
        spreadness_df = pd.DataFrame({
            "feature": standard_deviations.index,
            "spreadness": standard_deviations.values
        })

        if save_plots:
            fig, ax = plt.subplots(figsize=(12, 8))
            ax.hist(spreadness_df["spreadness"], bins=30, edgecolor='black')
            ax.set_title('Distribution of Standard Deviations for Numeric Features')
            ax.set_xlabel('Standard Deviation')
            ax.set_ylabel('Frequency')
            save_or_store_plot(fig, save_plots, save_path + "/feature_selection/manual/low_variances", "low_variances.png")

        is_low_variance = spreadness_df["spreadness"] < threshold
        columns_to_drop = spreadness_df.loc[is_low_variance, "feature"].tolist()

        if delete_features:
            for split in (self.dataset.X_train, self.dataset.X_val, self.dataset.X_test):
                split.drop(columns=columns_to_drop, inplace=True)
[docs]
class PCAFeatureReduction(ManualFeatureSelectionFactory):
    """
    Replaces the features of the train/validation/test splits with their
    principal components.
    """

    def __init__(self, dataset: Dataset):
        super().__init__(dataset)

    def fit(self, threshold: float = 0.95, delete_features: bool = True, save_plots: bool = False, save_path: str = ""):
        """
        Reduces the number of features using PCA.

        The PCA is fitted on ``X_train`` only and then applied to every split.

        Parameters
        ----------
        threshold : float
            Passed to ``PCA`` as ``n_components``.
        delete_features : bool
            If True, ``X_train``, ``X_val`` and ``X_test`` are replaced by
            dataframes holding the components (columns "PC1", "PC2", ...),
            keeping each split's original index. If False, the PCA is only
            fitted and the number of kept components is printed.
        save_plots : bool
            Not used by this strategy.
        save_path : str
            Not used by this strategy.

        Returns
        -------
        None
        """
        reducer = PCA(n_components=threshold)
        reducer.fit(self.dataset.X_train)

        # Number of components the fitted PCA actually kept
        kept_components = reducer.n_components_
        print(f'PCA kept {kept_components} components')

        if not delete_features:
            return

        component_names = [f'PC{position}' for position in range(1, kept_components + 1)]

        # Project each split and convert the result back to a DataFrame
        for split_name in ("X_train", "X_val", "X_test"):
            split = getattr(self.dataset, split_name)
            projected = pd.DataFrame(
                reducer.transform(split),
                columns=component_names,
                index=split.index
            )
            setattr(self.dataset, split_name, projected)