Compare commits
2 Commits
c8cf0fe045...987e255dad

SHA1 | Date
---|---
987e255dad | 1 year ago
588ee700ab | 1 year ago
@@ -0,0 +1 @@
from . import normstrategy
@@ -0,0 +1,179 @@
from abc import ABC, abstractmethod
from pandas import DataFrame, Series
from pandas.api.types import is_numeric_dtype
from sklearn.neighbors import KNeighborsClassifier
from typing import Any, Union


class DataFrameFunction(ABC):
    """A command that may be applied in-place to a dataframe."""

    @abstractmethod
    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        """Apply the current function to the given dataframe, in-place.

        The series is described by its label and dataframe."""
        return df


class MVStrategy(DataFrameFunction):
    """A way to handle missing values in a dataframe."""

    @staticmethod
    def list_available(df: DataFrame, label: str, series: Series) -> list['MVStrategy']:
        """Get all the strategies that can be used."""
        choices = [DropStrategy(), ModeStrategy()]
        if is_numeric_dtype(series):
            choices.extend((MeanStrategy(), MedianStrategy(), LinearRegressionStrategy()))
            other_columns = df.select_dtypes(include="number").drop(label, axis=1).columns.to_list()
            if len(other_columns):
                choices.append(KNNStrategy(other_columns))
        return choices


class ScalingStrategy(DataFrameFunction):
    """A way to scale the values of a numeric series in a dataframe."""

    @staticmethod
    def list_available(df: DataFrame, series: Series) -> list['ScalingStrategy']:
        """Get all the strategies that can be used."""
        choices = [KeepStrategy()]
        if is_numeric_dtype(series):
            choices.extend((MinMaxStrategy(), ZScoreStrategy()))
            if series.sum() != 0:
                choices.append(UnitLengthStrategy())
        return choices


class DropStrategy(MVStrategy):
    #@typing.override
    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        df.dropna(subset=label, inplace=True)
        return df

    def __str__(self) -> str:
        return "Drop"


class PositionStrategy(MVStrategy):
    #@typing.override
    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        series.fillna(self.get_value(series), inplace=True)
        return df

    @abstractmethod
    def get_value(self, series: Series) -> Any:
        pass


class MeanStrategy(PositionStrategy):
    #@typing.override
    def get_value(self, series: Series) -> Union[int, float]:
        return series.mean()

    def __str__(self) -> str:
        return "Use mean"


class MedianStrategy(PositionStrategy):
    #@typing.override
    def get_value(self, series: Series) -> Union[int, float]:
        return series.median()

    def __str__(self) -> str:
        return "Use median"


class ModeStrategy(PositionStrategy):
    #@typing.override
    def get_value(self, series: Series) -> Any:
        return series.mode()[0]

    def __str__(self) -> str:
        return "Use mode"


class LinearRegressionStrategy(MVStrategy):
    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        series.interpolate(inplace=True)
        return df

    def __str__(self) -> str:
        return "Use linear regression"


class KNNStrategy(MVStrategy):
    def __init__(self, training_features: list[str]):
        self.available_features = training_features
        self.training_features = training_features
        self.n_neighbors = 3

    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        # Keep only the rows that have no missing value in any training feature
        usable_data = df.dropna(subset=self.training_features)
        # Rows where the target column is known form the training set
        train_data = usable_data.dropna(subset=label)
        # Train on the selected training features only
        x_train = train_data[self.training_features]
        y_train = train_data[label]

        reg = KNeighborsClassifier(self.n_neighbors).fit(x_train, y_train)

        # Rows where the target column is missing form the test set
        test_data = usable_data[usable_data[label].isnull()]
        if test_data.empty:
            return df
        x_test = test_data[self.training_features]
        predicted = reg.predict(x_test)

        # Fill the gaps with the predicted values and patch the original data
        usable_data.loc[test_data.index, label] = predicted
        df.fillna(usable_data, inplace=True)
        return df

    def count_max(self, df: DataFrame, label: str) -> int:
        usable_data = df.dropna(subset=self.training_features)
        return usable_data[label].count()

    def __str__(self) -> str:
        return "kNN"
class KeepStrategy(ScalingStrategy):
    #@typing.override
    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        return df

    def __str__(self) -> str:
        return "No-op"


class MinMaxStrategy(ScalingStrategy):
    #@typing.override
    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        minimum = series.min()
        maximum = series.max()
        df[label] = (series - minimum) / (maximum - minimum)
        return df

    def __str__(self) -> str:
        return "Min-max"


class ZScoreStrategy(ScalingStrategy):
    #@typing.override
    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        df[label] = (series - series.mean()) / series.std()
        return df

    def __str__(self) -> str:
        return "Z-Score"


class UnitLengthStrategy(ScalingStrategy):
    #@typing.override
    def apply(self, df: DataFrame, label: str, series: Series) -> DataFrame:
        df[label] = series / series.sum()
        return df

    def __str__(self) -> str:
        return "Unit length"
@@ -0,0 +1,63 @@
from sklearn.cluster import DBSCAN, KMeans
import numpy as np


class DBSCAN_cluster():
    def __init__(self, eps, min_samples, data):
        self.eps = eps
        self.min_samples = min_samples
        self.data = data
        self.labels = np.array([])

    def run(self):
        # Fit DBSCAN and keep the label assigned to each sample
        dbscan = DBSCAN(eps=self.eps, min_samples=self.min_samples)
        self.labels = dbscan.fit_predict(self.data)
        return self.labels

    def get_stats(self):
        unique_labels = np.unique(self.labels)
        stats = []
        for label in unique_labels:
            # Label -1 marks noise points, which do not belong to any cluster
            if label == -1:
                continue
            cluster_points = self.data[self.labels == label]
            num_points = len(cluster_points)
            # Density: points per volume of the cluster's axis-aligned bounding box
            density = num_points / (np.max(cluster_points, axis=0) - np.min(cluster_points, axis=0)).prod()
            stats.append({
                "cluster": label,
                "num_points": num_points,
                "density": density
            })

        return stats


class KMeans_cluster():
    def __init__(self, n_clusters, n_init, max_iter, data):
        self.n_clusters = n_clusters
        self.n_init = n_init
        self.max_iter = max_iter
        self.data = data
        self.labels = np.array([])
        self.centers = []

    def run(self):
        # Fit k-means with random initialisation and a fixed seed for reproducibility
        kmeans = KMeans(n_clusters=self.n_clusters, init="random", n_init=self.n_init, max_iter=self.max_iter, random_state=111)
        self.labels = kmeans.fit_predict(self.data)
        self.centers = kmeans.cluster_centers_
        return self.labels

    def get_stats(self):
        unique_labels = np.unique(self.labels)
        stats = []

        for label in unique_labels:
            cluster_points = self.data[self.labels == label]
            num_points = len(cluster_points)
            center = self.centers[label]
            stats.append({
                'cluster': label,
                'num_points': num_points,
                'center': center
            })
        return stats
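
A minimal usage sketch for the two clustering wrappers above (illustrative only, not part of this commit; the synthetic data and parameter values are invented):

import numpy as np

rng = np.random.default_rng(0)
# Two well-separated 2-D Gaussian blobs.
data = np.vstack([rng.normal(0.0, 0.3, (50, 2)), rng.normal(3.0, 0.3, (50, 2))])

km = KMeans_cluster(n_clusters=2, n_init=10, max_iter=300, data=data)
km.run()
print(km.get_stats())   # per-cluster point count and centre

db = DBSCAN_cluster(eps=0.5, min_samples=5, data=data)
db.run()
print(db.get_stats())   # per-cluster point count and bounding-box density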