"""
Module defining classes for time intervals for trackers
The provided interval classes are:
.. autosummary::
:nosignatures:
ConstantIntervals
LogarithmicIntervals
RealtimeIntervals
.. codeauthor:: David Zwicker <david.zwicker@ds.mpg.de>
"""
import copy
import math
import numbers
import time
from typing import Any, Dict, Optional, Union

from ..tools.parse_duration import parse_duration
# type alias: an optional dictionary with string keys and arbitrary values; it is not
# referenced by the code visible in this module
InfoDict = Optional[Dict[str, Any]]
class ConstantIntervals:
    """class representing equidistantly spaced time intervals

    The first interrupt time is fixed lazily, either by calling
    :meth:`_initialize` explicitly or by the first call to :meth:`next`.
    """

    def __init__(self, dt: float = 1, t_start: Optional[float] = None):
        """
        Args:
            dt (float): The duration between subsequent intervals. This is
                measured in simulation time units.
            t_start (float, optional): The time after which the tracker becomes
                active. If omitted, the tracker starts recording right away.
                This argument can be used for an initial equilibration period
                during which no data is recorded.
        """
        self.dt = float(dt)
        self.t_start = t_start
        self._t_next: Optional[float] = None  # next time it should be called

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(dt={self.dt:g}, t_start={self.t_start})"

    def copy(self):
        """return a (shallow) copy of this instance"""
        return copy.copy(self)

    def _initialize(self, t: float) -> float:
        """initialize the tracker

        Args:
            t (float): The starting time of the simulation

        Returns:
            float: The first time the tracker needs to handle data
        """
        if self.t_start is None:
            self._t_next = t
        else:
            # never schedule the first interrupt before the simulation starts
            self._t_next = max(t, self.t_start)
        return self._t_next

    def next(self, t: float) -> float:
        """computes the next time point based on the current time t

        Args:
            t (float): The current time point of the simulation

        Returns:
            float: The next time the tracker needs to handle data. Interrupt
            times that already lie in the past are skipped, so the returned
            value is larger than `t`.
        """
        if self._t_next is None:
            # `next` was called without prior initialization
            self._initialize(t)

        # move next interrupt time by the appropriate interval
        self._t_next += self.dt  # type: ignore

        # make sure that the new interrupt time is in the future
        if self._t_next <= t:
            # add `dt` until _t_next has caught up with t
            n = math.ceil((t - self._t_next) / self.dt)
            self._t_next += self.dt * n
            # `ceil` lands exactly on `t` when the distance is a whole multiple
            # of `dt`, and floating-point rounding can even leave us slightly
            # below `t`; in both cases one more step is needed so that the same
            # time is never scheduled twice
            if self._t_next <= t:
                self._t_next += self.dt

        return self._t_next
class LogarithmicIntervals(ConstantIntervals):
    """class representing logarithmically spaced time intervals

    Each call to :meth:`next` multiplies the current spacing `dt` by `factor`
    before the following time point is determined.
    """

    def __init__(
        self, dt_initial: float = 1, factor: float = 1, t_start: Optional[float] = None
    ):
        """
        Args:
            dt_initial (float): The initial duration between subsequent
                intervals. This is measured in simulation time units.
            factor (float): The factor by which the time between intervals is
                increased every time. Values larger than one lead to time
                intervals that are increasingly further apart.
            t_start (float, optional): The time after which the tracker becomes
                active. If omitted, the tracker starts recording right away.
                This argument can be used for an initial equilibration period
                during which no data is recorded.
        """
        # `next` scales the spacing before using it, so the stored value starts
        # one factor below `dt_initial`
        first_spacing = dt_initial / factor
        super().__init__(dt=first_spacing, t_start=t_start)
        self.factor = float(factor)

    def __repr__(self):
        cls_name = self.__class__.__name__
        fields = f"dt={self.dt:g}, factor={self.factor:g}, t_start={self.t_start}"
        return f"{cls_name}({fields})"

    def next(self, t: float) -> float:
        """computes the next time point based on the current time t

        Args:
            t (float): The current time point of the simulation

        Returns:
            float: The next time point, determined by the parent class using
            the enlarged spacing
        """
        # grow the spacing first, then let the parent class do the scheduling
        self.dt = self.dt * self.factor
        return super().next(t)
