Source code for pde.solvers.explicit

"""
Defines an explicit solver supporting various methods
   
.. codeauthor:: David Zwicker <david.zwicker@ds.mpg.de> 
"""

from typing import Callable, Tuple

import numpy as np

from ..fields.base import FieldBase
from ..pdes.base import PDEBase
from ..tools.numba import jit
from .base import SolverBase


[docs]class ExplicitSolver(SolverBase): """class for solving partial differential equations explicitly""" name = "explicit" dt_min = 1e-10 dt_max = 1e10 def __init__( self, pde: PDEBase, scheme: str = "euler", backend: str = "auto", adaptive: bool = False, tolerance: float = 1e-4, ): """ Args: pde (:class:`~pde.pdes.base.PDEBase`): The instance describing the pde that needs to be solved scheme (str): Defines the explicit scheme to use. Supported values are 'euler' and 'runge-kutta' (or 'rk' for short). backend (str): Determines how the function is created. Accepted values are 'numpy` and 'numba'. Alternatively, 'auto' lets the code decide for the most optimal backend. adaptive (bool): When enabled, the time step is adjusted during the simulation using the error tolerance set with `tolerance`. tolerance (float): The error tolerance used in adaptive time stepping. This is used in adaptive time stepping to choose a time step which is small enough so the truncation error of a single step is below `tolerance`. """ super().__init__(pde) self.scheme = scheme self.backend = backend self.adaptive = adaptive self.tolerance = tolerance def _make_fixed_euler_stepper( self, state: FieldBase, dt: float ) -> Callable[[np.ndarray, float, int], Tuple[float, float]]: """make a simple Euler stepper with fixed time step Args: state (:class:`~pde.fields.base.FieldBase`): An example for the state from which the grid and other information can be extracted dt (float): Time step of the explicit stepping. Returns: Function that can be called to advance the `state` from time `t_start` to time `t_end`. The function call signature is `(state: numpy.ndarray, t_start: float, steps: int)` """ # obtain post-step action function modify_after_step = jit(self.pde.make_modify_after_step(state)) if self.pde.is_sde: # handle stochastic version of the pde rhs_sde = self._make_sde_rhs(state, backend=self.backend) def stepper( state_data: np.ndarray, t_start: float, steps: int ) -> Tuple[float, float]: """compiled inner loop for speed""" modifications = 0.0 for i in range(steps): # calculate the right hand side t = t_start + i * dt evolution_rate, noise_realization = rhs_sde(state_data, t) state_data += dt * evolution_rate if noise_realization is not None: state_data += np.sqrt(dt) * noise_realization modifications += modify_after_step(state_data) return t + dt, modifications self.info["stochastic"] = True self._logger.info( f"Initialized explicit Euler-Maruyama stepper with dt=%g", dt ) else: # handle deterministic version of the pde rhs_pde = self._make_pde_rhs(state, backend=self.backend) def stepper( state_data: np.ndarray, t_start: float, steps: int ) -> Tuple[float, float]: """compiled inner loop for speed""" modifications = 0 for i in range(steps): # calculate the right hand side t = t_start + i * dt state_data += dt * rhs_pde(state_data, t) modifications += modify_after_step(state_data) return t + dt, modifications self.info["stochastic"] = False self.info["adaptive"] = False self._logger.info(f"Initialized explicit Euler stepper with dt=%g", dt) return stepper def _make_adaptive_euler_stepper( self, state: FieldBase, dt: float ) -> Callable[[FieldBase, float, float], float]: """make a simple adaptive Euler stepper Args: state (:class:`~pde.fields.base.FieldBase`): An example for the state from which the grid and other information can be extracted dt (float): Initial time step of the explicit stepping. Returns: Function that can be called to advance the `state` from time `t_start` to time `t_end`. The function call signature is `(state: numpy.ndarray, t_start: float, t_end: float)` """ if self.pde.is_sde: raise RuntimeError( "Cannot use adaptive Euler stepper with stochastic equation" ) # obtain post-step action function modify_after_step = jit(self.pde.make_modify_after_step(state)) # handle deterministic version of the pde using an adaptive stepper rhs_pde = self._make_pde_rhs(state, backend=self.backend) tolerance = self.tolerance self.info["dt_last"] = dt self.info["stochastic"] = False self.info["adaptive"] = True dt_min = self.dt_min dt_min_err = f"Time step below {dt_min}" dt_max = self.dt_max def inner_stepper( state_data: np.ndarray, t_start: float, t_end: float, dt_init: float ) -> Tuple[float, float, int, float]: """compiled inner loop for speed""" modifications = 0.0 dt = dt_init t = t_start steps = 0 while t < t_end: steps += 1 # single step with dt rate = rhs_pde(state_data, t) k1 = state_data + dt * rate # double step with half the dt k2 = state_data + 0.5 * dt * rate k2 += 0.5 * dt * rhs_pde(k2, t + 0.5 * dt) # calculate maximal error error = 0.0 for i in range(state_data.size): error = max(error, abs(k1.flat[i] - k2.flat[i])) error *= dt # error estimate should be independent of magnitude of dt # do the step if the error is sufficiently small if error <= tolerance: t += dt steps += 1 state_data[:] = k2 modifications += modify_after_step(state_data) # adjust the time step dt *= min(max(0.9 * (tolerance / error) ** 0.2, 0.1), 4.0) if dt > dt_max: dt = dt_max elif dt < dt_min: raise RuntimeError(dt_min_err) return t, dt, steps, modifications if self.info["backend"] == "numba": inner_stepper = jit(inner_stepper) # compile inner stepper def stepper(state: FieldBase, t_start: float, t_end: float) -> float: """use Euler stepping to advance `state` from `t_start` to `t_end`""" t_last, dt, steps, modifications = inner_stepper( state.data, t_start, t_end, dt_init=self.info["dt_last"] ) self.info["dt_last"] = dt self.info["steps"] += steps self.info["state_modifications"] += modifications return t_last self._logger.info(f"Initialized adaptive Euler stepper with dt_0={dt}") return stepper def _make_rk45_stepper( self, state: FieldBase, dt: float ) -> Callable[[np.ndarray, float, int], Tuple[float, float]]: """make a simple stepper for the explicit Runge-Kutta method of order 5(4) Args: state (:class:`~pde.fields.base.FieldBase`): An example for the state from which the grid and other information can be extracted dt (float): Time step of the explicit stepping. Returns: Function that can be called to advance the `state` from time `t_start` to time `t_end`. The function call signature is `(state: numpy.ndarray, t_start: float, steps: int)` """ if self.pde.is_sde: raise RuntimeError( "Cannot use Runge-Kutta stepper with stochastic equation" ) rhs = self._make_pde_rhs(state, backend=self.backend) self.info["stochastic"] = False self.info["adaptive"] = self.adaptive # obtain post-step action function modify_after_step = jit(self.pde.make_modify_after_step(state)) def stepper( state_data: np.ndarray, t_start: float, steps: int ) -> Tuple[float, float]: """compiled inner loop for speed""" modifications = 0.0 for i in range(steps): # calculate the right hand side t = t_start + i * dt # calculate the intermediate values in Runge-Kutta k1 = dt * rhs(state_data, t) k2 = dt * rhs(state_data + 0.5 * k1, t + 0.5 * dt) k3 = dt * rhs(state_data + 0.5 * k2, t + 0.5 * dt) k4 = dt * rhs(state_data + k3, t + dt) state_data += (k1 + 2 * k2 + 2 * k3 + k4) / 6 modifications += modify_after_step(state_data) return t + dt, modifications self._logger.info(f"Initialized explicit Runge-Kutta-45 stepper with dt=%g", dt) return stepper def _make_rkf_stepper( self, state: FieldBase, dt: float ) -> Callable[[FieldBase, float, float], float]: """make a simple stepper for the explicit Runge-Kutta-Fehlberg method Args: state (:class:`~pde.fields.base.FieldBase`): An example for the state from which the grid and other information can be extracted dt (float): Initial time step of the explicit stepping. Returns: Function that can be called to advance the `state` from time `t_start` to time `t_end`. The function call signature is `(state: numpy.ndarray, t_start: float, t_end: float)` """ if self.pde.is_sde: raise RuntimeError( "Cannot use Runge-Kutta-Fehlberg stepper with stochastic equation" ) rhs = self._make_pde_rhs(state, backend=self.backend) self.info["stochastic"] = False self.info["adaptive"] = self.adaptive self.info["dt_last"] = dt # obtain post-step action function modify_after_step = jit(self.pde.make_modify_after_step(state)) # use Runge-Kutta-Fehlberg method tolerance = self.tolerance dt_min = self.dt_min dt_min_err = f"Time step below {dt_min}" dt_max = self.dt_max # define coefficients for RK4(5), formula 2 Table III in Fehlberg a2 = 1 / 4 a3 = 3 / 8 a4 = 12 / 13 a5 = 1.0 a6 = 1 / 2 b21 = 1 / 4 b31 = 3 / 32 b32 = 9 / 32 b41 = 1932 / 2197 b42 = -7200 / 2197 b43 = 7296 / 2197 b51 = 439 / 216 b52 = -8.0 b53 = 3680 / 513 b54 = -845 / 4104 b61 = -8 / 27 b62 = 2.0 b63 = -3544 / 2565 b64 = 1859 / 4104 b65 = -11 / 40 r1 = 1 / 360 # r2 = 0 r3 = -128 / 4275 r4 = -2197 / 75240 r5 = 1 / 50 r6 = 2 / 55 c1 = 25 / 216 # c2 = 0 c3 = 1408 / 2565 c4 = 2197 / 4104 c5 = -1 / 5 def inner_stepper( state_data: np.ndarray, t_start: float, t_end: float, dt_init: float ) -> Tuple[float, float, int, float]: """compiled inner loop for speed""" modifications = 0.0 dt = dt_init t = t_start steps = 0 while t < t_end: steps += 1 # do the six intermediate steps k1 = dt * rhs(state_data, t) k2 = dt * rhs(state_data + b21 * k1, t + a2 * dt) k3 = dt * rhs(state_data + b31 * k1 + b32 * k2, t + a3 * dt) k4 = dt * rhs(state_data + b41 * k1 + b42 * k2 + b43 * k3, t + a4 * dt) k5 = dt * rhs( state_data + b51 * k1 + b52 * k2 + b53 * k3 + b54 * k4, t + a5 * dt ) k6 = dt * rhs( state_data + b61 * k1 + b62 * k2 + b63 * k3 + b64 * k4 + b65 * k5, t + a6 * dt, ) # estimate the maximal error error = 0.0 for i in range(state_data.size): error_local: float = abs( r1 * k1.flat[i] + r3 * k3.flat[i] + r4 * k4.flat[i] + r5 * k5.flat[i] + r6 * k6.flat[i] ) error = max(error, error_local) # do the step if the error is sufficiently small if error <= tolerance: t += dt state_data += c1 * k1 + c3 * k3 + c4 * k4 + c5 * k5 modifications += modify_after_step(state_data) # adjust the time step dt *= min(max(0.9 * (tolerance / error) ** 0.2, 0.1), 4.0) if dt > dt_max: dt = dt_max elif dt < dt_min: raise RuntimeError(dt_min_err) return t, dt, steps, modifications if self.info["backend"] == "numba": inner_stepper = jit(inner_stepper) # compile inner stepper def stepper(state: FieldBase, t_start: float, t_end: float) -> float: """use Euler stepping to advance `state` from `t_start` to `t_end`""" t_last, dt, steps, modifications = inner_stepper( state.data, t_start, t_end, dt_init=self.info["dt_last"] ) self.info["dt_last"] = dt self.info["steps"] += steps self.info["state_modifications"] += modifications return t_last self._logger.info( f"Initialized Runge-Kutta-Fehlberg stepper with initial dt={dt}" ) return stepper
[docs] def make_stepper( self, state: FieldBase, dt=None ) -> Callable[[FieldBase, float, float], float]: """return a stepper function using an explicit scheme Args: state (:class:`~pde.fields.base.FieldBase`): An example for the state from which the grid and other information can be extracted dt (float): Time step of the explicit stepping. If `None`, this solver specifies 1e-3 as a default value. Returns: Function that can be called to advance the `state` from time `t_start` to time `t_end`. The function call signature is `(state: numpy.ndarray, t_start: float, t_end: float)` """ # support `None` as a default value, so the controller can signal that # the solver should use a default time step if dt is None: dt = 1e-3 if not self.adaptive: self._logger.warning( "Explicit stepper with a fixed time step did not receive any " f"initial value for `dt`. Using dt={dt}, but specifying a value or " "enabling adaptive stepping is advisable." ) self.info["dt"] = dt self.info["steps"] = 0 self.info["scheme"] = self.scheme self.info["state_modifications"] = 0.0 if self.pde.is_sde and self.adaptive: self._logger.warning( "Stochastic stepping cannot be adaptive. Using fixed time step instead." ) if self.adaptive and not self.pde.is_sde: # create stepper with adaptive dt self.info["dt_adaptive"] = True if self.scheme == "euler": adaptive_stepper = self._make_adaptive_euler_stepper(state, dt) elif self.scheme in {"runge-kutta", "rk", "rk45"}: adaptive_stepper = self._make_rkf_stepper(state, dt) else: raise ValueError( f"Explicit adaptive scheme {self.scheme} is not supported" ) return adaptive_stepper else: # create stepper with fixed dt self.info["dt_adaptive"] = False if self.scheme == "euler": inner_stepper = self._make_fixed_euler_stepper(state, dt) elif self.scheme in {"runge-kutta", "rk", "rk45"}: inner_stepper = self._make_rk45_stepper(state, dt) else: raise ValueError(f"Explicit scheme {self.scheme} is not supported") if self.info["backend"] == "numba": inner_stepper = jit(inner_stepper) # compile inner stepper def fixed_stepper(state: FieldBase, t_start: float, t_end: float) -> float: """advance `state` from `t_start` to `t_end` using fixed dt""" # calculate number of steps (which is at least 1) steps = max(1, int(np.ceil((t_end - t_start) / dt))) t_last, modifications = inner_stepper(state.data, t_start, steps) self.info["steps"] += steps self.info["state_modifications"] += modifications return t_last return fixed_stepper