Source code for heat.sparse.factories

"""Provides high-level DCSR_matrix initialization functions"""

import torch
import numpy as np
from scipy.sparse import csr_matrix as scipy_csr_matrix
from scipy.sparse import csc_matrix as scipy_csc_matrix

from typing import Optional, Type, Iterable
import warnings

from ..core import devices
from ..core import types
from ..core.communication import MPI, sanitize_comm, Communication
from ..core.devices import Device
from ..core.types import datatype

from .dcsx_matrix import DCSC_matrix, DCSR_matrix, __DCSX_matrix

__all__ = [
    "sparse_csr_matrix",
    "sparse_csc_matrix",
]


[docs] def sparse_csr_matrix( obj: Iterable, dtype: Optional[Type[datatype]] = None, split: Optional[int] = None, is_split: Optional[int] = None, device: Optional[Device] = None, comm: Optional[Communication] = None, ) -> DCSR_matrix: """ Create a :class:`~heat.sparse.DCSR_matrix`. Parameters ---------- obj : array_like A tensor or array, any object exposing the array interface, an object whose ``__array__`` method returns an array, or any (nested) sequence. Sparse tensor that needs to be distributed. dtype : datatype, optional The desired data-type for the sparse matrix. If not given, then the type will be determined as the minimum type required to hold the objects in the sequence. This argument can only be used to ‘upcast’ the array. For downcasting, use the :func:`~heat.sparse.DCSR_matrix.astype` method. split : int or None, optional The axis along which the passed array content ``obj`` is split and distributed in memory. DCSR_matrix only supports distribution along axis 0. Mutually exclusive with ``is_split``. is_split : int or None, optional Specifies the axis along which the local data portions, passed in obj, are split across all machines. DCSR_matrix only supports distribution along axis 0. Useful for interfacing with other distributed-memory code. The shape of the global array is automatically inferred. Mutually exclusive with ``split``. device : str or Device, optional Specifies the :class:`~heat.core.devices.Device` the array shall be allocated on (i.e. globally set default device). comm : Communication, optional Handle to the nodes holding distributed array chunks. Raises ------ ValueError If split and is_split parameters are not one of 0 or None. Examples -------- Create a :class:`~heat.sparse.DCSR_matrix` from :class:`torch.Tensor` (layout ==> torch.sparse_csr) >>> indptr = torch.tensor([0, 2, 3, 6]) >>> indices = torch.tensor([0, 2, 2, 0, 1, 2]) >>> data = torch.tensor([1, 2, 3, 4, 5, 6], dtype=torch.float) >>> torch_sparse_csr = torch.sparse_csr_tensor(indptr, indices, data) >>> heat_sparse_csr = ht.sparse.sparse_csr_matrix(torch_sparse_csr, split=0) >>> heat_sparse_csr (indptr: tensor([0, 2, 3, 6]), indices: tensor([0, 2, 2, 0, 1, 2]), data: tensor([1., 2., 3., 4., 5., 6.]), dtype=ht.float32, device=cpu:0, split=0) Create a :class:`~heat.sparse.DCSR_matrix` from :class:`scipy.sparse.csr_matrix` >>> scipy_sparse_csr = scipy.sparse.csr_matrix((data, indices, indptr)) >>> heat_sparse_csr = ht.sparse.sparse_csr_matrix(scipy_sparse_csr, split=0) >>> heat_sparse_csr (indptr: tensor([0, 2, 3, 6], dtype=torch.int32), indices: tensor([0, 2, 2, 0, 1, 2], dtype=torch.int32), data: tensor([1., 2., 3., 4., 5., 6.]), dtype=ht.float32, device=cpu:0, split=0) Create a :class:`~heat.sparse.DCSR_matrix` using data that is already distributed (with `is_split`) >>> indptrs = [torch.tensor([0, 2, 3]), torch.tensor([0, 3])] >>> indices = [torch.tensor([0, 2, 2]), torch.tensor([0, 1, 2])] >>> data = [torch.tensor([1, 2, 3], dtype=torch.float), torch.tensor([4, 5, 6], dtype=torch.float)] >>> rank = ht.MPI_WORLD.rank >>> local_indptr = indptrs[rank] >>> local_indices = indices[rank] >>> local_data = data[rank] >>> local_torch_sparse_csr = torch.sparse_csr_tensor(local_indptr, local_indices, local_data) >>> heat_sparse_csr = ht.sparse.sparse_csr_matrix(local_torch_sparse_csr, is_split=0) >>> heat_sparse_csr (indptr: tensor([0, 2, 3, 6]), indices: tensor([0, 2, 2, 0, 1, 2]), data: tensor([1., 2., 3., 4., 5., 6.]), dtype=ht.float32, device=cpu:0, split=0) Create a :class:`~heat.sparse.DCSR_matrix` from List >>> ht.sparse.sparse_csr_matrix([[0, 0, 1], [1, 0, 2], [0, 0, 3]]) (indptr: tensor([0, 1, 3, 4]), indices: tensor([2, 0, 2, 2]), data: tensor([1, 1, 2, 3]), dtype=ht.int64, device=cpu:0, split=None) """ return __sparse_matrix(obj, dtype, split, is_split, device, comm, orientation="row")
[docs] def sparse_csc_matrix( obj: Iterable, dtype: Optional[Type[datatype]] = None, split: Optional[int] = None, is_split: Optional[int] = None, device: Optional[Device] = None, comm: Optional[Communication] = None, ) -> DCSC_matrix: """ Create a :class:`~heat.sparse.DCSC_matrix`. Parameters ---------- obj : array_like A tensor or array, any object exposing the array interface, an object whose ``__array__`` method returns an array, or any (nested) sequence. Sparse tensor that needs to be distributed. dtype : datatype, optional The desired data-type for the sparse matrix. If not given, then the type will be determined as the minimum type required to hold the objects in the sequence. This argument can only be used to ‘upcast’ the array. For downcasting, use the :func:`~heat.sparse.DCSC_matrix.astype` method. split : int or None, optional The axis along which the passed array content ``obj`` is split and distributed in memory. DCSC_matrix only supports distribution along axis 1. Mutually exclusive with ``is_split``. is_split : int or None, optional Specifies the axis along which the local data portions, passed in obj, are split across all machines. DCSC_matrix only supports distribution along axis 1. Useful for interfacing with other distributed-memory code. The shape of the global array is automatically inferred. Mutually exclusive with ``split``. device : str or Device, optional Specifies the :class:`~heat.core.devices.Device` the array shall be allocated on (i.e. globally set default device). comm : Communication, optional Handle to the nodes holding distributed array chunks. Raises ------ ValueError If split and is_split parameters are not one of 1 or None. Examples -------- Create a :class:`~heat.sparse.DCSC_matrix` from :class:`torch.Tensor` (layout ==> torch.sparse_csc) >>> indptr = torch.tensor([0, 2, 3, 6]) >>> indices = torch.tensor([0, 2, 2, 0, 1, 2]) >>> data = torch.tensor([1.0, 4.0, 5.0, 2.0, 3.0, 6.0], dtype=torch.float) >>> torch_sparse_csc = torch.sparse_csc_tensor(indptr, indices, data) >>> heat_sparse_csc = ht.sparse.sparse_csc_matrix(torch_sparse_csc, split=1) >>> heat_sparse_csc (indptr: tensor([0, 2, 3, 6]), indices: tensor([0, 2, 2, 0, 1, 2]), data: tensor([1., 4., 5., 2., 3., 6.]), dtype=ht.float32, device=cpu:0, split=1) Create a :class:`~heat.sparse.DCSC_matrix` from :class:`scipy.sparse.csc_matrix` >>> scipy_sparse_csc = scipy.sparse.csc_matrix((data, indices, indptr)) >>> heat_sparse_csc = ht.sparse.sparse_csc_matrix(scipy_sparse_csc, split=1) >>> heat_sparse_csc (indptr: tensor([0, 2, 3, 6], dtype=torch.int32), indices: tensor([0, 2, 2, 0, 1, 2], dtype=torch.int32), data: tensor([1., 4., 5., 2., 3., 6.]), dtype=ht.float32, device=cpu:0, split=1) Create a :class:`~heat.sparse.DCSC_matrix` using data that is already distributed (with `is_split`) >>> indptrs = [torch.tensor([0, 2, 3]), torch.tensor([0, 3])] >>> indices = [torch.tensor([0, 2, 2]), torch.tensor([0, 1, 2])] >>> data = [torch.tensor([1, 2, 3], dtype=torch.float), torch.tensor([4, 5, 6], dtype=torch.float)] >>> rank = ht.MPI_WORLD.rank >>> local_indptr = indptrs[rank] >>> local_indices = indices[rank] >>> local_data = data[rank] >>> local_torch_sparse_csr = torch.sparse_csr_tensor(local_indptr, local_indices, local_data) >>> heat_sparse_csr = ht.sparse.sparse_csr_matrix(local_torch_sparse_csr, is_split=0) >>> heat_sparse_csr (indptr: tensor([0, 2, 3, 6]), indices: tensor([0, 2, 2, 0, 1, 2]), data: tensor([1., 2., 3., 4., 5., 6.]), dtype=ht.float32, device=cpu:0, split=1) Create a :class:`~heat.sparse.DCSC_matrix` from List >>> ht.sparse.sparse_csc_matrix([[0, 0, 1], [1, 0, 2], [0, 0, 3]]) (indptr: tensor([0, 1, 1, 4]), indices: tensor([1, 0, 1, 2]), data: tensor([1, 1, 2, 3]), dtype=ht.int64, device=cpu:0, split=None) """ return __sparse_matrix(obj, dtype, split, is_split, device, comm, orientation="col")
def __sparse_matrix( obj: Iterable, dtype: Optional[Type[datatype]] = None, split: Optional[int] = None, is_split: Optional[int] = None, device: Optional[Device] = None, comm: Optional[Communication] = None, orientation: str = "row", ) -> __DCSX_matrix: """ Create a :class:`~heat.sparse.__DCSX_matrix`. This is a common method for converting a distributed array to a sparse matrix representation. Raises ------ ValueError If the orientation is not ``'row'`` or ``'col'``. ValueError If the number of dimensions of the input array is not 2. ValueError If the split or is_split axis is not supported for the type. """ # version check if int(torch.__version__.split(".")[0]) <= 1 and int(torch.__version__.split(".")[1]) < 10: raise RuntimeError(f"ht.sparse requires torch >= 1.10. Found version {torch.__version__}.") # sanitize the data type if dtype is not None: dtype = types.canonical_heat_type(dtype) # sanitize device if device is not None: device = devices.sanitize_device(device) if orientation not in ["row", "col"]: raise ValueError(f"Invalid orientation: '{orientation}'. Options: 'row' or 'col'") torch_class = torch.sparse_csr_tensor if orientation == "row" else torch.sparse_csc_tensor # Convert input into torch.Tensor (layout ==> torch.sparse_csr) # TODO: Check if conversion works across types if isinstance(obj, (scipy_csr_matrix, scipy_csc_matrix)): obj = torch_class( obj.indptr, obj.indices, obj.data, device=device.torch_device if device is not None else devices.get_device().torch_device, size=obj.shape, ) if not isinstance(obj, torch.Tensor): try: obj = torch.tensor( obj, device=( device.torch_device if device is not None else devices.get_device().torch_device ), ) except RuntimeError: raise TypeError(f"Invalid data of type {type(obj)}") if obj.ndim != 2: raise ValueError(f"The number of dimensions must be 2, found {str(obj.ndim)}") torch_layout = torch.sparse_csr if orientation == "row" else torch.sparse_csc if obj.layout != torch_layout: if torch_layout == torch.sparse_csr: obj = obj.to_sparse_csr() else: obj = obj.to_sparse_csc() # infer dtype from obj if not explicitly given if dtype is None: dtype = types.canonical_heat_type(obj.dtype) else: torch_dtype = dtype.torch_type() if obj.dtype != torch_dtype: obj = obj.type(torch_dtype) # infer device from obj if not explicitly given if device is None: device = devices.sanitize_device(obj.device.type) if str(obj.device) != device.torch_device: warnings.warn( f"Array 'obj' is not on device '{device}'. It will be moved to it.", UserWarning, ) obj = obj.to(device.torch_device) comm = sanitize_comm(comm) gshape = tuple(obj.shape) lshape = gshape gnnz = obj.values().shape[0] compressed_indices = obj.crow_indices() if orientation == "row" else obj.ccol_indices() element_indices = obj.col_indices() if orientation == "row" else obj.row_indices() cls = DCSR_matrix if orientation == "row" else DCSC_matrix if (split == 0 and orientation == "row") or (split == 1 and orientation == "col"): start, end = comm.chunk(gshape, split, sparse=True) # Find the starting and ending indices for # element_indices and values tensors for this process indices_start = compressed_indices[start] indices_end = compressed_indices[end] # Slice the data belonging to this process data = obj.values()[indices_start:indices_end] # start:(end + 1) because indptr is of size (n + 1) for array with n rows indptr = compressed_indices[start : end + 1] indices = element_indices[indices_start:indices_end] indptr = indptr - indptr[0] lshape = list(lshape) lshape[split] = end - start lshape = tuple(lshape) elif split is not None: raise ValueError(f"Split axis {split} not supported for class {cls.__name__}") elif (is_split == 0 and orientation == "row") or (is_split == 1 and orientation == "col"): # Check whether the distributed data matches in # all dimensions other than axis 0 neighbour_shape = np.array(gshape) lshape = np.array(lshape) if comm.rank < comm.size - 1: comm.Isend(lshape, dest=comm.rank + 1) if comm.rank != 0: # Dont have to check whether the number of dimensions are same since # both torch.sparse_csr_tensor and scipy.sparse.csr_matrix are 2D only # check whether the individual shape elements match comm.Recv(neighbour_shape, source=comm.rank - 1) for i in range(len(lshape)): if i == is_split: continue elif lshape[i] != neighbour_shape[i]: neighbour_shape[is_split] = np.iinfo(neighbour_shape.dtype).min lshape = tuple(lshape) # sum up the elements along the split dimension reduction_buffer = np.array(neighbour_shape[is_split]) # To check if any process has found that its neighbour # does not match with itself in shape comm.Allreduce(MPI.IN_PLACE, reduction_buffer, MPI.MIN) if reduction_buffer < 0: raise ValueError( f"Unable to construct {cls.__name__}. Local data slices have inconsistent shapes or dimensions." ) data = obj.values() indptr = compressed_indices indices = element_indices # Calculate gshape gshape_split = torch.tensor(gshape[is_split]) comm.Allreduce(MPI.IN_PLACE, gshape_split, MPI.SUM) gshape = list(gshape) gshape[is_split] = gshape_split.item() gshape = tuple(gshape) # Calculate gnnz lnnz = data.shape[0] gnnz_buffer = torch.tensor(lnnz) comm.Allreduce(MPI.IN_PLACE, gnnz_buffer, MPI.SUM) gnnz = gnnz_buffer.item() split = is_split elif is_split is not None: raise ValueError(f"Split axis {is_split} not supported for class {cls.__name__}") else: # split is None and is_split is None data = obj.values() indptr = compressed_indices indices = element_indices sparse_array = torch_class( indptr.to(torch.int64), indices.to(torch.int64), data, size=lshape, dtype=dtype.torch_type(), device=device.torch_device, ) return cls( array=sparse_array, gnnz=gnnz, gshape=gshape, dtype=dtype, split=split, device=device, comm=comm, balanced=True, )