"""Defines a class storing data on the file system using the hierarchical data format
(hdf)
.. codeauthor:: David Zwicker <david.zwicker@ds.mpg.de>
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Literal
import numpy as np
from numpy.typing import DTypeLike
from ..fields.base import FieldBase
from ..tools.misc import ensure_directory_exists, hdf_write_attributes
from .base import InfoDict, StorageBase, WriteModeType
class FileStorage(StorageBase):
"""Store discretized fields in a hdf5 file."""
def __init__(
self,
filename: str | Path,
*,
info: InfoDict | None = None,
write_mode: WriteModeType = "truncate_once",
max_length: int | None = None,
compression: bool = True,
keep_opened: bool = True,
check_mpi: bool = True,
):
"""
Args:
filename (str):
The path to the HDF5 file where the data is stored
info (dict):
Supplies extra information that is stored in the storage
write_mode (str):
Determines how new data is added to already existing data. Possible
values are: 'append' (data is always appended), 'truncate' (data is
cleared every time this storage is used for writing), or 'truncate_once'
(data is cleared for the first writing, but appended subsequently).
Alternatively, specifying 'readonly' will disable writing completely.
max_length (int, optional):
Maximal number of entries that will be stored in the file. This can be
used to preallocate data, which can lead to smaller files, but is also
less flexible. Giving `max_length = None` allows for arbitrarily large
data, which might lead to larger files.
compression (bool):
Whether to store the data in compressed form. Compression automatically
enables chunked storage.
keep_opened (bool):
Flag indicating whether the file should be kept open after each
write. If `False`, the file will be closed after writing a dataset.
This keeps the file in a consistent state, but also requires more work
before data can be written.
check_mpi (bool):
If True, files will only be opened on the main node of a parallel
simulation using MPI. This flag has no effect in serial code.
"""
from ..tools import mpi
super().__init__(info=info, write_mode=write_mode)
self.filename = Path(filename)
self.compression = compression
self.keep_opened = keep_opened
self.check_mpi = check_mpi
self._file: Any = None
self._is_writing = False
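# number of time points currently stored in the file; None until the file is opened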
self._data_length: int = None # type: ignore
self._max_length: int | None = max_length
if not self.check_mpi or mpi.is_main: # noqa: SIM102
# we are on the main process and can thus open the file directly
if self.filename.is_file() and self.filename.stat().st_size > 0:
try:
self._open("reading")
except (OSError, KeyError):
self.close()
self._logger.warning(
"File `%s` could not be opened for reading", filename
)
def __del__(self):
self.close() # ensure open files are closed when the FileStorage is deleted
@property
def _file_state(self) -> str:
"""str: the state that the file is currently in"""
if self._file is None:
return "closed"
elif self._file.mode == "r":
return "reading"
elif self._file.mode == "r+":
return "writing"
else:
raise NotImplementedError(f"Do not understand mode `{self._file.mode}`")
def close(self) -> None:
"""Close the currently opened file."""
if self._file is not None:
self._logger.info("Close file `%s`", self.filename)
self._file.close()
self._file = None
self._data_length = None # type: ignore
def __enter__(self) -> FileStorage:
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self.close()
def _create_hdf_dataset(
self,
name: str,
shape: tuple[int, ...] = (),
dtype: DTypeLike = np.double,
):
"""Create a hdf5 dataset with the given name and data_shape.
Args:
name (str): Identifier of the hdf5 dataset
shape (tuple): Data shape of the dataset
dtype: The data type of the dataset
"""
if self.compression:
kwargs = {"chunks": (1,) + shape, "compression": "gzip"}
else:
kwargs = {}
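# if the maximal length is known, preallocate the full dataset; otherwise start
# empty and let the dataset grow along the (unlimited) time axis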
if self._max_length:
shape = (self._max_length,) + shape
return self._file.create_dataset(name, shape=shape, dtype=dtype, **kwargs)
else:
return self._file.create_dataset(
name,
shape=(0,) + shape,
dtype=dtype,
maxshape=(None,) + shape,
**kwargs,
)
def _open(
self,
mode: Literal["reading", "appending", "writing", "closed"] = "reading",
info: InfoDict | None = None,
) -> None:
"""Open the hdf file in a particular mode.
Args:
mode (str):
Determines how the file is opened. Possible values are `reading`,
`appending`, `writing`, and `closed`.
info (dict):
Supplies extra information that is stored in the storage
"""
import h5py  # lazy loading so it's not a hard dependency
from ..tools import mpi
if self.check_mpi and not mpi.is_main:
self._logger.warning("Do not open file on MPI child nodes")
return
state = self._file_state
if mode == "reading":
# open file for reading
if state in ["reading", "appending", "writing"]:
return # we can read data when file is open for writing
# close file to open it again for reading or appending
if self._file:
self._file.close()
self._logger.info("Open file `%s` for reading", self.filename)
self._file = h5py.File(self.filename, mode="r")
self._times = self._file["times"]
self._data = self._file["data"]
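# the file attributes were written as JSON strings, so decode them into the info dict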
for k, v in self._file.attrs.items():
self.info[k] = json.loads(v)
if info:
self.info.update(info)
self._data_shape = self.data.shape[1:]
self._dtype = self.data.dtype
self._data_length = self.info.get("data_length") # type: ignore
elif mode == "appending":
# open file for writing without deleting data
if state in ["appending", "writing"]:
return # we are already in a mode where we can append data
if self.keep_opened and self._is_writing:
raise RuntimeError(
"Currently writing data, so mode cannot be switched."
)
self.close()
# open file for reading or appending
self._logger.info("Open file `%s` for appending", self.filename)
self._file = h5py.File(self.filename, mode="a")
if "times" in self._file and "data" in self._file:
# extract data from datasets in the existing file
self._times = self._file["times"]
self._data = self._file["data"]
# extract information
for k, v in self._file.attrs.items():
self.info[k] = json.loads(v)
self._data_shape = self.data.shape[1:]
self._dtype = self.data.dtype
self._data_length = self.info.get("data_length", self.data.shape[0])
else:
# create new datasets
self._times = self._create_hdf_dataset("times")
self._data = self._create_hdf_dataset(
"data", self.data_shape, dtype=self._dtype
)
self._data_length = 0
if info:
self.info.update(info)
elif mode == "writing":
# open file for writing data; delete potentially present data
if self._is_writing:
raise RuntimeError("Currently writing data, so mode cannot be switched")
if self._file:
self.close()
else:
ensure_directory_exists(self.filename.parent)
self._logger.info("Open file `%s` for writing", self.filename)
self._file = h5py.File(self.filename, "w")
self._times = self._create_hdf_dataset("times")
self._data = self._create_hdf_dataset(
"data", self.data_shape, dtype=self._dtype
)
if info:
self.info.update(info)
self._data_length = 0 # start writing from the beginning
elif mode == "closed":
self.close()
else:
raise RuntimeError(f"Mode `{mode}` not implemented")
def __len__(self):
"""Return the number of stored items, i.e., time steps."""
# determine size of data in HDF5 file
try:
length = len(self.times)
except OSError:
length = 0
if self._data_length is None:
return length
else:
# size of stored data is smaller since preallocation was used
return min(length, self._data_length)
@property
def times(self):
""":class:`~numpy.ndarray`: The times at which data is available."""
self._open("reading")
return self._times
@property
def data(self):
""":class:`~numpy.ndarray`: The actual data for all time."""
self._open("reading")
return self._data
def clear(self, clear_data_shape: bool = False):
"""Truncate the storage by removing all stored data.
Args:
clear_data_shape (bool):
Flag determining whether the data shape is also deleted.
"""
from ..tools import mpi
if self.check_mpi and not mpi.is_main:
self._logger.warning("Do not clear file on MPI child nodes")
return
if self._is_writing:
# remove data from opened file
self._logger.info("Truncate data in hdf5 file")
if "times" in self._file:
del self._file["times"]
self._times = self._create_hdf_dataset("times")
if "data" in self._file:
del self._file["data"]
self._data = self._create_hdf_dataset(
"data", self.data_shape, dtype=self.dtype
)
self._data_length = 0  # start writing from the beginning
elif self.filename.is_file():
# we do not currently write to the file => clear by removing file completely
self._logger.info("Truncate data by removing hdf5 file")
self._open("closed") # close file if it was opened
self.filename.unlink() # remove file
else:
self._logger.debug("Truncate is no-op since file does not exist")
super().clear(clear_data_shape=clear_data_shape)
def start_writing(self, field: FieldBase, info: InfoDict | None = None) -> None:
"""Initialize the storage for writing data.
Args:
field (:class:`~pde.fields.FieldBase`):
An example of the data that will be written to extract the grid and the
data_shape
info (dict):
Supplies extra information that is stored in the storage
"""
if self._is_writing:
raise RuntimeError(f"{self.__class__.__name__} is already in writing mode")
# delete data if truncation is requested. This is for instance necessary
# to remove older data with incompatible data_shape
if self.write_mode == "truncate" or self.write_mode == "truncate_once":
self.clear(clear_data_shape=True)
# initialize the writing, setting current data shape
super().start_writing(field, info=info)
# initialize the file for writing with the correct mode
self._logger.debug("Start writing with mode '%s'", self.write_mode)
if self.write_mode == "truncate_once":
self._open("writing", info)
self.write_mode = "append" # do not truncate for next writing
elif self.write_mode == "truncate":
self._open("writing", info)
elif self.write_mode == "append":
self._open("appending", info)
elif self.write_mode == "readonly":
raise RuntimeError("Cannot write in read-only mode")
else:
raise ValueError(
f"Unknown write mode `{self.write_mode}`. Possible values are "
"`truncate_once`, `truncate`, and `append`"
)
if not self.keep_opened:
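# the file does not stay open between writes, so the attributes must be written now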
# store extra information as attributes
hdf_write_attributes(self._file, self.info)
self._is_writing = True
def _append_data(self, data: np.ndarray, time: float) -> None:
"""Append a new data set.
Args:
data (:class:`~numpy.ndarray`): The actual data
time (float): The time point associated with the data
"""
if self.keep_opened:
if not self._is_writing or self._data_length is None:
raise RuntimeError(
"Writing not initialized. Call "
f"`{self.__class__.__name__}.start_writing`"
)
else:
# need to reopen the file
self._open("appending")
# write the new data
if self._data_length >= len(self._data):
self._data.resize((self._data_length + 1,) + self.data_shape)
self._data[self._data_length] = data
# write the new time
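# if no time was specified, continue with consecutive integers starting at zero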
if time is None:
if len(self._times) == 0:
time = 0
else:
time = self._times[self._data_length - 1] + 1
if self._data_length >= len(self._times):
self._times.resize((self._data_length + 1,))
self._times[self._data_length] = time
self._data_length += 1
self.info["data_length"] = self._data_length
if not self.keep_opened:
self.close()
def end_writing(self) -> None:
"""Finalize the storage after writing.
This makes sure the data is actually written to the file when
`self.keep_opened == False`.
"""
if not self._is_writing:
return # writing mode was already ended
self._logger.debug("End writing")
# store extra information as attributes
hdf_write_attributes(self._file, self.info)
self._file.flush()
self.close()
self._is_writing = False