from abc import ABC, abstractmethod
from typing import Any, Union

from pandas import DataFrame, Series
from pandas.api.types import is_numeric_dtype

try:
    from sklearn.neighbors import KNeighborsClassifier
except ImportError:
    # scikit-learn is only needed by KNNStrategy; keep every other strategy
    # usable when it is not installed.
    KNeighborsClassifier = None


class DataFrameFunction(ABC):
    """A command that may be applied in-place to a dataframe."""

    @abstractmethod
    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        """Apply the current function to the given dataframe, in-place.

        Args:
            df: The dataframe to modify.
            label: Name of the column of ``df`` to operate on.
            series: The column itself (expected to be ``df[label]``).

        Returns:
            The same ``df`` object, after modification.
        """
        return df


class MVStrategy(DataFrameFunction):
    """A way to handle missing values in a dataframe."""

    @staticmethod
    def list_available(df: DataFrame, label: str, series: Series) -> list['MVStrategy']:
        """Get all the missing-value strategies that can be used on a column.

        Dropping rows and filling with the mode are always offered. Numeric
        columns additionally get mean, median and interpolation, plus kNN
        when at least one other numeric column can serve as a feature.
        """
        choices = [DropStrategy(), ModeStrategy()]
        if is_numeric_dtype(series):
            choices.extend((MeanStrategy(), MedianStrategy(), LinearRegressionStrategy()))
            # Filter the column list rather than calling .drop(label): a
            # column can be numeric for is_numeric_dtype (e.g. bool) without
            # being selected by "number", and .drop() would raise KeyError.
            other_columns = [
                column
                for column in df.select_dtypes(include="number").columns
                if column != label
            ]
            if other_columns and KNeighborsClassifier is not None:
                choices.append(KNNStrategy(other_columns))
        return choices


class ScalingStrategy(DataFrameFunction):
    """A way to rescale the values of a dataframe column."""

    @staticmethod
    def list_available(df: DataFrame, series: Series) -> list['ScalingStrategy']:
        """Get all the scaling strategies that can be used on a column.

        Keeping the values untouched is always offered. Numeric columns also
        get min-max and z-score scaling, and unit-length scaling when the
        column sum is non-zero (that strategy divides by the sum).
        """
        choices = [KeepStrategy()]
        if is_numeric_dtype(series):
            choices.extend((MinMaxStrategy(), ZScoreStrategy()))
            if series.sum() != 0:
                choices.append(UnitLengthStrategy())
        return choices


class DropStrategy(MVStrategy):
    """Remove every row whose value in the column is missing."""

    # @typing.override
    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        """Drop, in-place, the rows of ``df`` where ``label`` is missing."""
        df.dropna(subset=[label], inplace=True)
        return df

    def __str__(self) -> str:
        return "Drop"


class PositionStrategy(MVStrategy):
    """Fill missing values with a single statistic computed from the column."""

    # @typing.override
    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        """Replace the missing values of the column with ``get_value(series)``."""
        series.fillna(self.get_value(series), inplace=True)
        # Write the result back explicitly: filling ``series`` in place only
        # reaches ``df`` when the series is a live view of it, which is not
        # the case under pandas Copy-on-Write.
        df[label] = series
        return df

    @abstractmethod
    def get_value(self, series: Series) -> Any:
        """Return the value used to replace missing entries of ``series``."""


class MeanStrategy(PositionStrategy):
    """Fill missing values with the column mean."""

    # @typing.override
    def get_value(self, series: Series) -> Union[int, float]:
        return series.mean()

    def __str__(self) -> str:
        return "Use mean"


class MedianStrategy(PositionStrategy):
    """Fill missing values with the column median."""

    # @typing.override
    def get_value(self, series: Series) -> Union[int, float]:
        return series.median()

    def __str__(self) -> str:
        return "Use median"


class ModeStrategy(PositionStrategy):
    """Fill missing values with the most frequent value of the column."""

    # @typing.override
    def get_value(self, series: Series) -> Any:
        modes = series.mode()
        if modes.empty:
            # No non-missing value at all: return NaN like mean/median do,
            # which turns the fill into a no-op instead of raising.
            return float("nan")
        return modes.iloc[0]

    def __str__(self) -> str:
        return "Use mode"


