"""
Module for Spectral Clustering, a graph-based machine learning algorithm
"""
import heat as ht
import math
import torch
from typing import Tuple
from heat.core.dndarray import DNDarray
[docs]
class Spectral(ht.ClusteringMixin, ht.BaseEstimator):
"""
Spectral clustering
Attributes
----------
n_clusters : int
Number of clusters to fit
gamma : float
Kernel coefficient sigma for 'rbf', ignored for metric='euclidean'
metric : string
How to construct the similarity matrix.
- 'rbf' : construct the similarity matrix using a radial basis function (RBF) kernel.
- 'euclidean' : construct the similarity matrix as only euclidean distance.
laplacian : str
How to calculate the graph laplacian (affinity)
Currently supported : 'fully_connected', 'eNeighbour'
threshold : float
Threshold for affinity matrix if laplacian='eNeighbour'
Ignorded for laplacian='fully_connected'
boundary : str
How to interpret threshold: 'upper', 'lower'
Ignorded for laplacian='fully_connected'
eigen_solver : str
The eigenvalue decomposition strategy to use.
- 'lanczos' : Use Lanczos iterations to reduce the Laplacian matrix size before applying the torch eigenvalue solver.
- 'randomized' : Use a randomized algorithm to compute the approximate eigenvalues and eigenvectors.
reigh_rank : int
number of samples for randomized eigenvalue decomposition. Only used if eigen_solver='randomized'.
It must hold reigh_rank >= n_clusters. If n_clusters is None (automatic selection of number of clusters),
reigh_rank gives an upper bound on the number of clusters that can be found. Therefore, reigh_rank should
be set high enough to capture the expected number of clusters in that case.
reigh_n_oversamples : int
number of oversamples for randomized eigenvalue decomposition. Only used if eigen_solver='randomized'. Default is 10.
reigh_power_iter : int
number of power iterations for randomized eigenvalue decomposition. Only used if eigen_solver='randomized'. Default is 0.
Consider increasing this value if the eigen-spectrum of the Laplacian decays slowly.
lanczos_n_iter : int
number of Lanczos iterations for Eigenvalue decomposition. Only used if eigen_solver='lanczos'. Default is 300.
assign_labels: str
The strategy to use to assign labels in the embedding space.
**params: dict
Parameter dictionary for the assign_labels estimator
"""
def __init__(
self,
n_clusters: int = None,
gamma: float = 1.0,
metric: str = "rbf",
laplacian: str = "fully_connected",
threshold: float = 1.0,
boundary: str = "upper",
eigen_solver: str = "randomized",
reigh_rank: int = 100,
reigh_n_oversamples: int = 10,
reigh_power_iter: int = 0,
lanczos_n_iter: int = 300,
assign_labels: str = "kmeans",
**params,
):
self.n_clusters = n_clusters
self.gamma = gamma
self.metric = metric
self.laplacian = laplacian
self.threshold = threshold
self.boundary = boundary
self.lanczos_n_iter = lanczos_n_iter
self.assign_labels = assign_labels
self.eigen_solver = eigen_solver
self.reigh_n_oversamples = reigh_n_oversamples
self.reigh_power_iter = reigh_power_iter
self.reigh_rank = reigh_rank
if eigen_solver not in ["lanczos", "randomized"]:
raise NotImplementedError(
f"Currently only 'lanczos' and 'randomized' eigen_solver are supported, but got '{eigen_solver}' as input."
)
if eigen_solver == "randomized" and reigh_rank < (
n_clusters if n_clusters is not None else 1
):
raise ValueError("reigh_rank must be at least equal to n_clusters")
if metric == "rbf":
sig = math.sqrt(1 / (2 * gamma))
self._laplacian = ht.graph.Laplacian(
lambda x: ht.spatial.rbf(x, sigma=sig, quadratic_expansion=True),
definition="norm_sym",
mode=laplacian,
threshold_key=boundary,
threshold_value=threshold,
)
elif metric == "euclidean":
self._laplacian = ht.graph.Laplacian(
lambda x: ht.spatial.cdist(x, quadratic_expansion=True),
definition="norm_sym",
mode=laplacian,
threshold_key=boundary,
threshold_value=threshold,
)
else:
raise NotImplementedError("Other kernels currently not supported")
if assign_labels == "kmeans":
self._cluster = ht.cluster.KMeans(params)
else:
raise NotImplementedError(
"Other Label Assignment Algorithms are currently not available"
)
# in-place properties
self._labels = None
@property
def labels_(self) -> DNDarray:
"""
Returns labels of each point.
"""
return self._labels
[docs]
def _spectral_embedding(self, x: DNDarray) -> Tuple[DNDarray, DNDarray]:
"""
Helper function for dataset x embedding.
Returns Tupel(Eigenvalues, Eigenvectors) of the graph's Laplacian matrix.
Parameters
----------
x : DNDarray
Sample Matrix for which the embedding should be calculated
Notes
-----
This will throw out the complex side of the eigenvalues found during this.
"""
L = self._laplacian.construct(x)
if self.eigen_solver == "lanczos":
# use Lanczos Algorithm: for Eigenvalue and -vector calculation
v0 = ht.full(
(L.shape[0],),
fill_value=1.0 / math.sqrt(L.shape[0]),
dtype=L.dtype,
split=0,
device=L.device,
)
V, T = ht.lanczos(L, self.lanczos_n_iter, v0)
# if int(torch.__version__.split(".")[1]) >= 9:
try:
# 4. Calculate and Sort Eigenvalues and Eigenvectors of tridiagonal matrix T
eval, evec = torch.linalg.eig(T.larray)
# If x is an Eigenvector of T, then y = V@x is the corresponding Eigenvector of L
eval, idx = torch.sort(eval.real, dim=0)
eigenvalues = ht.array(eval)
eigenvectors = ht.matmul(V, ht.array(evec))[:, idx]
return eigenvalues.real, eigenvectors.real
except AttributeError: # torch version is less than 1.9.0
# 4. Calculate and Sort Eigenvalues and Eigenvectors of tridiagonal matrix T
eval, evec = torch.eig(T.larray, eigenvectors=True)
# If x is an Eigenvector of T, then y = V@x is the corresponding Eigenvector of L
eval, idx = torch.sort(eval[:, 0], dim=0)
eigenvalues = ht.array(eval)
eigenvectors = ht.matmul(V, ht.array(evec))[:, idx]
return eigenvalues, eigenvectors
else:
# use randomized eigenvalue decomposition with the chosen arguments
eigenvalues, eigenvectors = ht.linalg.reigh(
L,
self.reigh_rank,
n_oversamples=self.reigh_n_oversamples,
power_iter=self.reigh_power_iter,
)
return eigenvalues, eigenvectors
[docs]
def fit(self, x: DNDarray):
"""
Clusters dataset X via spectral embedding.
Computes the low-dim representation by calculation of eigenspectrum (eigenvalues and eigenvectors) of the graph
laplacian from the similarity matrix and fits the eigenvectors that correspond to the k lowest eigenvalues with
a seperate clustering algorithm (currently only kmeans is supported). Similarity metrics for adjacency
calculations are supported via spatial.distance. The eigenvalues and eigenvectors are computed by reducing the
Laplacian via lanczos iterations and using the torch eigenvalue solver on this smaller matrix. If other
eigenvalue decompostion methods are supported, this will be expanded.
Parameters
----------
x : DNDarray
Training instances to cluster. Shape = (n_samples, n_features)
"""
# 1. input sanitation
if not isinstance(x, DNDarray):
raise ValueError(f"input needs to be a ht.DNDarray, but was {type(x)}")
if x.split is not None and x.split != 0:
raise NotImplementedError("Not implemented for other splitting-axes")
# 2. Embed Dataset into lower-dimensional Eigenvector space
eigenvalues, eigenvectors = self._spectral_embedding(x)
# 3. Find the spectral gap, if number of clusters is not defined from the outside
if self.n_clusters is None:
diff = eigenvalues[1:] - eigenvalues[:-1]
tmp = ht.argmax(diff).item()
self.n_clusters = tmp + 1
components = eigenvectors[:, : self.n_clusters].copy()
params = self._cluster.get_params()
params["n_clusters"] = self.n_clusters
self._cluster.set_params(**params)
self._cluster.fit(components)
self._labels = self._cluster.labels_
self._cluster_centers = self._cluster.cluster_centers_
return self
[docs]
def predict(self, x: DNDarray) -> DNDarray:
"""
Return the label each sample in X belongs to.
X is transformed to the low-dim representation by calculation of eigenspectrum (eigenvalues and eigenvectors) of
the graph laplacian from the similarity matrix. Inference of lables is done by extraction of the closest
centroid of the n_clusters eigenvectors from the previously fitted clustering algorithm (kmeans).
Parameters
----------
x : DNDarray
New data to predict. Shape = (n_samples, n_features)
Warning
-------
Caution: Calculation of the low-dim representation requires some time!
"""
# input sanitation
if not isinstance(x, DNDarray):
raise ValueError(f"input needs to be a ht.DNDarray, but was {type(x)}")
if x.split is not None and x.split != 0:
raise NotImplementedError("Not implemented for other splitting-axes")
_, eigenvectors = self._spectral_embedding(x)
components = eigenvectors[:, : self.n_clusters].copy()
return self._cluster.predict(components)