Source code for rkstiff.solveras

r"""
Base solver infrastructure for rkstiff adaptive step PDE solvers
================================================================

Adaptive-step solver infrastructure for stiff time-dependent systems.

This module defines the :class:`BaseSolverAS` abstract class and its
associated :class:`SolverConfig`, which provide the foundation for
adaptive time-stepping solvers such as ETD(3,5) and other embedded
Runge–Kutta–type integrators.

Adaptive solvers automatically adjust their time step to maintain
a user-specified accuracy tolerance, based on local error estimates
computed during integration.
"""

from abc import abstractmethod
from typing import Tuple, Optional, Callable, Union, Literal
import numpy as np
from .solver import BaseSolver
from .util.solver_type import SolverType


# ======================================================================
# Solver Configuration
# ======================================================================
[docs] class SolverConfig: r""" Configuration parameters for adaptive-step stiff solvers. Defines numerical tolerances and scaling factors that control adaptive time step selection. Used by all adaptive solvers derived from :class:`BaseSolverAS`. Parameters ---------- epsilon : float, default=1e-4 Relative error tolerance for adaptive stepping. Smaller values yield higher accuracy but may reduce efficiency. incr_f : float, default=1.25 Step-size increase factor applied after successful steps. Must satisfy ``incr_f > 1.0``. decr_f : float, default=0.85 Step-size reduction factor applied after rejected steps. Must satisfy ``decr_f < 1.0``. safety_f : float, default=0.8 Safety factor for damping overly aggressive changes in step size. Must satisfy ``safety_f <= 1.0``. adapt_cutoff : float, default=0.01 Relative magnitude threshold for excluding small modes when computing adaptive step corrections. minh : float, default=1e-16 Minimum allowable step size. Raises ------ ValueError If any parameter violates its constraint. Notes ----- The combination of ``epsilon``, ``safety_f``, and ``adapt_cutoff`` primarily determines the balance between stability and adaptivity. Typical recommended settings are: .. code-block:: python SolverConfig(epsilon=1e-5, safety_f=0.9, adapt_cutoff=0.001) """
[docs] def __init__( self, epsilon: float = 1e-4, incr_f: float = 1.25, decr_f: float = 0.85, safety_f: float = 0.8, adapt_cutoff: float = 0.01, minh: float = 1e-16, ) -> None: """Initialize and validate adaptive solver configuration.""" self._epsilon = None self._incr_f = None self._decr_f = None self._safety_f = None self._adapt_cutoff = None self._minh = None # Use property setters for validation self.epsilon = epsilon self.incr_f = incr_f self.decr_f = decr_f self.safety_f = safety_f self.adapt_cutoff = adapt_cutoff self.minh = minh
# ------------------------------------------------------------------ # Properties (documented for autodoc clarity) # ------------------------------------------------------------------ @property def epsilon(self) -> float: """Relative error tolerance for adaptive stepping.""" return self._epsilon @epsilon.setter def epsilon(self, value: float) -> None: """Set relative error tolerance, must be positive.""" if value <= 0: raise ValueError(f"epsilon must be positive but is {value}") self._epsilon = float(value) @property def incr_f(self) -> float: """Increment factor for adaptive step sizing (must be > 1.0).""" return self._incr_f @incr_f.setter def incr_f(self, value: float) -> None: """Set increment factor, must be greater than 1.0.""" if value <= 1.0: raise ValueError(f"incr_f must be > 1.0 but is {value}") self._incr_f = float(value) @property def decr_f(self) -> float: """Decrement factor for adaptive step sizing (must be < 1.0).""" return self._decr_f @decr_f.setter def decr_f(self, value: float) -> None: """Set decrement factor, must be less than 1.0.""" if value >= 1.0: raise ValueError(f"decr_f must be < 1.0 but is {value}") self._decr_f = float(value) @property def safety_f(self) -> float: """Safety factor for adaptive stepping (must be ≤ 1.0).""" return self._safety_f @safety_f.setter def safety_f(self, value: float) -> None: """Set safety factor, must be ≤ 1.0.""" if value > 1.0: raise ValueError(f"safety_f must be <= 1.0 but is {value}") self._safety_f = float(value) @property def adapt_cutoff(self) -> float: """Cutoff threshold for ignoring small modes during adaptivity.""" return self._adapt_cutoff @adapt_cutoff.setter def adapt_cutoff(self, value: float) -> None: """Set adaptivity cutoff, must be less than 1.0.""" if value >= 1.0: raise ValueError(f"adapt_cutoff must be < 1.0 but is {value}") self._adapt_cutoff = float(value) @property def minh(self) -> float: """Minimum allowable step size.""" return self._minh @minh.setter def minh(self, value: float) -> None: """Set minimum step size, must be positive.""" if value <= 0: raise ValueError(f"minh must be positive but is {value}") self._minh = float(value)
# ====================================================================== # Adaptive Solver Base Class # ======================================================================
[docs] class BaseSolverAS(BaseSolver): r""" Abstract base class for adaptive-step stiff solvers. Provides an adaptive time-stepping framework for integrating semi-linear systems of the form .. math:: \frac{\partial \mathbf{U}}{\partial t} = \mathcal{L}\mathbf{U} + \mathcal{N}(\mathbf{U}), where :math:`\mathcal{L}` is a (possibly stiff) linear operator and :math:`\mathcal{N}` is a nonlinear function. Subclasses must implement :meth:`_update_stages` (to perform the embedded Runge–Kutta stage updates), :meth:`_reset` (to clear solver state), and :meth:`_q` (to define the order of accuracy for step size control). Parameters ---------- lin_op : np.ndarray Linear operator :math:`\mathcal{L}` defining the stiff part of the system. Can be either 1D (diagonal) or a full 2D matrix. nl_func : Callable[[np.ndarray], np.ndarray] Nonlinear function :math:`\mathcal{N}(\mathbf{U})`. config : SolverConfig, optional Configuration object specifying tolerances and scaling factors for adaptive step control. loglevel : str or int, default='WARNING' Logging level for runtime diagnostics. Attributes ---------- config : SolverConfig Adaptive step configuration (tolerances and safety parameters). epsilon : float Relative error tolerance (:attr:`config.epsilon`). minh : float Minimum allowable step size (:attr:`config.minh`). _accept : bool Flag indicating whether the most recent step was accepted. Raises ------ ValueError If ``lin_op`` has invalid dimensions or configuration parameters are inconsistent. Notes ----- Adaptive time-stepping uses local error control via embedded Runge–Kutta pairs, dynamically adjusting :math:`h` such that .. math:: \| \mathbf{e}_n \| \leq \varepsilon \, \| \mathbf{u}_n \|, where :math:`\mathbf{e}_n` is the local truncation error and :math:`\varepsilon` is the tolerance. .. tip:: Implementations typically define `_update_stages()` to return both a candidate solution and an error estimate, which are used to adjust the next step size. """ # ------------------------------------------------------------------ # Internal exceptions # ------------------------------------------------------------------
[docs] class SolverError(RuntimeError): """Base exception for solver-related runtime errors."""
[docs] class MaxLoopsExceeded(SolverError): """Raised when too many attempts are made to find a valid adaptive step."""
[docs] class MinimumStepReached(SolverError): """Raised when the adaptive step size falls below the minimum allowed value."""
MAX_LOOPS = 50 #: Maximum retry attempts per adaptive step MAX_S = 4.0 #: Maximum allowed step size increase factor MIN_S = 0.25 #: Minimum allowed step size reduction factor # ------------------------------------------------------------------ # Initialization # ------------------------------------------------------------------
[docs] def __init__( self, lin_op: np.ndarray, nl_func: Callable[[np.ndarray], np.ndarray], config: SolverConfig = SolverConfig(), loglevel: Union[Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], int] = "WARNING", ) -> None: """Initialize an adaptive-step solver with validated configuration.""" super().__init__(lin_op, nl_func, loglevel) self.config = config self.logger.debug( "Adaptive configuration: epsilon=%s, incr_f=%s, decr_f=%s, safety_f=%s", config.epsilon, config.incr_f, config.decr_f, config.safety_f, ) self.__t0, self.__tf, self.__tc = 0, 0, 0 self._accept = False
# ------------------------------------------------------------------ # Core Interface # ------------------------------------------------------------------ @property def solver_type(self) -> SolverType: """ Return the solver type for adaptive-step solvers. Returns ------- SolverType Always returns ``SolverType.ADAPTIVE_STEP``. Examples -------- >>> from rkstiff.etd35 import ETD35 >>> solver = ETD35(lin_op, nl_func) >>> solver.solver_type == SolverType.ADAPTIVE_STEP True """ return SolverType.ADAPTIVE_STEP
[docs] def reset(self) -> None: """Reset solver and clear adaptive-step state.""" self.logger.debug("Resetting adaptive solver state") self.t, self.u = [], [] self.__t0, self.__tf, self.__tc = 0, 0, 0 self._accept = False self._reset()
@abstractmethod def _reset(self) -> None: """Reset solver-specific internal state. Must be implemented by subclasses.""" @abstractmethod def _update_stages(self, u: np.ndarray, h: float) -> Tuple[np.ndarray, np.ndarray]: """ Advance the solution one adaptive step. Must be implemented by subclasses to compute both the candidate next state and an estimate of the local truncation error. Returns ------- tuple of (np.ndarray, np.ndarray) Updated solution and local error estimate. """ @abstractmethod def _q(self): """Return the method’s order (used in adaptive step size scaling)."""
[docs] def step(self, u: np.ndarray, h_suggest: float) -> Tuple[np.ndarray, float, float]: """ Perform one adaptive integration step. Attempts to advance the solution by time ``h_suggest`` and adjusts the step size automatically based on local error estimates. Parameters ---------- u : np.ndarray Current state vector. h_suggest : float Suggested time step size. Returns ------- unew : np.ndarray Updated solution vector after one accepted step. h : float Actual step size used. h_suggest : float Suggested step size for the next iteration. Raises ------ MaxLoopsExceeded If too many attempts are made to find an acceptable step size. MinimumStepReached If the step size drops below the configured minimum ``minh``. Notes ----- The algorithm follows this pattern: 1. Try the proposed step. 2. Estimate local error and compute scaling factor ``s``. 3. If ``s < 1`` → reject step and reduce ``h``. 4. If ``s ≥ 1`` → accept step and update ``h_suggest`` for next step. .. warning:: Very small or divergent ``s`` values may indicate instability or excessively tight tolerances. """ h = h_suggest assert h >= 0.0 self.logger.debug("Starting step with h_suggest=%s", h_suggest) numloops = 0 while True: unew, err = self._update_stages(u, h) # Compute step size change factor s s = self._compute_s(unew, err) self.logger.debug("Computed s=%s for h=%s", s, h) # If s is less than 1, inf, or nan, reject step and reduce step size if np.isinf(s) or np.isnan(s) or s < 1.0: h = self._reject_step_size(s, h) # If s is bigger than 1 accept h and the step else: h_suggest = self._accept_step_size(s, h) self.logger.debug("Step accepted, returning h=%s, h_suggest=%s", h, h_suggest) return unew, h, h_suggest numloops += 1 if numloops > self.MAX_LOOPS: failure_str = ( "Solver failed: adaptive step made too many attempts to find a step " "size with an acceptible amount of error." ) self.logger.error(failure_str) raise self.MaxLoopsExceeded(failure_str) if h < self.config.minh: failure_str = "Solver failed: adaptive step reached minimum step size" self.logger.error(failure_str) raise self.MinimumStepReached(failure_str)
def _compute_s(self, u: np.ndarray, err: np.ndarray) -> float: r""" Compute the step size scaling factor :math:`s` for adaptive control. The scaling factor determines how the next step size should be adjusted based on the estimated local error: .. math:: s = \text{safety\_f} \, \left( \frac{\varepsilon \, \|\mathbf{u}\|} {\|\mathbf{e}\|} \right)^{1/q}, where :math:`\varepsilon` is the tolerance, :math:`\mathbf{u}` is the current solution, :math:`\mathbf{e}` is the local error estimate, and :math:`q` is the method order returned by :meth:`_q`. To avoid instability from low-magnitude components, modes with relative amplitudes below ``config.adapt_cutoff`` are ignored. Parameters ---------- u : np.ndarray Current solution vector :math:`\mathbf{u}_n`. err : np.ndarray Local error estimate :math:`\mathbf{e}_n`. Returns ------- float Step size scaling factor :math:`s`. - ``s > 1`` → step accepted (can increase step size) - ``s < 1`` → step rejected (reduce step size) Notes ----- The new suggested step size is computed as :math:`h_{\text{new}} = s h`. """ # Use adapt_cutoff to ignore small modes/values in the computation of the step size magu = np.abs(u) idx = magu / magu.max() > self.config.adapt_cutoff tol = self.config.epsilon * np.linalg.norm(u[idx]) s = self.config.safety_f * np.power(tol / np.linalg.norm(err[idx]), 1.0 / self._q()) return s def _reject_step_size(self, s: float, h: float) -> float: r""" Reduce the time step after a rejected attempt. Called when the estimated local error exceeds the tolerance. The method scales the current step size :math:`h` by the factor :math:`s`, clamped within stability limits. .. math:: h_{\text{new}} = \max(s, s_{\min}) \; \min(s, \text{decr\_f}) \; h Parameters ---------- s : float Step size scaling factor computed by :meth:`_compute_s`. h : float Current step size before rejection. Returns ------- float Reduced step size for retrying the integration. Notes ----- - Enforces minimum allowed factor ``MIN_S`` to prevent collapse. - Logs detailed information if ``s`` is NaN or infinite. - Sets the acceptance flag ``_accept`` to ``False``. .. warning:: Frequent rejections may indicate an unstable problem or overly strict tolerance. """ self._accept = False # Check that s is a number if np.isinf(s) or np.isnan(s): msg = "inf or nan number encountered: reducing step size to %s" % h self.logger.warning(msg) return self.MIN_S * h s = np.max([s, self.MIN_S]) # dont let s be too small s = np.min([s, self.config.decr_f]) # dont let s be too close to 1 msg = "step rejected with s = %.2f" % s self.logger.debug(msg) hnew = s * h msg = "reducing step size to %s" % hnew self.logger.debug(msg) return hnew def _accept_step_size(self, s: float, h: float) -> float: r""" Update the suggested step size after a successful integration step. When the estimated local error is within tolerance, the step is accepted, and the next step size is increased according to the scaling factor :math:`s`. .. math:: h_{\text{next}} = \begin{cases} \min(s, s_{\max}) \, h, & \text{if } s > \text{incr\_f} \\ h, & \text{otherwise.} \end{cases} Parameters ---------- s : float Step size scaling factor computed by :meth:`_compute_s`. h : float Step size used for the accepted step. Returns ------- float Suggested step size for the next step. Notes ----- - Sets the acceptance flag ``_accept`` to ``True``. - Caps increases using ``MAX_S`` and ``config.incr_f`` to avoid overly aggressive growth. - Designed to ensure smooth variation of step size. """ self._accept = True s = np.min([s, self.MAX_S]) # dont let s be too big msg = "step accepted with s = %.2f" % s self.logger.debug(msg) # if s much larger than 1, increase the step size if s > self.config.incr_f: h_suggest = s * h msg = "increasing step size to %s" % h_suggest self.logger.debug(msg) return h_suggest return h
[docs] def evolve( self, u: np.ndarray, t0: float, tf: float, h_init: Optional[float] = None, store_data: bool = True, store_freq: int = 1, ) -> np.ndarray: r""" Integrate the system from :math:`t_0` to :math:`t_f` using adaptive time steps. Repeatedly applies :meth:`step` to propagate the solution forward while dynamically adjusting the time step size based on local error estimates. Parameters ---------- u : np.ndarray Initial solution vector at :math:`t_0`. t0 : float Initial time. tf : float Final time. h_init : float, optional Initial step size. Defaults to ``(tf - t0) / 100`` if not provided. store_data : bool, default=True Whether to store intermediate results in :attr:`t` and :attr:`u`. store_freq : int, default=1 Frequency of data storage; store every ``store_freq`` accepted steps. Returns ------- np.ndarray Final solution at :math:`t = t_f`. Notes ----- - The evolution proceeds until :math:`t \geq t_f`, automatically adjusting step sizes as needed. - Stored data is accessible via :attr:`t` and :attr:`u`. Example ------- >>> solver = ETD35(lin_op, nl_func) >>> u_final = solver.evolve(u0, t0=0.0, tf=10.0) >>> solver.t[-1], np.linalg.norm(solver.u[-1]) (10.0, 0.0134) """ self.reset() self.logger.info("Starting evolution from t=%s to t=%s", t0, tf) self.__t0, self.__tf, self.__tc = t0, tf, t0 if store_data: self.t.append(t0) self.u.append(u) # Set initial step size if none given if h_init is None: h_init = (self.__tf - self.__t0) / 100.0 h = h_init self.logger.debug("Initial step size h=%s, store_freq=%s", h, store_freq) # Make sure step size isn't larger than entire propagation time if self.__tc + h > self.__tf: h = self.__tf - self.__tc step_count = 0 while self.__tc < self.__tf: u, h, h_suggest = self.step(u, h) self.__tc += h step_count += 1 if step_count % 100 == 0: self.logger.info( "Progress: t=%.6f/%.6f (%.1f%%), steps=%d", self.__tc, self.__tf, 100 * self.__tc / self.__tf, step_count, ) if self.__tc + h_suggest > self.__tf: h = self.__tf - self.__tc else: h = h_suggest if store_data and (step_count % store_freq == 0): self.t.append(self.__tc) self.u.append(u) self.logger.debug("Stored solution at t=%.6f (step %d)", self.__tc, step_count) self.logger.info("Evolution complete after %d steps", step_count) self.logger.info("Stored %d solution snapshots", len(self.u)) return u