Source code for pde.storage.modelrunner

"""Defines a class storing data using :mod:`modelrunner`.

.. codeauthor:: David Zwicker <david.zwicker@ds.mpg.de>
"""

from __future__ import annotations

import modelrunner as mr
import numpy as np

from ..fields.base import FieldBase
from .base import InfoDict, StorageBase, WriteModeType


class ModelrunnerStorage(StorageBase):
    """Store discretized fields in a :mod:`modelrunner` storage.

    This storage class acts as a wrapper for the :mod:`~modelrunner.storage.trajectory`
    module, which allows handling time-dependent data in :mod:`modelrunner` storages.
    In principle, all backends are supported, but it is advisable to use binary formats
    like :class:`~modelrunner.storage.backend.hdf.HDFStorage` or
    :class:`~modelrunner.storage.backend.zarr.ZarrStorage` to write large amounts of
    data.

    .. code-block:: python

        from modelrunner import Result

        r = Result.from_file("data.hdf5")
        r.result.plot()  # plots the final state
        r.storage["trajectory"]  # allows accessing the stored trajectory
    """

    def __init__(
        self,
        storage: mr.storage.StorageGroup,
        *,
        loc: mr.storage.Location = "trajectory",
        info: InfoDict | None = None,
        write_mode: WriteModeType = "truncate_once",
    ):
        """
        Args:
            storage (:class:`~modelrunner.storage.group.StorageGroup`):
                Modelrunner storage used for storing the trajectory
            loc (str or list of str):
                The location in the storage where the trajectory data is written.
            info (dict):
                Supplies extra information that is stored in the storage
            write_mode (str):
                Determines how new data is added to already existing data. Possible
                values are: 'append' (data is always appended), 'truncate' (data is
                cleared every time this storage is used for writing), or
                'truncate_once' (data is cleared for the first writing, but appended
                subsequently). Alternatively, specifying 'readonly' will disable
                writing completely.
        """
        super().__init__(info=info, write_mode=write_mode)
        self.storage = storage
        self.loc = loc
        self._writer: mr.storage.TrajectoryWriter | None = None
        self._reader: mr.storage.Trajectory | None = None
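    # A minimal construction sketch (not part of the original module): assuming
    # ``group`` is an already opened :class:`~modelrunner.storage.group.StorageGroup`,
    #
    #     storage = ModelrunnerStorage(group, write_mode="truncate_once")
    #
    # creates a storage whose first `start_writing` call requests truncation and
    # whose subsequent calls append; note that `clear` below cannot remove an
    # already existing trajectory.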
    def close(self) -> None:
        """Close the currently opened trajectory writer."""
        if self._writer is not None:
            self._writer.close()
            self._writer = None
    def __enter__(self) -> ModelrunnerStorage:
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.close()

    def __len__(self):
        """Return the number of stored items, i.e., time steps."""
        return len(self.times)

    @property
    def _io(self) -> mr.storage.TrajectoryWriter | mr.storage.Trajectory:
        """:class:`~modelrunner.storage.group.StorageGroup`: Group with all data."""
        if self._writer is not None:
            return self._writer
        if self._reader is None:
            self._reader = mr.storage.Trajectory(self.storage, loc=self.loc)
        return self._reader

    @property
    def times(self):
        """:class:`~numpy.ndarray`: The times at which data is available."""
        return self._io.times

    @property
    def data(self):
        """:class:`~numpy.ndarray`: The actual data for all times."""
        return self._io._storage.read_array(self._io._loc + ["data"])
    def clear(self, clear_data_shape: bool = False):
        """Truncate the storage by removing all stored data.

        Args:
            clear_data_shape (bool):
                Flag determining whether the data shape is also deleted.
        """
        if self.loc in self.storage:
            raise NotImplementedError("Cannot delete existing trajectory")
        super().clear(clear_data_shape=clear_data_shape)
    def start_writing(self, field: FieldBase, info: InfoDict | None = None) -> None:
        """Initialize the storage for writing data.

        Args:
            field (:class:`~pde.fields.FieldBase`):
                An example of the data that will be written to extract the grid and
                the data_shape
            info (dict):
                Supplies extra information that is stored in the storage
        """
        if self._writer:
            raise RuntimeError(f"{self.__class__.__name__} is already in writing mode")
        if self._reader:
            self._reader.close()

        # delete data if truncation is requested. This is for instance necessary
        # to remove older data with incompatible data_shape
        if self.write_mode == "truncate" or self.write_mode == "truncate_once":
            self.clear(clear_data_shape=True)

        # initialize the writing, setting current data shape
        super().start_writing(field, info=info)

        # initialize the file for writing with the correct mode
        self._logger.debug("Start writing with mode '%s'", self.write_mode)
        if self.write_mode == "truncate_once":
            self.write_mode = "append"  # do not truncate for next writing
        elif self.write_mode == "readonly":
            raise RuntimeError("Cannot write in read-only mode")
        elif self.write_mode not in {"truncate", "append"}:
            raise ValueError(
                f"Unknown write mode `{self.write_mode}`. Possible values are "
                "`truncate_once`, `truncate`, and `append`"
            )

        if info:
            self.info.update(info)
        self._writer = mr.storage.TrajectoryWriter(
            self.storage, loc=self.loc, attrs=self.info, mode="append"
        )
    def _append_data(self, data: np.ndarray, time: float) -> None:
        """Append a new data set.

        Args:
            data (:class:`~numpy.ndarray`): The actual data
            time (float): The time point associated with the data
        """
        assert self._writer is not None
        self._writer.append(data, float(time))
    def end_writing(self) -> None:
        """Finalize the storage after writing.

        This makes sure the data is actually written to a file when
        ``self.keep_opened == False``.
        """
        self.close()
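
# Usage sketch (not part of the module): how states of a simulation could be
# recorded with this class. The calls ``mr.open_storage``, ``pde.UnitGrid``,
# ``pde.ScalarField.random_uniform``, and ``pde.DiffusionPDE`` are assumptions
# about the surrounding packages; the ``tracker`` method is inherited from
# :class:`~pde.storage.base.StorageBase`.
#
#     import modelrunner as mr
#     import pde
#
#     grid = pde.UnitGrid([32])
#     state = pde.ScalarField.random_uniform(grid)
#
#     with mr.open_storage("data.hdf5", mode="full") as group:  # assumed API
#         storage = ModelrunnerStorage(group)
#         pde.DiffusionPDE().solve(state, t_range=10, tracker=storage.tracker(1))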