"""Module for TestCases used for heat"""
import os
import platform
import unittest
import tempfile
import numpy as np
import torch
from typing import Optional, Callable, Any, Union
import heat as ht
from heat.core import MPI, MPICommunication, dndarray, factories, types, Device
from heat.core.random import seed
# TODO adapt for GPU once this is working properly
[docs]
class TestCase(unittest.TestCase):
"""Helper functions for unit tests"""
__comm = MPICommunication()
device: Device = ht.cpu
_hostnames: Optional[list[str]] = None
other_device: Optional[Device] = None
envar: Optional[str] = None
[docs]
def setUp(self) -> None:
"""Sets initial RNG seed for testing"""
seed(42)
[docs]
@classmethod
def setUpClass(cls) -> None:
"""
Read the environment variable 'HEAT_TEST_USE_DEVICE' and return the requested devices.
Supported values
- cpu: Use CPU only (default)
- gpu: Use GPU only
Raises
------
RuntimeError if value of 'HEAT_TEST_USE_DEVICE' is not recognized
"""
envar = os.getenv("HEAT_TEST_USE_DEVICE", "cpu")
is_mps = False
if envar == "cpu":
ht.use_device("cpu")
ht_device = ht.cpu
other_device = ht.cpu
if torch.cuda.is_available():
torch.cuda.set_device(torch.device(ht.gpu.torch_device))
other_device = ht.gpu
elif torch.backends.mps.is_built() and torch.backends.mps.is_available():
other_device = ht.gpu
elif envar == "gpu":
if torch.cuda.is_available():
ht.use_device("gpu")
torch.cuda.set_device(torch.device(ht.gpu.torch_device))
ht_device = ht.gpu
other_device = ht.cpu
elif torch.backends.mps.is_built() and torch.backends.mps.is_available():
ht.use_device("gpu")
ht_device = ht.gpu
other_device = ht.cpu
is_mps = True
else:
raise RuntimeError(
f"Value '{envar}' of environment variable 'HEAT_TEST_USE_DEVICE' is unsupported"
)
cls.device, cls.other_device, cls.envar, cls.is_mps = ht_device, other_device, envar, is_mps
@property
def comm(self) -> MPICommunication:
"""Returns the MPI communicator"""
return self.__comm
[docs]
def get_rank(self) -> Optional[int]:
"""Returns the MPI rank"""
return self.comm.rank
[docs]
def get_size(self) -> Optional[int]:
"""Returns the MPI size"""
return self.comm.size
[docs]
@classmethod
def get_hostnames(cls) -> list[str]:
"""Returns the name of the host machine(s)."""
if not cls._hostnames:
if platform.system() == "Windows":
host = platform.uname().node
else:
host = os.uname()[1]
cls._hostnames = list(set(cls.__comm.handle.allgather(host)))
return cls._hostnames
[docs]
def assert_array_equal(
self,
heat_array: ht.DNDarray,
expected_array: Union[np.ndarray, torch.Tensor],
rtol: float = 1e-5,
atol: float = 1e-08,
) -> None:
"""
Check if the heat_array is equivalent to the expected_array. Therefore first the split heat_array is compared to
the corresponding expected_array slice locally and second the heat_array is combined and fully compared with the
expected_array.
Note if the heat array is split it also needs to be balanced.
Parameters
----------
heat_array: heat.DNDarray
The heat array which should be checked.
expected_array: numpy.ndarray or torch.Tensor
The array against which the heat_array should be checked.
rtol: float
The relative tolerance parameter.
atol: float
The absolute tolerance parameter.
Raises
------
AssertionError if the arrays do not equal.
Examples
--------
>>> import numpy as np
>>> import heat as ht
>>> a = ht.ones((5, 5), split=1, dtype=ht.int32)
>>> b = np.ones((5, 5), dtype=np.int32)
>>> self.assert_array_equal(a, b)
>>> c = np.ones((5, 5), dtype=np.int64)
>>> self.assert_array_equal(a, c)
AssertionError: [...]
>>> c = np.zeros((5, 5), dtype=np.int32)
>>> self.assert_array_equal(a, c)
AssertionError: [...]
"""
self._comm = heat_array.comm
if isinstance(expected_array, torch.Tensor):
# Does not work because heat sets an index while torch does not
# self.assertEqual(expected_array.device, torch.device(heat_array.device.torch_device))
expected_array = expected_array.cpu().numpy()
self.assertIsInstance(
heat_array,
dndarray.DNDarray,
f"The array to test was not a instance of ht.DNDarray. Instead got {type(heat_array)}.",
)
self.assertIsInstance(
expected_array,
np.ndarray,
f"The array to test against was not a instance of numpy.ndarray or torch.Tensor Instead got {type(expected_array)}.",
)
self.assertEqual(
heat_array.shape,
expected_array.shape,
f"Global shapes do not match. Got {heat_array.shape} expected {expected_array.shape}",
)
if not heat_array.is_balanced():
# Array is not distributed correctly
heat_array.balance_()
split = heat_array.split
offset, local_shape, slices = heat_array.comm.chunk(heat_array.gshape, split)
self.assertEqual(
heat_array.lshape,
expected_array[slices].shape,
f"Local shapes do not match. Got {heat_array.lshape} expected {expected_array[slices].shape}",
)
# compare local tensors to corresponding slice of expected_array
is_allclose = torch.tensor(
np.allclose(heat_array.larray.cpu(), expected_array[slices], atol=atol, rtol=rtol),
dtype=torch.int32,
)
heat_array.comm.Allreduce(MPI.IN_PLACE, is_allclose, MPI.SUM)
self.assertTrue(is_allclose == heat_array.comm.size)
[docs]
def assert_func_equal(
self,
shape: Union[tuple[Any, ...], list[Any]],
heat_func: Callable[..., Any],
numpy_func: Callable[..., Any],
distributed_result: bool = True,
heat_args: Optional[dict[str, Any]] = None,
numpy_args: Optional[dict[str, Any]] = None,
data_types: tuple[type, ...] = (np.int32, np.int64, np.float32, np.float64),
low: int = -10000,
high: int = 10000,
) -> None:
"""
Creates random tensors of the given shape with different data types.
All of these tensors will be tested with `ht.assert_func_equal_for_tensor`.
Parameters
----------
shape: tuple or list
The shape of which a random tensors will be created and tested against
heat_func: function
The function that is to be tested
numpy_func: function
The numpy implementation of an equivalent function to test against
heat_args: dictionary, optional
The keyword arguments that will be passed to the heat function. Array and split function don't need to be
specified. Default is {}.
numpy_args: dictionary, optional
The keyword arguments that will be passed to the numpy function. Array doesn't need to be specified.
Default is {}.
distributed_result: bool, optional
Specify whether the result of the heat function is distributed across all nodes or all nodes have the full
result. Default is True.
data_types: list of numpy dtypes, optional
Tensors with all of these dtypes will be created and tested. Each type must to be a numpy dtype.
Default is [numpy.int32, numpy.int64, numpy.float32, numpy.float64]
low: int, optional
In case one of the data_types has integer types, this is the lower bound for the random values.
Default is -10000
high: int, optional
In case one of the data_types has integer types, this is the upper bound for the random values.
Default is 10000
Raises
------
AssertionError if the functions do not perform equally.
Examples
--------
>>> import numpy as np
>>> import heat as ht
>>> self.assert_func_equal((2, 2), ht.exp, np.exp)
>>> self.assert_func_equal((2, 2), ht.exp, np.log)
AssertionError: [...]
>>> self.assert_func_equal((1, 3, 5), ht.any, np.any, distributed_result=False)
>>> heat_args = {"sorted": True, "axis": 0}
>>> numpy_args = {"axis": 0}
>>> self.assert_func_equal(
... [5, 5, 5, 5], ht.unique, np.unique, heat_arg=heat_args, numpy_args=numpy_args
... )
"""
if not isinstance(shape, tuple) and not isinstance(shape, list):
raise ValueError(f"The shape must be either a list or a tuple but was {type(shape)}")
if self.is_mps and np.float64 in data_types:
# MPS does not support float64
data_types = [dtype for dtype in data_types if dtype != np.float64]
for dtype in data_types:
tensor = self.__create_random_np_array(shape, dtype=dtype, low=low, high=high)
self.assert_func_equal_for_tensor(
tensor=tensor,
heat_func=heat_func,
numpy_func=numpy_func,
heat_args=heat_args,
numpy_args=numpy_args,
distributed_result=distributed_result,
)
[docs]
def assert_func_equal_for_tensor(
self,
tensor: Union[np.ndarray, torch.Tensor],
heat_func: Callable[..., Any],
numpy_func: Callable[..., Any],
heat_args: Optional[dict[str, Any]] = None,
numpy_args: Optional[dict[str, Any]] = None,
distributed_result: bool = True,
) -> None:
"""
Tests if the heat function and the numpy function create the equal result on the given tensor.
Parameters
----------
tensor: torch.Tensor or numpy.ndarray
The tensor on which the heat function will be executed.
heat_func: function
The function that is to be tested
numpy_func: function
The numpy implementation of an equivalent function to test against
heat_args: dictionary, optional
The keyword arguments that will be passed to the heat function. Array and split function don't need to be
specified. Default is {}.
numpy_args: dictionary, optional
The keyword arguments that will be passed to the numpy function. Array doesn't need to be specified.
Default is {}.
distributed_result: bool, optional
Specify whether the result of the heat function is distributed across all nodes or all nodes have the full
result. Default is True.
Raises
------
AssertionError if the functions to not perform equally.
Examples
--------
>>> import numpy as np
>>> import heat as ht
>>> a = np.arange(10)
>>> self.assert_func_equal_for_tensor(a, ht.exp, np.exp)
>>> self.assert_func_equal_for_tensor(a, ht.exp, np.log)
AssertionError: [...]
>>> self.assert_func_equal_for_tensor(a, ht.any, np.any, distributed_result=False)
>>> a = torch.ones([5, 5, 5, 5])
>>> heat_args = {"sorted": True, "axis": 0}
>>> numpy_args = {"axis": 0}
>>> self.assert_func_equal_for_tensor(
... a, ht.unique, np.unique, heat_arg=heat_args, numpy_args=numpy_args
... )
"""
self.assertTrue(callable(heat_func))
self.assertTrue(callable(numpy_func))
if heat_args is None:
heat_args = {}
if numpy_args is None:
numpy_args = {}
if isinstance(tensor, np.ndarray):
torch_tensor = torch.from_numpy(tensor.copy())
torch_tensor = torch_tensor.to(self.device.torch_device)
np_array = tensor
elif isinstance(tensor, torch.Tensor):
torch_tensor = tensor
np_array = tensor.cpu().numpy().copy()
else:
raise TypeError(
f"The input tensors type must be one of [tuple, list, numpy.ndarray, torch.tensor] but is {type(tensor)}"
)
dtype = types.canonical_heat_type(torch_tensor.dtype)
np_res = numpy_func(np_array, **numpy_args)
if not isinstance(np_res, np.ndarray):
np_res = np.array(np_res)
for i in range(len(tensor.shape)):
ht_array = factories.array(
torch_tensor, split=i, dtype=dtype, device=self.device, comm=self.comm
)
ht_res = heat_func(ht_array, **heat_args)
self.assertEqual(ht_array.device, ht_res.device)
self.assertEqual(ht_array.larray.device, ht_res.larray.device)
if distributed_result:
self.assert_array_equal(ht_res, np_res)
else:
self.assertTrue(np.array_equal(ht_res.larray.cpu().numpy(), np_res))
[docs]
def assertTrue_memory_layout(self, tensor: ht.DNDarray, order: str) -> None:
"""
Checks that the memory layout of a given heat tensor is as specified by argument order.
Parameters
----------
tensor: DNDarray
The input array.
order: str
'C' for C-like (row-major), 'F' for Fortran-like (column-major) memory layout.
"""
stride = tensor.larray.stride()
row_major = all(np.diff(list(stride)) <= 0)
column_major = all(np.diff(list(stride)) >= 0)
if order == "C":
return self.assertTrue(row_major)
elif order == "F":
return self.assertTrue(column_major)
else:
raise ValueError(f"expected order to be 'C' or 'F', but was {order}")
def __create_random_np_array(
self,
shape: Union[list[Any], tuple[Any]],
dtype: type = np.float32,
low: int = -10000,
high: int = 10000,
) -> np.ndarray:
"""
Creates a random array based on the input parameters.
The used seed will be printed to stdout for debugging purposes.
Parameters
----------
shape: list or tuple
The shape of the random array to be created.
dtype: np.dtype, optional
The datatype of the resulting array.
If dtype is subclass of np.floating then numpy.random.randn is used to create random values.
If dtype is subclass of np.integer then numpy.random.randint is called with the low and high argument to
create the random values.
Default is numpy.float64
low: int, optional
In case dtype is an integer type, this is the lower bound for the random values.
Default is -10000
high: int, optional
In case dtype is an integer type, this is the upper bound for the random values.
Default is 10000
Returns
-------
res: numpy.ndarray
An array of random values with the specified shape and dtype.
Raises
------
ValueError if the dtype is not a subtype of numpy.integer or numpy.floating
"""
seed = np.random.randint(1000000, size=(1,))
# print("using seed {} for random values".format(seed))
self.comm.Bcast(seed, root=0)
np.random.seed(seed=seed.item())
if issubclass(dtype, np.floating):
array: np.ndarray = np.random.randn(*shape)
elif issubclass(dtype, np.integer):
array = np.random.randint(low=low, high=high, size=shape)
else:
raise ValueError(
f"Unsupported dtype. Expected a subclass of `np.floating` or `np.integer` but got {dtype}"
)
array = array.astype(dtype)
return array
[docs]
def get_tmpdir(self):
"""
Create a temporary directory in the current working directory.
When running with multiple tasks, only rank 0 creates and destroys the directory, but the path is communicated across all ranks.
The directory is cleaned up / deleted when the variable tmpdir is garbage collected on rank 0.
Returns
-------
path : str
Path to the temporary directory
tmpdir : object or None
On rank 0: A Python object administering the temporary directory, on other ranks, None.
"""
if ht.MPI_WORLD.rank == 0:
tmpdir = tempfile.TemporaryDirectory(dir="./")
path = tmpdir.name
else:
tmpdir = None
path = None
path = ht.MPI_WORLD.bcast(path, root=0)
return path, tmpdir # need to return tmpdir here so that its not cleaned up at this point