Source code for heat.decomposition.dmd

"""
Module implementing the Dynamic Mode Decomposition (DMD) algorithm.
"""

import heat as ht
from typing import Optional, Union, List
import torch

try:
    from typing import Self
except ImportError:
    from typing_extensions import Self


[docs] def _torch_matrix_diag(diagonal): # auxiliary function to create a batch of diagonal matrices from a batch of diagonal vectors # source: fmassas comment on Oct 4, 2018 in https://github.com/pytorch/pytorch/issues/12160 [Accessed Oct 09, 2024] N = diagonal.shape[-1] shape = diagonal.shape[:-1] + (N, N) device, dtype = diagonal.device, diagonal.dtype result = torch.zeros(shape, dtype=dtype, device=device) indices = torch.arange(result.numel(), device=device).reshape(shape) indices = indices.diagonal(dim1=-2, dim2=-1) result.view(-1)[indices] = diagonal return result
[docs] class DMD(ht.RegressionMixin, ht.BaseEstimator): """ Dynamic Mode Decomposition (DMD), plain vanilla version with SVD-based implementation. The time series of which DMD shall be computed must be provided as a 2-D DNDarray of shape (n_features, n_timesteps). Please, note that this deviates from Heat's convention that data sets are handeled as 2-D arrays with the feature axis being the second axis. Parameters ---------- svd_solver : str, optional Specifies the algorithm to use for the singular value decomposition (SVD). Options are 'full' (default), 'hierarchical', and 'randomized'. svd_rank : int, optional The rank to which SVD shall be truncated. For `'full'` SVD, `svd_rank = None` together with `svd_tol = None` (default) will result in no truncation. For `svd_solver='full'`, at most one of `svd_rank` or `svd_tol` may be specified. For `svd_solver='hierarchical'`, either `svd_rank` (rank to truncate to) or `svd_tol` (tolerance to truncate to) must be specified. For `svd_solver='randomized'`, `svd_rank` must be specified and determines the the rank to truncate to. svd_tol : float, optional The tolerance to which SVD shall be truncated. For `'full'` SVD, `svd_tol = None` together with `svd_rank = None` (default) will result in no truncation. For `svd_solver='hierarchical'`, either `svd_tol` (accuracy to truncate to) or `svd_rank` (rank to truncate to) must be specified. For `svd_solver='randomized'`, `svd_tol` is meaningless and must be None. Attributes ---------- svd_solver : str The algorithm used for the singular value decomposition (SVD). svd_rank : int The rank to which SVD shall be truncated. svd_tol : float The tolerance to which SVD shall be truncated. rom_basis_ : DNDarray The reduced order model basis. rom_transfer_matrix_ : DNDarray The reduced order model transfer matrix. rom_eigenvalues_ : DNDarray The reduced order model eigenvalues. rom_eigenmodes_ : DNDarray The reduced order model eigenmodes ("DMD modes") Notes ----- We follow the "exact DMD" method as described in [1], Sect. 2.2. Please note that "rank" in the context of SVD always refers to the number of singular values/vectors to compute (i.e., "rank" refers to the mathematical rank of a matrix). This is completely different from the notion of "(MPI-)rank", i.e., the ID given to a process, in a parallel MPI-application. References ---------- [1] J. L. Proctor, S. L. Brunton, and J. N. Kutz, "Dynamic Mode Decomposition with Control," SIAM Journal on Applied Dynamical Systems, vol. 15, no. 1, pp. 142-161, 2016. """ def __init__( self, svd_solver: Optional[str] = "full", svd_rank: Optional[int] = None, svd_tol: Optional[float] = None, ): # Check if the specified SVD algorithm is valid if not isinstance(svd_solver, str): raise TypeError( f"Invalid type '{type(svd_solver)}' for 'svd_solver'. Must be a string." ) # check if the specified SVD algorithm is valid if svd_solver not in ["full", "hierarchical", "randomized"]: raise ValueError( f"Invalid SVD algorithm '{svd_solver}'. Must be one of 'full', 'hierarchical', 'randomized'." ) # check if the respective algorithm got the right combination of non-None parameters if svd_solver == "full" and svd_rank is not None and svd_tol is not None: raise ValueError( "For 'full' SVD, at most one of 'svd_rank' or 'svd_tol' may be specified." ) if svd_solver == "hierarchical": if svd_rank is None and svd_tol is None: raise ValueError( "For 'hierarchical' SVD, exactly one of 'svd_rank' or 'svd_tol' must be specified, but none of them is specified." ) if svd_rank is not None and svd_tol is not None: raise ValueError( "For 'hierarchical' SVD, exactly one of 'svd_rank' or 'svd_tol' must be specified, but currently both are specified." ) if svd_solver == "randomized": if svd_rank is None: raise ValueError("For 'randomized' SVD, 'svd_rank' must be specified.") if svd_tol is not None: raise ValueError("For 'randomized' SVD, 'svd_tol' must be None.") # check correct data types of non-None parameters if svd_rank is not None: if not isinstance(svd_rank, int): raise TypeError( f"Invalid type '{type(svd_rank)}' for 'svd_rank'. Must be an integer." ) if svd_rank < 1: raise ValueError( f"Invalid value '{svd_rank}' for 'svd_rank'. Must be a positive integer." ) if svd_tol is not None: if not isinstance(svd_tol, float): raise TypeError(f"Invalid type '{type(svd_tol)}' for 'svd_tol'. Must be a float.") if svd_tol <= 0: raise ValueError(f"Invalid value '{svd_tol}' for 'svd_tol'. Must be non-negative.") # set or initialize the attributes self.svd_solver = svd_solver self.svd_rank = svd_rank self.svd_tol = svd_tol self.rom_basis_ = None self.rom_transfer_matrix_ = None self.rom_eigenvalues_ = None self.rom_eigenmodes_ = None self.dmdmodes_ = None self.n_modes_ = None
[docs] def fit(self, X: ht.DNDarray) -> Self: """ Fits the DMD model to the given data. Parameters ---------- X : DNDarray The time series data to fit the DMD model to. Must be of shape (n_features, n_timesteps). """ ht.sanitize_in(X) # check if the input data is a 2-D DNDarray if X.ndim != 2: raise ValueError( f"Invalid shape '{X.shape}' for input data 'X'. Must be a 2-D DNDarray of shape (n_features, n_timesteps)." ) # check if the input data has at least two time steps if X.shape[1] < 2: raise ValueError( f"Invalid number of time steps '{X.shape[1]}' in input data 'X'. Must have at least two time steps." ) # first step of DMD: compute the SVD of the input data from first to second last time step if self.svd_solver == "full" or not X.is_distributed(): U, S, Vt = ht.linalg.svd( X[:, :-1] if X.split == 0 else X[:, :-1].balance(), full_matrices=False ) if self.svd_tol is not None: # truncation w.r.t. prescribed bound on explained variance # determine svd_rank accordingly total_variance = (S**2).sum() variance_threshold = (1 - self.svd_tol) * total_variance.larray.item() variance_cumsum = (S**2).larray.cumsum(0) self.n_modes_ = len(variance_cumsum[variance_cumsum <= variance_threshold]) + 1 elif self.svd_rank is not None: # truncation w.r.t. prescribed rank self.n_modes_ = self.svd_rank else: # no truncation self.n_modes_ = S.shape[0] self.rom_basis_ = U[:, : self.n_modes_] Vt = Vt[: self.n_modes_, :] S = S[: self.n_modes_] # compute SVD via "hierarchical" SVD elif self.svd_solver == "hierarchical": if self.svd_tol is not None: # hierarchical SVD with prescribed upper bound on relative error U, S, Vt, _ = ht.linalg.hsvd_rtol( X[:, :-1] if X.split == 0 else X[:, :-1].balance(), self.svd_tol, compute_sv=True, safetyshift=5, ) else: # hierarchical SVD with prescribed, fixed rank U, S, Vt, _ = ht.linalg.hsvd_rank( X[:, :-1] if X.split == 0 else X[:, :-1].balance(), self.svd_rank, compute_sv=True, safetyshift=5, ) self.rom_basis_ = U self.n_modes_ = U.shape[1] else: # compute SVD via "randomized" SVD U, S, Vt = ht.linalg.rsvd( X[:, :-1] if X.split == 0 else X[:, :-1].balance_(), self.svd_rank, ) self.rom_basis_ = U self.n_modes_ = U.shape[1] # second step of DMD: compute the reduced order model transfer matrix # we need to assume that the transfer matrix of the ROM is small enough to fit into memory of one process if X.split == 0 or X.split is None: # if split axis of the input data is 0, using X[:,1:] does not result in un-balancedness and corresponding problems in matmul self.rom_transfer_matrix_ = self.rom_basis_.T @ X[:, 1:] @ Vt.T / S else: # if input is split along columns, X[:,1:] will be un-balanced and cause problems in matmul Xplus = X[:, 1:] Xplus.balance_() self.rom_transfer_matrix_ = self.rom_basis_.T @ Xplus @ Vt.T / S self.rom_transfer_matrix_.resplit_(None) # third step of DMD: compute the reduced order model eigenvalues and eigenmodes eigvals_loc, eigvec_loc = torch.linalg.eig(self.rom_transfer_matrix_.larray) self.rom_eigenvalues_ = ht.array(eigvals_loc, split=None, device=X.device) self.rom_eigenmodes_ = ht.array(eigvec_loc, split=None, device=X.device) self.dmdmodes_ = self.rom_basis_ @ self.rom_eigenmodes_
[docs] def predict_next(self, X: ht.DNDarray, n_steps: int = 1) -> ht.DNDarray: """ Predicts and returns the state(s) after n_steps-many time steps for given a current state(s). Parameters ---------- X : DNDarray The current state(s) for the prediction. Must have the same number of features as the training data, but can be batched for multiple current states, i.e., X can be of shape (n_features,) or (n_features, n_current_states). The output will have the same shape as the input. n_steps : int, optional The number of steps to predict into the future. Default is 1, i.e., the next time step is predicted. """ if not isinstance(n_steps, int): raise TypeError(f"Invalid type '{type(n_steps)}' for 'n_steps'. Must be an integer.") if self.rom_basis_ is None: raise RuntimeError("Model has not been fitted yet. Call 'fit' first.") # sanitize input data ht.sanitize_in(X) # if X is a 1-D DNDarray, we add an artificial batch dimension if X.ndim == 1: X = X.expand_dims(1) # check if the input data has the right number of features if X.shape[0] != self.rom_basis_.shape[0]: raise ValueError( f"Invalid number of features '{X.shape[0]}' in input data 'X'. Must have the same number of features as the training data." ) rom_mat = self.rom_transfer_matrix_.copy() rom_mat.larray = torch.linalg.matrix_power(rom_mat.larray, n_steps) # the following line looks that complicated because we have to make sure that splits of the resulting matrices in # each of the products are split along the axis that deserves being splitted nextX = (self.rom_basis_.T @ X).T.resplit_(None) @ (self.rom_basis_ @ rom_mat).T return (nextX.T).squeeze()
[docs] def predict(self, X: ht.DNDarray, steps: Union[int, List[int]]) -> ht.DNDarray: """ Predics and returns future states given a current state(s) and returns them all as an array of size (n_steps, n_features). This function avoids a time-stepping loop (i.e., repeated calls to 'predict_next') and computes the future states in one go. To do so, the number of future times to predict must be of moderate size as an array of shape (n_steps, self.n_modes_, self.n_modes_) must fit into memory. Moreover, it must be ensured that: - the array of initial states is not split or split along the batch axis (axis 1) and the feature axis is small (i.e., self.rom_basis_ is not split) Parameters ---------- X : DNDarray The current state(s) for the prediction. Must have the same number of features as the training data, but can be batched for multiple current states, i.e., X can be of shape (n_features,) or (n_current_states, n_features). steps : int or List[int] if int: predictions at time step 0, 1, ..., steps-1 are computed if List[int]: predictions at time steps given in the list are computed """ if self.rom_basis_ is None: raise RuntimeError("Model has not been fitted yet. Call 'fit' first.") # sanitize input data ht.sanitize_in(X) # if X is a 1-D DNDarray, we add an artificial batch dimension if X.ndim == 1: X = X.expand_dims(1) # check if the input data has the right number of features if X.shape[0] != self.rom_basis_.shape[0]: raise ValueError( f"Invalid number of features '{X.shape[0]}' in input data 'X'. Must have the same number of features as the training data." ) if isinstance(steps, int): steps = torch.arange(steps, dtype=torch.int32, device=X.device.torch_device) elif isinstance(steps, list): steps = torch.tensor(steps, dtype=torch.int32, device=X.device.torch_device) else: raise TypeError( f"Invalid type '{type(steps)}' for 'steps'. Must be an integer or a list of integers." ) steps = steps.reshape(-1, 1).repeat(1, self.rom_eigenvalues_.shape[0]) X_rom = self.rom_basis_.T @ X transfer_mat = _torch_matrix_diag(torch.pow(self.rom_eigenvalues_.larray, steps)) transfer_mat = ( self.rom_eigenmodes_.larray @ transfer_mat @ self.rom_eigenmodes_.larray.inverse() ) transfer_mat = torch.real( transfer_mat ) # necessary to avoid imaginary parts due to numerical errors if self.rom_basis_.split is None and (X.split is None or X.split == 1): result = ( transfer_mat @ X_rom.larray ) # here we assume that X_rom is not split or split along the second axis (axis 1) del transfer_mat result = ( self.rom_basis_.larray @ result ) # here we assume that self.rom_basis_ is not split (i.e., the feature number is small) result = ht.array(result, is_split=2 if X.split == 1 else None) return result.squeeze().T else: raise NotImplementedError( "Predicting multiple time steps in one go is not supported for the given data layout. Please, use 'predict_next' instead, or open an issue on GitHub if you require this feature." )
[docs] def __str__(self): if ht.MPI_WORLD.rank == 0: if self.rom_basis_ is not None: return ( f"-------------------- DMD (Dynamic Mode Decomposition) --------------------\n" f"Number of modes: {self.n_modes_}\n" f"State space dimension: {self.rom_basis_.shape[0]}\n" f"DMD eigenvalues: {self.rom_eigenvalues_.larray}\n" f"--------------------------------------------------------------------------\n" f"ROM basis of shape {self.rom_basis_.shape}:\n" f"\t split axis: {self.rom_basis_.split}\n" f"\t device: {self.rom_basis_.device.__str__().split(':')[-2]}\n" f"--------------------------------------------------------------------------\n" ) else: return ( f"---------------- UNFITTED DMD (Dynamic Mode Decomposition) ---------------\n" f"Parameters for fit are as follows: \n" f"\t SVD-solver: {self.svd_solver}\n" f"\t SVD-rank: {self.svd_rank}\n" f"\t SVD-tolerance: {self.svd_tol}\n" f"--------------------------------------------------------------------------\n" ) else: return ""
[docs] class DMDc(ht.RegressionMixin, ht.BaseEstimator): """ Dynamic Mode Decomposition with Control (DMDc), plain vanilla version with SVD-based implementation. The time series of states and controls must be provided as 2-D DNDarrays of shapes (n_state_features, n_timesteps) and (n_control_features, n_timesteps), respectively. Please, note that this deviates from Heat's convention that data sets are handeled as 2-D arrays with the feature axis being the second axis. Parameters ---------- svd_solver : str, optional Specifies the algorithm to use for the singular value decomposition (SVD). Options are 'full' (default), 'hierarchical', and 'randomized'. svd_rank : int, optional The rank to which SVD of the states shall be truncated. For `'full'` SVD, `svd_rank = None` together with `svd_tol = None` (default) will result in no truncation. For `svd_solver='full'`, at most one of `svd_rank` or `svd_tol` may be specified. For `svd_solver='hierarchical'`, either `svd_rank` (rank to truncate to) or `svd_tol` (tolerance to truncate to) must be specified. For `svd_solver='randomized'`, `svd_rank` must be specified and determines the the rank to truncate to. svd_tol : float, optional The tolerance to which SVD of the states shall be truncated. For `'full'` SVD, `svd_tol = None` together with `svd_rank = None` (default) will result in no truncation. For `svd_solver='hierarchical'`, either `svd_tol` (accuracy to truncate to) or `svd_rank` (rank to truncate to) must be specified. For `svd_solver='randomized'`, `svd_tol` is meaningless and must be None. Attributes ---------- svd_solver : str The algorithm used for the singular value decomposition (SVD). svd_rank : int The rank to which SVD shall be truncated. svd_tol : float The tolerance to which SVD shall be truncated. rom_basis_ : DNDarray The reduced order model basis. rom_transfer_matrix_ : DNDarray The reduced order model transfer matrix. rom_control_matrix_ : DNDarray The reduced order model control matrix. rom_eigenvalues_ : DNDarray The reduced order model eigenvalues. rom_eigenmodes_ : DNDarray The reduced order model eigenmodes ("DMD modes") Notes ----- We follow the approach described in [1], Sects. 3.3 and 3.4. In the case that svd_rank is prescribed, the rank of the SVD of the full system matrix is set to svd_rank + n_control_features; cf. https://github.com/dynamicslab/pykoopman for the same approach. Please note that "rank" in the context of SVD always refers to the number of singular values/vectors to compute (i.e., "rank" refers to the mathematical rank of a matrix). This is completely different from the notion of "(MPI-)rank", i.e., the ID given to a process, in a parallel MPI-application. References ---------- [1] J. L. Proctor, S. L. Brunton, and J. N. Kutz, "Dynamic Mode Decomposition with Control," SIAM Journal on Applied Dynamical Systems, vol. 15, no. 1, pp. 142-161, 2016. """ def __init__( self, svd_solver: Optional[str] = "full", svd_rank: Optional[int] = None, svd_tol: Optional[float] = None, ): # Check if the specified SVD algorithm is valid if not isinstance(svd_solver, str): raise TypeError( f"Invalid type '{type(svd_solver)}' for 'svd_solver'. Must be a string." ) # check if the specified SVD algorithm is valid if svd_solver not in ["full", "hierarchical", "randomized"]: raise ValueError( f"Invalid SVD algorithm '{svd_solver}'. Must be one of 'full', 'hierarchical', 'randomized'." ) # check if the respective algorithm got the right combination of non-None parameters if svd_solver == "full" and svd_rank is not None and svd_tol is not None: raise ValueError( "For 'full' SVD, at most one of 'svd_rank' or 'svd_tol' may be specified." ) if svd_solver == "hierarchical": if svd_rank is None and svd_tol is None: raise ValueError( "For 'hierarchical' SVD, exactly one of 'svd_rank' or 'svd_tol' must be specified, but none of them is specified." ) if svd_rank is not None and svd_tol is not None: raise ValueError( "For 'hierarchical' SVD, exactly one of 'svd_rank' or 'svd_tol' must be specified, but currently both are specified." ) if svd_solver == "randomized": if svd_rank is None: raise ValueError("For 'randomized' SVD, 'svd_rank' must be specified.") if svd_tol is not None: raise ValueError("For 'randomized' SVD, 'svd_tol' must be None.") # check correct data types of non-None parameters if svd_rank is not None: if not isinstance(svd_rank, int): raise TypeError( f"Invalid type '{type(svd_rank)}' for 'svd_rank'. Must be an integer." ) if svd_rank < 1: raise ValueError( f"Invalid value '{svd_rank}' for 'svd_rank'. Must be a positive integer." ) if svd_tol is not None: if not isinstance(svd_tol, float): raise TypeError(f"Invalid type '{type(svd_tol)}' for 'svd_tol'. Must be a float.") if svd_tol <= 0: raise ValueError(f"Invalid value '{svd_tol}' for 'svd_tol'. Must be non-negative.") # set or initialize the attributes self.svd_solver = svd_solver self.svd_rank = svd_rank self.svd_tol = svd_tol self.rom_basis_ = None self.rom_transfer_matrix_ = None self.rom_control_matrix_ = None self.rom_eigenvalues_ = None self.rom_eigenmodes_ = None self.dmdmodes_ = None self.n_modes_ = None self.n_modes_system_ = None
[docs] def fit(self, X: ht.DNDarray, C: ht.DNDarray) -> Self: """ Fits the DMD model to the given data. Parameters ---------- X : DNDarray The time series data of states to fit the DMD model to. Must be of shape (n_state_features, n_timesteps). C : DNDarray The time series of control inputs to fit the DMD model to. Must be of shape (n_control_features, n_timesteps). """ ht.sanitize_in(X) ht.sanitize_in(C) # check if the input data is a 2-D DNDarray if X.ndim != 2: raise ValueError( f"Invalid shape '{X.shape}' for input data 'X'. Must be a 2-D DNDarray of shape (n_state_features, n_timesteps)." ) if C.ndim != 2: raise ValueError( f"Invalid shape '{C.shape}' for input data 'C'. Must be a 2-D DNDarray of shape (n_control_features, n_timesteps)." ) # check if the input data has at least two time steps if X.shape[1] < 2: raise ValueError( f"Invalid number of time steps '{X.shape[1]}' in input data 'X'. Must have at least two time steps." ) if C.shape[1] < 2: raise ValueError( f"Invalid number of time steps '{C.shape[1]}' in input data 'C'. Must have at least two time steps." ) # check if the input data has the same number of time steps if X.shape[1] != C.shape[1]: raise ValueError( f"Invalid number of time steps {X.shape[1]} in input data 'X' and {C.shape[1]} in input data 'C'. Must have the same number of time steps." ) if X.split is not None and C.split is not None and X.split != C.split: raise ValueError( f"If both input data 'X' and 'C' are distributed, they must be distributed along the same axis, but X.split={X.split}, C.split={C.split}." ) Xplus = X[:, 1:] Xplus.balance_() Omega = ht.concatenate((X, C), axis=0)[:, :-1] # first step of DMDc: compute the SVD of the input data from first to second last time step # as well as of the full system matrix if self.svd_solver == "full" or not X.is_distributed(): U, S, Vt = ht.linalg.svd(Xplus, full_matrices=False) Utilde, Stilde, Vttilde = ht.linalg.svd(Omega, full_matrices=False) if self.svd_tol is not None: # truncation w.r.t. prescribed bound on explained variance # determine svd_rank accordingly total_variance = (S**2).sum() variance_threshold = (1 - self.svd_tol) * total_variance.larray.item() variance_cumsum = (S**2).larray.cumsum(0) self.n_modes_ = len(variance_cumsum[variance_cumsum <= variance_threshold]) + 1 total_variance_system = (Stilde**2).sum() variance_threshold_system = (1 - self.svd_tol) * total_variance_system.larray.item() variance_cumsum_system = (Stilde**2).larray.cumsum(0) self.n_modes_system_ = ( len(variance_cumsum_system[variance_cumsum_system <= variance_threshold_system]) + 1 ) elif self.svd_rank is not None: # truncation w.r.t. prescribed rank self.n_modes_ = self.svd_rank self.n_modes_system_ = self.svd_rank + C.shape[0] else: # no truncation self.n_modes_ = S.shape[0] self.n_modes_system = Stilde.shape[0] self.rom_basis_ = U[:, : self.n_modes_] Vt = Vt[: self.n_modes_, :] S = S[: self.n_modes_] Vttilde = Vttilde[: self.n_modes_system_, :] Stilde = Stilde[: self.n_modes_system_] Utilde1 = Utilde[: X.shape[0], : self.n_modes_system_] Utilde2 = Utilde[X.shape[0] :, : self.n_modes_system_] # compute SVD via "hierarchical" SVD elif self.svd_solver == "hierarchical": if self.svd_tol is not None: # hierarchical SVD with prescribed upper bound on relative error U, S, Vt, _ = ht.linalg.hsvd_rtol( Xplus, self.svd_tol, compute_sv=True, safetyshift=5, ) Utilde, Stilde, Vttilde, _ = ht.linalg.hsvd_rtol( Omega, self.svd_tol, compute_sv=True, safetyshift=5, ) else: # hierarchical SVD with prescribed, fixed rank U, S, Vt, _ = ht.linalg.hsvd_rank( Xplus, self.svd_rank, compute_sv=True, safetyshift=5, ) Utilde, Stilde, Vttilde, _ = ht.linalg.hsvd_rank( Omega, self.svd_rank + C.shape[0], compute_sv=True, safetyshift=5, ) self.rom_basis_ = U self.n_modes_ = U.shape[1] self.n_modes_system_ = Utilde.shape[1] Utilde1 = Utilde[: X.shape[0], :] Utilde2 = Utilde[X.shape[0] :, :] else: # compute SVD via "randomized" SVD U, S, Vt = ht.linalg.rsvd( Xplus, self.svd_rank, ) Utilde, Stilde, Vttilde = ht.linalg.rsvd( Omega, self.svd_rank + C.shape[0], ) self.rom_basis_ = U self.n_modes_ = U.shape[1] self.n_modes_system_ = Utilde.shape[1] Utilde1 = Utilde[: X.shape[0], :] Utilde2 = Utilde[X.shape[0] :, :] # ensure that everything is balanced for the following steps Utilde2.balance_() Utilde1.balance_() Vtilde = Vttilde.T Vtilde.balance_() if Utilde2.split is not None and Utilde2.shape[Utilde2.split] < Utilde2.comm.size: Utilde2.resplit_((Utilde2.split + 1) % 2) if Utilde1.split is not None and Utilde1.shape[Utilde1.split] < Utilde1.comm.size: Utilde1.resplit_((Utilde1.split + 1) % 2) if Vtilde.split is not None and Vtilde.shape[Vtilde.split] < Vtilde.comm.size: Vtilde.resplit_((Vtilde.split + 1) % 2) # second step of DMD: compute the reduced order model transfer matrix # we need to assume that the the transfer matrix of the ROM is small enough to fit into memory of one process self.rom_transfer_matrix_ = ( self.rom_basis_.T
[docs] @ Xplus @ (Vtilde / Stilde) @ (Utilde1.T @ self.rom_basis_).resplit_(None) ) self.rom_control_matrix_ = (self.rom_basis_.T @ Xplus) @ ( (Vtilde / Stilde) @ Utilde2.T ).resplit_(0) self.rom_transfer_matrix_.resplit_(None) self.rom_control_matrix_.resplit_(None) # third step of DMD: compute the reduced order model eigenvalues and eigenmodes eigvals_loc, eigvec_loc = torch.linalg.eig(self.rom_transfer_matrix_.larray) self.rom_eigenvalues_ = ht.array(eigvals_loc, split=None, device=X.device) self.rom_eigenmodes_ = ht.array(eigvec_loc, split=None, device=X.device) self.dmdmodes_ = ( Xplus @ (Vtilde / Stilde) @ Utilde1.T @ self.rom_basis_ @ self.rom_eigenmodes_ )
def predict(self, X: ht.DNDarray, C: ht.DNDarray) -> ht.DNDarray: """ Predicts and returns future states given the current state(s) ``X`` and control trajectory ``C``. Parameters ---------- X : DNDarray The current state(s) for the prediction. Must have the same number of features as the training data, but can be batched for multiple current states, i.e., X can be of shape (n_state_features,) or (n_batch, n_state_features). C : DNDarray The control trajectory for the prediction. Must have the same number of control features as the training data, i.e., C must be of shape (n_control_features,) --for a single time step-- or (n_control_features, n_timesteps). """ if self.rom_basis_ is None: raise RuntimeError("Model has not been fitted yet. Call 'fit' first.") # sanitize input data ht.sanitize_in(X) ht.sanitize_in(C) # if X is a 1-D DNDarray, we add an artificial batch dimension; check correct dimensions for X if X.ndim == 1: X = X.expand_dims(0) if X.ndim > 2: raise ValueError( f"Invalid shape '{X.shape}' for input data 'X'. Must be a 2-D DNDarray of shape (n_batch, n_state_features) or a 1-D DNDarray of shape (n_state_features,)." ) # if C is a 1-D DNDarray, we add an artificial dimension for the single time step; check correct dimensions for C if C.ndim == 1: C = C.expand_dims(1) if C.ndim > 2: raise ValueError( f"Invalid shape '{C.shape}' for input data 'C'. Must be a 2-D DNDarray of shape (n_control_features, n_timesteps) or a 1-D DNDarray of shape (n_control_features,) for a single time step." ) # check if the input data has the right number of features for control and state space if X.shape[1] != self.rom_basis_.shape[0]: raise ValueError( f"Invalid number of features '{X.shape[1]}' in input data 'X'. Must have the same number of features as the training data (={self.rom_basis_.shape[0]})." ) if C.shape[0] != self.rom_control_matrix_.shape[1]: raise ValueError( f"Invalid number of features '{C.shape[1]}' in input data 'C'. Must have the same number of features as the training data (={self.rom_control_matrix_.shape[1]})." ) # different cases if C.split is not None: raise ValueError("So far only C.split = None is supported .") # time evolution in the reduced order model X_red = X @ self.rom_basis_ X_red_full = ht.zeros( (X.shape[0], self.rom_basis_.shape[1], C.shape[1]), split=X_red.split, device=X.device, dtype=X.dtype, ) X_red_full[:, :, 0] = X_red for i in range(1, C.shape[1]): X_red_full[:, :, i] = (self.rom_transfer_matrix_ @ X_red_full[:, :, i - 1].T).T + ( self.rom_control_matrix_ @ C[:, i - 1] ).T # reshape in order to be able to multiply with basis again X_red_full = X_red_full.reshape(self.rom_basis_.shape[1], -1).resplit_( 1 if X_red_full.split == 0 else None ) X_pred = self.rom_basis_ @ X_red_full # reshape again and return return X_pred.reshape(X.shape[0], X.shape[1], C.shape[1])
[docs] def __str__(self): if ht.MPI_WORLD.rank == 0: if self.rom_basis_ is not None: return ( f"----------- DMDc (Dynamic Mode Decomposition with control) ---------------\n" f"Number of modes: {self.n_modes_}\n" f"State space dimension: {self.rom_basis_.shape[0]}\n" f"Control space dimension: {self.rom_control_matrix_.shape[1]}\n" f"DMD eigenvalues: {self.rom_eigenvalues_.larray}\n" f"--------------------------------------------------------------------------\n" f"ROM basis of shape {self.rom_basis_.shape}:\n" f"\t split axis: {self.rom_basis_.split}\n" f"\t device: {self.rom_basis_.device.__str__().split(':')[-2]}\n" f"--------------------------------------------------------------------------\n" ) else: return ( f"-------- UNFITTED DMDc (Dynamic Mode Decomposition with control) ---------\n" f"Parameters for fit are as follows: \n" f"\t SVD-solver: {self.svd_solver}\n" f"\t SVD-rank: {self.svd_rank}\n" f"\t SVD-tolerance: {self.svd_tol}\n" f"--------------------------------------------------------------------------\n" ) else: return ""