class LinearRegressionStrategy(MVStrategy):
    """Fill missing values with ``Series.interpolate()`` (linear by default)."""

    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        """Interpolate the missing values of the column."""
        series.interpolate(inplace=True)
        # Same reason as PositionStrategy.apply: make sure ``df`` is updated
        # even when ``series`` is not a view of it.
        df[label] = series
        return df

    def __str__(self) -> str:
        return "Use linear regression"


class KNNStrategy(MVStrategy):
    """Fill missing values with the prediction of a k-nearest-neighbours model."""

    def __init__(self, training_features: list[str]):
        # Independent copies, so narrowing the selected features does not
        # also change the list of available ones.
        self.available_features = list(training_features)
        self.training_features = list(training_features)
        self.n_neighbors = 3

    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        """Impute the missing values of ``label`` from ``training_features``.

        Only rows whose training features are all present take part. Those
        with a known ``label`` train the model; those with a missing
        ``label`` receive its prediction. ``df`` is returned unchanged when
        there is nothing to impute or nothing to learn from.

        Raises:
            ImportError: If scikit-learn is not installed.
        """
        if KNeighborsClassifier is None:
            raise ImportError("scikit-learn is required to use the kNN strategy")

        features = list(self.training_features)
        if not features:
            return df

        # Boolean masks keep everything aligned on df's own rows, including
        # when the index contains duplicate labels.
        complete = df[features].notna().all(axis=1)
        missing = df[label].isna()
        train_mask = complete & ~missing
        test_mask = complete & missing
        if not test_mask.any() or not train_mask.any():
            return df

        # A model cannot use more neighbours than it has training samples.
        n_neighbors = min(self.n_neighbors, int(train_mask.sum()))
        # NOTE(review): a classifier only accepts discrete targets;
        # scikit-learn rejects continuous float labels. Confirm that this is
        # the intended model for numeric columns.
        model = KNeighborsClassifier(n_neighbors=n_neighbors)
        model.fit(df.loc[train_mask, features], df.loc[train_mask, label])
        df.loc[test_mask, label] = model.predict(df.loc[test_mask, features])
        return df

    def count_max(self, df: DataFrame, label: str) -> int:
        """Count the rows usable for training.

        These are the rows with all training features present and a
        non-missing ``label``. Presumably used as the upper bound for
        ``n_neighbors``; verify against the caller.
        """
        usable_data = df.dropna(subset=self.training_features)
        return int(usable_data[label].count())

    def __str__(self) -> str:
        return "kNN"


class KeepStrategy(ScalingStrategy):
    """Leave the column as it is."""

    # @typing.override
    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        return df

    def __str__(self) -> str:
        return "No-op"


class MinMaxStrategy(ScalingStrategy):
    """Rescale the column linearly so its values span [0, 1]."""

    # @typing.override
    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        """Replace the column with ``(x - min) / (max - min)``."""
        minimum = series.min()
        span = series.max() - minimum
        if span == 0:
            # Constant column: avoid 0/0 (all NaN) and map every value to 0.
            span = 1
        df[label] = (series - minimum) / span
        return df

    def __str__(self) -> str:
        return "Min-max"


class ZScoreStrategy(ScalingStrategy):
    """Center the column on its mean and scale it by its standard deviation."""

    # @typing.override
    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        """Replace the column with ``(x - mean) / std``."""
        deviation = series.std()
        if not deviation > 0:
            # Zero (constant column) or NaN (fewer than two values): only
            # center the data instead of turning the column into NaN.
            deviation = 1
        df[label] = (series - series.mean()) / deviation
        return df

    def __str__(self) -> str:
        return "Z-Score"


class UnitLengthStrategy(ScalingStrategy):
    """Divide the column by the sum of its values."""

    # @typing.override
    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        """Replace the column with ``x / sum(x)``.

        ``ScalingStrategy.list_available`` only offers this strategy when
        the sum is non-zero.
        """
        df[label] = series / series.sum()
        return df

    def __str__(self) -> str:
        return "Unit length"