from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Optional

import numpy as np
from sklearn.cluster import DBSCAN, KMeans


@dataclass
class ClusterResult:
    """Outcome of a single clustering run."""

    # Label assigned to each input sample, as returned by ``fit_predict``.
    labels: np.ndarray
    # Cluster centers when the strategy provides them; ``None`` for DBSCAN.
    centers: Optional[np.ndarray]
    # One dictionary of summary statistics per reported cluster.
    statistics: list[dict[str, Any]]


class Cluster(ABC):
    """Common interface implemented by every clustering strategy here."""

    @abstractmethod
    def run(self, data: np.ndarray) -> ClusterResult:
        """Cluster *data* and return its labels, centers and statistics."""


class DBSCANCluster(Cluster):
    """Clustering strategy backed by ``sklearn.cluster.DBSCAN``."""

    def __init__(self, eps: float = 0.5, min_samples: int = 5):
        """Store the DBSCAN hyper-parameters.

        Args:
            eps: Forwarded unchanged to ``DBSCAN(eps=...)``.
            min_samples: Forwarded unchanged to ``DBSCAN(min_samples=...)``.
        """
        self.eps = eps
        self.min_samples = min_samples

    def run(self, data: np.ndarray) -> ClusterResult:
        """Fit DBSCAN on *data* and summarise the resulting clusters.

        Args:
            data: Samples to cluster; handed straight to ``fit_predict``.

        Returns:
            A ``ClusterResult`` whose ``centers`` is always ``None``.
        """
        model = DBSCAN(eps=self.eps, min_samples=self.min_samples)
        labels = model.fit_predict(data)
        return ClusterResult(labels, None, self.get_statistics(data, labels))

    def get_statistics(
        self, data: np.ndarray, labels: np.ndarray
    ) -> list[dict[str, Any]]:
        """Return the size and bounding-box density of every cluster.

        Density is the number of points divided by the volume of the
        axis-aligned bounding box of the cluster. A cluster with zero extent
        along any axis has zero volume and is reported with a density of
        ``inf``.

        Args:
            data: The samples that were clustered.
            labels: Label of each sample; entries equal to ``-1`` are skipped.

        Returns:
            One dict per cluster with the keys ``"cluster"``,
            ``"num_points"`` and ``"density"``.
        """
        stats = []
        for label in np.unique(labels):
            if label == -1:
                # scikit-learn's DBSCAN labels noise samples -1; they do not
                # form a cluster, so no statistics are reported for them.
                continue
            cluster_points = data[labels == label]
            num_points = len(cluster_points)
            extent = np.max(cluster_points, axis=0) - np.min(cluster_points, axis=0)
            volume = extent.prod()
            # A zero volume is an expected, degenerate case: keep the ``inf``
            # result but do not emit NumPy's divide-by-zero RuntimeWarning,
            # which turns into an exception under warnings-as-errors.
            with np.errstate(divide="ignore"):
                density = num_points / volume
            stats.append(
                {
                    "cluster": label,
                    "num_points": num_points,
                    "density": density,
                }
            )
        return stats

    def __str__(self) -> str:
        return "DBScan"


class KMeansCluster(Cluster):
    """Clustering strategy backed by ``sklearn.cluster.KMeans``."""

    def __init__(
        self,
        n_clusters: int = 8,
        n_init: int = 1,
        max_iter: int = 300,
        random_state: Optional[int] = 111,
    ):
        """Store the k-means hyper-parameters.

        Args:
            n_clusters: Forwarded unchanged to ``KMeans(n_clusters=...)``.
            n_init: Forwarded unchanged to ``KMeans(n_init=...)``.
            max_iter: Forwarded unchanged to ``KMeans(max_iter=...)``.
            random_state: Forwarded unchanged to ``KMeans(random_state=...)``.
                Defaults to 111, the seed this class has always used.
        """
        self.n_clusters = n_clusters
        self.n_init = n_init
        self.max_iter = max_iter
        self.random_state = random_state

    def run(self, data: np.ndarray) -> ClusterResult:
        """Fit k-means on *data* and summarise the resulting clusters.

        Args:
            data: Samples to cluster; handed straight to ``fit_predict``.

        Returns:
            A ``ClusterResult`` carrying the labels, the fitted
            ``cluster_centers_`` and the per-cluster statistics.
        """
        model = KMeans(
            n_clusters=self.n_clusters,
            init="random",
            n_init=self.n_init,
            max_iter=self.max_iter,
            random_state=self.random_state,
        )
        labels = model.fit_predict(data)
        centers = model.cluster_centers_
        return ClusterResult(
            labels, centers, self.get_statistics(data, labels, centers)
        )

    def get_statistics(
        self, data: np.ndarray, labels: np.ndarray, centers: np.ndarray
    ) -> list[dict[str, Any]]:
        """Return the size and center of every cluster present in *labels*.

        Args:
            data: The samples that were clustered.
            labels: Label of each sample, used as an index into *centers*.
            centers: Cluster centers, indexed by label.

        Returns:
            One dict per distinct label with the keys ``"cluster"``,
            ``"num_points"`` and ``"center"``.
        """
        return [
            {
                "cluster": label,
                "num_points": len(data[labels == label]),
                "center": centers[label],
            }
            for label in np.unique(labels)
        ]

    def __str__(self) -> str:
        return "KMeans"


# One default-configured instance of each available strategy.
CLUSTERING_STRATEGIES = [DBSCANCluster(), KMeansCluster()]