class RealtimeIntervals(ConstantIntervals):
    """class representing time intervals spaced equidistantly in real time

    This spacing is only achieved approximately and depends on the initial value
    set by `dt_initial` and the actual variation in computation speed.
    """

    def __init__(self, duration: Union[float, str], dt_initial: float = 0.01):
        """
        Args:
            duration (float or str): The duration (in realtime seconds) that the
                intervals should be spaced apart. The duration can also be given
                as a string, which is then parsed using the function
                :func:`~pde.tools.parse_duration.parse_duration`.
            dt_initial (float): The initial duration between subsequent
                intervals. This is measured in simulation time units.
        """
        super().__init__(dt=dt_initial)
        try:
            self.duration = float(duration)
        except (TypeError, ValueError):
            # `float` rejects the value (e.g. a string that is not a plain
            # number), so interpret it as a duration string instead. Only these
            # two errors are caught so unrelated failures are not masked.
            td = parse_duration(str(duration))
            # NOTE(review): the parsed object must provide `total_seconds()`,
            # presumably a `datetime.timedelta` -- confirm against parse_duration
            self.duration = td.total_seconds()
        # wall-clock reading (time.monotonic) taken at the previous interrupt
        self._last_time: Optional[float] = None

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(duration={self.duration:g}, "
            f"dt_initial={self.dt:g})"
        )

    def _initialize(self, t: float) -> float:
        """initialize the tracker

        Args:
            t (float): The starting time of the simulation

        Returns:
            float: The first time the tracker needs to handle data
        """
        # start measuring real time from the moment of initialization
        self._last_time = time.monotonic()
        return super()._initialize(t)

    def next(self, t: float) -> float:
        """computes the next time point based on the current time t

        The spacing `dt` (in simulation time) is adapted on every call after the
        first one, based on the real time that passed since the previous call.

        Args:
            t (float): The current time point of the simulation

        Returns:
            float: The next time point, determined by the parent class using
            the adapted spacing
        """
        if self._last_time is None:
            # first call without initialization: nothing to compare against yet
            self._last_time = time.monotonic()
        else:
            # adapt time step
            current_time = time.monotonic()
            time_passed = current_time - self._last_time
            if time_passed > 0:
                # predict new time step, but limit it from below, to avoid problems with
                # simulations where a single step takes a long time
                dt_predict = max(1e-3, self.dt * self.duration / time_passed)
                # use geometric average to provide some smoothing
                self.dt = math.sqrt(self.dt * dt_predict)
            else:
                # no measurable real time has passed, so simply double the spacing
                self.dt *= 2
            self._last_time = current_time
        return super().next(t)
# type used wherever an interval object is expected; the other interval classes in
# this module derive from ConstantIntervals, so instances of them match it as well
IntervalType = ConstantIntervals
# inputs accepted by `get_interval`: an interval instance, a number, or a string
IntervalData = Union[IntervalType, float, int, str]
def get_interval(interval: IntervalData) -> IntervalType:
    """create IntervalType from various data formats

    If interval is of type :class:`IntervalType` it is simply returned

    Args:
        interval: An interval instance, a real number, or a string. A number is
            used as the spacing `dt` of :class:`ConstantIntervals`. A string is
            used as the real-time `duration` of :class:`RealtimeIntervals`.

    Returns:
        :class:`IntervalType`: The interval object described by `interval`

    Raises:
        TypeError: If `interval` is of none of the supported types
    """
    if isinstance(interval, IntervalType):
        return interval
    if isinstance(interval, str):
        return RealtimeIntervals(interval)
    if isinstance(interval, numbers.Real):
        # `numbers.Real` covers `int` and `float` as well as other real-valued
        # scalars (e.g. `fractions.Fraction` or numpy scalar types), all of
        # which ConstantIntervals converts with `float(...)`
        return ConstantIntervals(interval)  # type: ignore
    raise TypeError(f"Do not understand interval type {interval}")