Source code for heat.cluster.spectral

"""
Module for Spectral Clustering, a graph-based machine learning algorithm
"""

import heat as ht
import math
import torch
from typing import Tuple
from heat.core.dndarray import DNDarray


[docs] class Spectral(ht.ClusteringMixin, ht.BaseEstimator): """ Spectral clustering Attributes ---------- n_clusters : int Number of clusters to fit gamma : float Kernel coefficient sigma for 'rbf', ignored for metric='euclidean' metric : string How to construct the similarity matrix. - 'rbf' : construct the similarity matrix using a radial basis function (RBF) kernel. - 'euclidean' : construct the similarity matrix as only euclidean distance. laplacian : str How to calculate the graph laplacian (affinity) Currently supported : 'fully_connected', 'eNeighbour' threshold : float Threshold for affinity matrix if laplacian='eNeighbour' Ignorded for laplacian='fully_connected' boundary : str How to interpret threshold: 'upper', 'lower' Ignorded for laplacian='fully_connected' eigen_solver : str The eigenvalue decomposition strategy to use. - 'lanczos' : Use Lanczos iterations to reduce the Laplacian matrix size before applying the torch eigenvalue solver. - 'randomized' : Use a randomized algorithm to compute the approximate eigenvalues and eigenvectors. reigh_rank : int number of samples for randomized eigenvalue decomposition. Only used if eigen_solver='randomized'. It must hold reigh_rank >= n_clusters. If n_clusters is None (automatic selection of number of clusters), reigh_rank gives an upper bound on the number of clusters that can be found. Therefore, reigh_rank should be set high enough to capture the expected number of clusters in that case. reigh_n_oversamples : int number of oversamples for randomized eigenvalue decomposition. Only used if eigen_solver='randomized'. Default is 10. reigh_power_iter : int number of power iterations for randomized eigenvalue decomposition. Only used if eigen_solver='randomized'. Default is 0. Consider increasing this value if the eigen-spectrum of the Laplacian decays slowly. lanczos_n_iter : int number of Lanczos iterations for Eigenvalue decomposition. Only used if eigen_solver='lanczos'. Default is 300. assign_labels: str The strategy to use to assign labels in the embedding space. **params: dict Parameter dictionary for the assign_labels estimator """ def __init__( self, n_clusters: int = None, gamma: float = 1.0, metric: str = "rbf", laplacian: str = "fully_connected", threshold: float = 1.0, boundary: str = "upper", eigen_solver: str = "randomized", reigh_rank: int = 100, reigh_n_oversamples: int = 10, reigh_power_iter: int = 0, lanczos_n_iter: int = 300, assign_labels: str = "kmeans", **params, ): self.n_clusters = n_clusters self.gamma = gamma self.metric = metric self.laplacian = laplacian self.threshold = threshold self.boundary = boundary self.lanczos_n_iter = lanczos_n_iter self.assign_labels = assign_labels self.eigen_solver = eigen_solver self.reigh_n_oversamples = reigh_n_oversamples self.reigh_power_iter = reigh_power_iter self.reigh_rank = reigh_rank if eigen_solver not in ["lanczos", "randomized"]: raise NotImplementedError( f"Currently only 'lanczos' and 'randomized' eigen_solver are supported, but got '{eigen_solver}' as input." ) if eigen_solver == "randomized" and reigh_rank < ( n_clusters if n_clusters is not None else 1 ): raise ValueError("reigh_rank must be at least equal to n_clusters") if metric == "rbf": sig = math.sqrt(1 / (2 * gamma)) self._laplacian = ht.graph.Laplacian( lambda x: ht.spatial.rbf(x, sigma=sig, quadratic_expansion=True), definition="norm_sym", mode=laplacian, threshold_key=boundary, threshold_value=threshold, ) elif metric == "euclidean": self._laplacian = ht.graph.Laplacian( lambda x: ht.spatial.cdist(x, quadratic_expansion=True), definition="norm_sym", mode=laplacian, threshold_key=boundary, threshold_value=threshold, ) else: raise NotImplementedError("Other kernels currently not supported") if assign_labels == "kmeans": self._cluster = ht.cluster.KMeans(params) else: raise NotImplementedError( "Other Label Assignment Algorithms are currently not available" ) # in-place properties self._labels = None @property def labels_(self) -> DNDarray: """ Returns labels of each point. """ return self._labels
[docs] def _spectral_embedding(self, x: DNDarray) -> Tuple[DNDarray, DNDarray]: """ Helper function for dataset x embedding. Returns Tupel(Eigenvalues, Eigenvectors) of the graph's Laplacian matrix. Parameters ---------- x : DNDarray Sample Matrix for which the embedding should be calculated Notes ----- This will throw out the complex side of the eigenvalues found during this. """ L = self._laplacian.construct(x) if self.eigen_solver == "lanczos": # use Lanczos Algorithm: for Eigenvalue and -vector calculation v0 = ht.full( (L.shape[0],), fill_value=1.0 / math.sqrt(L.shape[0]), dtype=L.dtype, split=0, device=L.device, ) V, T = ht.lanczos(L, self.lanczos_n_iter, v0) # if int(torch.__version__.split(".")[1]) >= 9: try: # 4. Calculate and Sort Eigenvalues and Eigenvectors of tridiagonal matrix T eval, evec = torch.linalg.eig(T.larray) # If x is an Eigenvector of T, then y = V@x is the corresponding Eigenvector of L eval, idx = torch.sort(eval.real, dim=0) eigenvalues = ht.array(eval) eigenvectors = ht.matmul(V, ht.array(evec))[:, idx] return eigenvalues.real, eigenvectors.real except AttributeError: # torch version is less than 1.9.0 # 4. Calculate and Sort Eigenvalues and Eigenvectors of tridiagonal matrix T eval, evec = torch.eig(T.larray, eigenvectors=True) # If x is an Eigenvector of T, then y = V@x is the corresponding Eigenvector of L eval, idx = torch.sort(eval[:, 0], dim=0) eigenvalues = ht.array(eval) eigenvectors = ht.matmul(V, ht.array(evec))[:, idx] return eigenvalues, eigenvectors else: # use randomized eigenvalue decomposition with the chosen arguments eigenvalues, eigenvectors = ht.linalg.reigh( L, self.reigh_rank, n_oversamples=self.reigh_n_oversamples, power_iter=self.reigh_power_iter, ) return eigenvalues, eigenvectors
[docs] def fit(self, x: DNDarray): """ Clusters dataset X via spectral embedding. Computes the low-dim representation by calculation of eigenspectrum (eigenvalues and eigenvectors) of the graph laplacian from the similarity matrix and fits the eigenvectors that correspond to the k lowest eigenvalues with a seperate clustering algorithm (currently only kmeans is supported). Similarity metrics for adjacency calculations are supported via spatial.distance. The eigenvalues and eigenvectors are computed by reducing the Laplacian via lanczos iterations and using the torch eigenvalue solver on this smaller matrix. If other eigenvalue decompostion methods are supported, this will be expanded. Parameters ---------- x : DNDarray Training instances to cluster. Shape = (n_samples, n_features) """ # 1. input sanitation if not isinstance(x, DNDarray): raise ValueError(f"input needs to be a ht.DNDarray, but was {type(x)}") if x.split is not None and x.split != 0: raise NotImplementedError("Not implemented for other splitting-axes") # 2. Embed Dataset into lower-dimensional Eigenvector space eigenvalues, eigenvectors = self._spectral_embedding(x) # 3. Find the spectral gap, if number of clusters is not defined from the outside if self.n_clusters is None: diff = eigenvalues[1:] - eigenvalues[:-1] tmp = ht.argmax(diff).item() self.n_clusters = tmp + 1 components = eigenvectors[:, : self.n_clusters].copy() params = self._cluster.get_params() params["n_clusters"] = self.n_clusters self._cluster.set_params(**params) self._cluster.fit(components) self._labels = self._cluster.labels_ self._cluster_centers = self._cluster.cluster_centers_ return self
[docs] def predict(self, x: DNDarray) -> DNDarray: """ Return the label each sample in X belongs to. X is transformed to the low-dim representation by calculation of eigenspectrum (eigenvalues and eigenvectors) of the graph laplacian from the similarity matrix. Inference of lables is done by extraction of the closest centroid of the n_clusters eigenvectors from the previously fitted clustering algorithm (kmeans). Parameters ---------- x : DNDarray New data to predict. Shape = (n_samples, n_features) Warning ------- Caution: Calculation of the low-dim representation requires some time! """ # input sanitation if not isinstance(x, DNDarray): raise ValueError(f"input needs to be a ht.DNDarray, but was {type(x)}") if x.split is not None and x.split != 0: raise NotImplementedError("Not implemented for other splitting-axes") _, eigenvectors = self._spectral_embedding(x) components = eigenvectors[:, : self.n_clusters].copy() return self._cluster.predict(components)