You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
84 lines
2.6 KiB
84 lines
2.6 KiB
from sklearn.cluster import DBSCAN, KMeans
|
|
import numpy as np
|
|
from dataclasses import dataclass
|
|
from abc import ABC, abstractmethod
|
|
from typing import Any, Optional
|
|
|
|
@dataclass
class ClusterResult:
    """Outcome of a single clustering run.

    Attributes:
        labels: Cluster label assigned to each input sample.
        centers: Cluster centers, or ``None`` when the algorithm that
            produced the result does not compute any.
        statistics: One dictionary of summary values per reported cluster.
    """

    # ``np.ndarray`` is the array *type*; ``np.array`` is only the factory
    # function and is not meaningful as an annotation.
    labels: np.ndarray
    centers: Optional[np.ndarray]
    statistics: list[dict[str, Any]]
|
|
|
|
|
|
class Cluster(ABC):
    """Abstract interface that every clustering strategy implements."""

    @abstractmethod
    def run(self, data: np.ndarray) -> ClusterResult:
        """Cluster ``data`` and return the result.

        Args:
            data: Samples to cluster.
                NOTE(review): presumably shaped (n_samples, n_features);
                confirm against the callers.

        Returns:
            A ``ClusterResult`` holding the labels, the optional centers
            and the per-cluster statistics.
        """
|
|
|
|
|
|
class DBSCANCluster(Cluster):
    """Clustering strategy backed by scikit-learn's ``DBSCAN``."""

    def __init__(self, eps: float = 0.5, min_samples: int = 5):
        """Store the hyper-parameters forwarded to ``DBSCAN``.

        Args:
            eps: Value passed as ``DBSCAN(eps=...)``.
            min_samples: Value passed as ``DBSCAN(min_samples=...)``.
        """
        self.eps = eps
        self.min_samples = min_samples

    # Overrides Cluster.run (the typing.override decorator needs Python 3.12+).
    def run(self, data: np.ndarray) -> ClusterResult:
        """Fit DBSCAN on ``data`` and package the outcome.

        Args:
            data: Samples to cluster; handed unchanged to ``fit_predict``.

        Returns:
            A ``ClusterResult`` with the predicted labels, ``None`` for the
            centers, and the statistics from ``get_statistics``.
        """
        dbscan = DBSCAN(eps=self.eps, min_samples=self.min_samples)
        labels = dbscan.fit_predict(data)
        return ClusterResult(labels, None, self.get_statistics(data, labels))

    def get_statistics(self, data: np.ndarray, labels: np.ndarray) -> list[dict[str, Any]]:
        """Summarise every non-noise cluster.

        Args:
            data: The samples that were clustered.
            labels: The label assigned to each row of ``data``.

        Returns:
            One dictionary per cluster with the keys ``"cluster"``,
            ``"num_points"`` and ``"density"``. Density is the point count
            divided by the volume of the cluster's axis-aligned bounding
            box; it is ``inf`` when that volume is zero.
        """
        stats = []
        for label in np.unique(labels):
            if label == -1:
                # -1 is the label scikit-learn's DBSCAN gives to noise samples.
                continue
            cluster_points = data[labels == label]
            num_points = len(cluster_points)
            extent = np.max(cluster_points, axis=0) - np.min(cluster_points, axis=0)
            volume = extent.prod()
            # A cluster that is flat along any axis (e.g. duplicate points)
            # has zero volume; report infinite density explicitly instead of
            # triggering a divide-by-zero RuntimeWarning.
            density = float("inf") if volume == 0 else num_points / volume
            stats.append({
                "cluster": label,
                "num_points": num_points,
                "density": density,
            })
        return stats

    def __str__(self) -> str:
        """Return the display name of this strategy."""
        return "DBScan"
|
|
|
|
|
|
class KMeansCluster(Cluster):
    """Clustering strategy backed by scikit-learn's ``KMeans``."""

    def __init__(
        self,
        n_clusters: int = 8,
        n_init: int = 1,
        max_iter: int = 300,
        random_state: Optional[int] = 111,
    ):
        """Store the hyper-parameters forwarded to ``KMeans``.

        Args:
            n_clusters: Value passed as ``KMeans(n_clusters=...)``.
            n_init: Value passed as ``KMeans(n_init=...)``.
            max_iter: Value passed as ``KMeans(max_iter=...)``.
            random_state: Value passed as ``KMeans(random_state=...)``.
                Defaults to 111, the seed that was previously hard-coded,
                so existing callers get the same results.
        """
        self.n_clusters = n_clusters
        self.n_init = n_init
        self.max_iter = max_iter
        self.random_state = random_state

    # Overrides Cluster.run (the typing.override decorator needs Python 3.12+).
    def run(self, data: np.ndarray) -> ClusterResult:
        """Fit KMeans on ``data`` and package the outcome.

        Args:
            data: Samples to cluster; handed unchanged to ``fit_predict``.

        Returns:
            A ``ClusterResult`` with the predicted labels, the fitted
            cluster centers, and the statistics from ``get_statistics``.
        """
        kmeans = KMeans(
            n_clusters=self.n_clusters,
            init="random",
            n_init=self.n_init,
            max_iter=self.max_iter,
            random_state=self.random_state,
        )
        labels = kmeans.fit_predict(data)
        centers = kmeans.cluster_centers_
        return ClusterResult(labels, centers, self.get_statistics(data, labels, centers))

    def get_statistics(self, data: np.ndarray, labels: np.ndarray, centers: np.ndarray) -> list[dict[str, Any]]:
        """Summarise every cluster that received at least one sample.

        Args:
            data: The samples that were clustered. Not read here; kept so
                the signature stays compatible with existing callers.
            labels: The label assigned to each sample.
            centers: Cluster centers, indexed by label.

        Returns:
            One dictionary per label present in ``labels`` with the keys
            ``"cluster"``, ``"num_points"`` and ``"center"``.
        """
        # Count label occurrences in one pass instead of copying each
        # cluster's rows out of ``data`` just to take their length.
        found_labels, counts = np.unique(labels, return_counts=True)
        return [
            {
                "cluster": label,
                "num_points": int(count),
                "center": centers[label],
            }
            for label, count in zip(found_labels, counts)
        ]

    def __str__(self) -> str:
        """Return the display name of this strategy."""
        return "KMeans"
|
|
|
|
|
|
# One default-configured instance of each available clustering strategy.
CLUSTERING_STRATEGIES = [
    DBSCANCluster(),
    KMeansCluster(),
]
|