"""
Defines a class storing data on the file system using the hierarchical data format (hdf)
.. codeauthor:: David Zwicker <david.zwicker@ds.mpg.de>
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Literal
import numpy as np
from numpy.typing import DTypeLike
from ..fields.base import FieldBase
from ..tools import mpi
from ..tools.misc import ensure_directory_exists, hdf_write_attributes
from .base import InfoDict, StorageBase, WriteModeType
class FileStorage(StorageBase):
"""store discretized fields in a hdf5 file"""
def __init__(
self,
filename: str | Path,
*,
info: InfoDict | None = None,
write_mode: WriteModeType = "truncate_once",
max_length: int | None = None,
compression: bool = True,
keep_opened: bool = True,
check_mpi: bool = True,
):
"""
Args:
            filename (str):
                The path to the HDF5 file where the data is stored
info (dict):
Supplies extra information that is stored in the storage
write_mode (str):
Determines how new data is added to already existing data. Possible
values are: 'append' (data is always appended), 'truncate' (data is
cleared every time this storage is used for writing), or 'truncate_once'
(data is cleared for the first writing, but appended subsequently).
Alternatively, specifying 'readonly' will disable writing completely.
            max_length (int, optional):
                Maximal number of entries that will be stored in the file. This can be
                used to preallocate data, which can lead to smaller files, but is also
                less flexible. Giving `max_length = None` allows for arbitrarily large
                data, which might lead to larger files.
            compression (bool):
                Whether to store the data in compressed form. Compression
                automatically enables chunked storage.
            keep_opened (bool):
                Flag indicating whether the file should be kept open after each
                write. If `False`, the file will be closed after writing a dataset.
                This keeps the file in a consistent state, but also requires more
                work before data can be written again.
            check_mpi (bool):
                If True, files will only be opened on the main node of a parallel
                simulation using MPI. This flag has no effect in serial code.
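
        Example:
            A minimal usage sketch; the file name is hypothetical, `ScalarField`,
            `UnitGrid`, and `DiffusionPDE` from the surrounding `pde` package are
            used only for illustration, and `tracker` is provided by the
            `StorageBase` parent class::

                from pde import DiffusionPDE, ScalarField, UnitGrid

                state = ScalarField.random_uniform(UnitGrid([32, 32]))
                storage = FileStorage("diffusion.h5", write_mode="truncate_once")
                eq = DiffusionPDE()
                eq.solve(state, t_range=10, tracker=storage.tracker(1))
                print(storage.times[:])  # times at which data was stored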
"""
super().__init__(info=info, write_mode=write_mode)
self.filename = Path(filename)
self.compression = compression
self.keep_opened = keep_opened
self.check_mpi = check_mpi
self._file: Any = None
self._is_writing = False
self._data_length: int = None # type: ignore
self._max_length: int | None = max_length
if not self.check_mpi or mpi.is_main:
# we are on the main process and can thus open the file directly
if self.filename.is_file() and self.filename.stat().st_size > 0:
try:
self._open("reading")
except (OSError, KeyError):
self.close()
self._logger.warning(
f"File `{filename}` could not be opened for reading"
)
def __del__(self):
self.close() # ensure open files are closed when the FileStorage is deleted
@property
def _file_state(self) -> str:
"""str: the state that the file is currently in"""
if self._file is None:
return "closed"
elif self._file.mode == "r":
return "reading"
elif self._file.mode == "r+":
return "writing"
else:
            raise NotImplementedError(f"Do not understand mode `{self._file.mode}`")
def close(self) -> None:
"""close the currently opened file"""
if self._file is not None:
self._logger.info(f"Close file `{self.filename}`")
self._file.close()
self._file = None
self._data_length = None # type: ignore
def __enter__(self) -> FileStorage:
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self.close()
def _create_hdf_dataset(
self,
name: str,
shape: tuple[int, ...] = tuple(),
dtype: DTypeLike = np.double,
):
"""create a hdf5 dataset with the given name and data_shape
Args:
name (str): Identifier of the hdf5 dataset
shape (tuple): Data shape of the dataset
dtype: The data type of the dataset
"""
if self.compression:
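            # note: gzip compression in HDF5 requires chunked storage; chunks are
            # chosen such that each time point is stored in its own chunk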
kwargs = {"chunks": (1,) + shape, "compression": "gzip"}
else:
kwargs = {}
if self._max_length:
shape = (self._max_length,) + shape
return self._file.create_dataset(name, shape=shape, dtype=dtype, **kwargs)
else:
return self._file.create_dataset(
name,
shape=(0,) + shape,
dtype=dtype,
maxshape=(None,) + shape,
**kwargs,
)
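
    # Illustrative dataset layouts produced by `_create_hdf_dataset` (the field
    # shape (32, 32) is hypothetical):
    #   * max_length=100:  fixed shape (100, 32, 32), preallocated
    #   * max_length=None: shape (0, 32, 32) with maxshape=(None, 32, 32),
    #     grown along the first (time) axis as data is appended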
def _open(
self,
mode: Literal["reading", "appending", "writing", "closed"] = "reading",
info: InfoDict | None = None,
) -> None:
"""open the hdf file in a particular mode
Args:
mode (str):
Determines how the file is opened. Possible values are `reading`,
`appending`, `writing`, and `closed`.
info (dict):
Supplies extra information that is stored in the storage
"""
        import h5py  # lazy loading so h5py is not a hard dependency
if self.check_mpi and not mpi.is_main:
self._logger.warning("Do not open file on MPI child nodes")
return
state = self._file_state
if mode == "reading":
# open file for reading
if state in ["reading", "appending", "writing"]:
return # we can read data when file is open for writing
            # close the file so it can be re-opened for reading
if self._file:
self._file.close()
self._logger.info(f"Open file `{self.filename}` for reading")
self._file = h5py.File(self.filename, mode="r")
self._times = self._file["times"]
self._data = self._file["data"]
for k, v in self._file.attrs.items():
self.info[k] = json.loads(v)
if info:
self.info.update(info)
self._data_shape = self.data.shape[1:]
self._dtype = self.data.dtype
self._data_length = self.info.get("data_length") # type: ignore
elif mode == "appending":
# open file for writing without deleting data
if state in ["appending", "writing"]:
return # we are already in a mode where we can append data
if self.keep_opened and self._is_writing:
raise RuntimeError(
"Currently writing data, so mode cannot be switched."
)
self.close()
            # open the file for appending (creating it if necessary)
self._logger.info(f"Open file `{self.filename}` for appending")
self._file = h5py.File(self.filename, mode="a")
if "times" in self._file and "data" in self._file:
# extract data from datasets in the existing file
self._times = self._file["times"]
self._data = self._file["data"]
# extract information
for k, v in self._file.attrs.items():
self.info[k] = json.loads(v)
self._data_shape = self.data.shape[1:]
self._dtype = self.data.dtype
self._data_length = self.info.get("data_length", self.data.shape[0])
else:
# create new datasets
self._times = self._create_hdf_dataset("times")
self._data = self._create_hdf_dataset(
"data", self.data_shape, dtype=self._dtype
)
self._data_length = 0
if info:
self.info.update(info)
elif mode == "writing":
# open file for writing data; delete potentially present data
if self._is_writing:
raise RuntimeError("Currently writing data, so mode cannot be switched")
if self._file:
self.close()
else:
ensure_directory_exists(self.filename.parent)
self._logger.info(f"Open file `{self.filename}` for writing")
self._file = h5py.File(self.filename, "w")
self._times = self._create_hdf_dataset("times")
self._data = self._create_hdf_dataset(
"data", self.data_shape, dtype=self._dtype
)
if info:
self.info.update(info)
self._data_length = 0 # start writing from the beginning
elif mode == "closed":
self.close()
else:
raise RuntimeError(f"Mode `{mode}` not implemented")
    def __len__(self) -> int:
"""return the number of stored items, i.e., time steps"""
# determine size of data in HDF5 file
try:
length = len(self.times)
except OSError:
length = 0
if self._data_length is None:
return length
else:
            # the stored data might be shorter than the dataset since space may
            # have been preallocated
return min(length, self._data_length)
@property
def times(self):
""":class:`~numpy.ndarray`: The times at which data is available"""
self._open("reading")
return self._times
@property
def data(self):
""":class:`~numpy.ndarray`: The actual data for all time"""
self._open("reading")
return self._data
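
    # Reading a stored simulation afterwards only requires the `times` and
    # `data` properties, e.g. (a sketch with a hypothetical file name):
    #
    #     with FileStorage("diffusion.h5", write_mode="readonly") as storage:
    #         final_time, final_data = storage.times[-1], storage.data[-1]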
def clear(self, clear_data_shape: bool = False):
"""truncate the storage by removing all stored data.
Args:
clear_data_shape (bool):
Flag determining whether the data shape is also deleted.
"""
if self.check_mpi and not mpi.is_main:
self._logger.warning("Do not clear file on MPI child nodes")
return
if self._is_writing:
# remove data from opened file
self._logger.info("Truncate data in hdf5 file")
if "times" in self._file:
del self._file["times"]
self._times = self._create_hdf_dataset("times")
if "data" in self._file:
del self._file["data"]
self._data = self._create_hdf_dataset(
"data", self.data_shape, dtype=self.dtype
)
            self._data_length = 0  # start writing from the beginning
elif self.filename.is_file():
            # we are not currently writing to the file => clear it by removing the file completely
self._logger.info("Truncate data by removing hdf5 file")
self._open("closed") # close file if it was opened
self.filename.unlink() # remove file
else:
self._logger.debug("Truncate is no-op since file does not exist")
super().clear(clear_data_shape=clear_data_shape)
def start_writing(self, field: FieldBase, info: InfoDict | None = None) -> None:
"""initialize the storage for writing data
Args:
field (:class:`~pde.fields.FieldBase`):
An example of the data that will be written to extract the grid and the
data_shape
info (dict):
Supplies extra information that is stored in the storage
"""
if self._is_writing:
raise RuntimeError(f"{self.__class__.__name__} is already in writing mode")
# delete data if truncation is requested. This is for instance necessary
# to remove older data with incompatible data_shape
        if self.write_mode in {"truncate", "truncate_once"}:
self.clear(clear_data_shape=True)
# initialize the writing, setting current data shape
super().start_writing(field, info=info)
# initialize the file for writing with the correct mode
self._logger.debug(f"Start writing with mode `{self.write_mode}`")
if self.write_mode == "truncate_once":
self._open("writing", info)
self.write_mode = "append" # do not truncate for next writing
elif self.write_mode == "truncate":
self._open("writing", info)
elif self.write_mode == "append":
self._open("appending", info)
elif self.write_mode == "readonly":
raise RuntimeError("Cannot write in read-only mode")
else:
            raise ValueError(
                f"Unknown write mode `{self.write_mode}`. Possible values are "
                "`truncate_once`, `truncate`, `append`, and `readonly`"
            )
if not self.keep_opened:
# store extra information as attributes
hdf_write_attributes(self._file, self.info)
self._is_writing = True
    def _append_data(self, data: np.ndarray, time: float | None) -> None:
        """append a new data set

        Args:
            data (:class:`~numpy.ndarray`): The actual data
            time (float, optional): The time point associated with the data; if
                `None`, a time is derived from the previously stored time point
"""
if self.keep_opened:
if not self._is_writing or self._data_length is None:
raise RuntimeError(
"Writing not initialized. Call "
f"`{self.__class__.__name__}.start_writing`"
)
else:
# need to reopen the file
self._open("appending")
# write the new data
if self._data_length >= len(self._data):
self._data.resize((self._data_length + 1,) + self.data_shape)
self._data[self._data_length] = data
# write the new time
if time is None:
if len(self._times) == 0:
time = 0
else:
time = self._times[self._data_length - 1] + 1
if self._data_length >= len(self._times):
self._times.resize((self._data_length + 1,))
self._times[self._data_length] = time
self._data_length += 1
self.info["data_length"] = self._data_length
if not self.keep_opened:
self.close()
def end_writing(self) -> None:
"""finalize the storage after writing.
This makes sure the data is actually written to a file when
self.keep_opened == False
"""
if not self._is_writing:
return # writing mode was already ended
self._logger.debug("End writing")
        self._open("appending")  # make sure the file is open, e.g., if `keep_opened == False`
        # store extra information as attributes
        hdf_write_attributes(self._file, self.info)
self._file.flush()
self.close()
self._is_writing = False
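

# Sketch of a manual write cycle (normally a tracker drives these calls;
# `append` is inherited from `StorageBase`, and `ScalarField`/`UnitGrid` from
# the surrounding `pde` package are used only for illustration):
#
#     from pde import ScalarField, UnitGrid
#
#     field = ScalarField(UnitGrid([32]))
#     with FileStorage("manual.h5", write_mode="truncate") as storage:
#         storage.start_writing(field)
#         for t in range(5):
#             storage.append(field, time=float(t))
#         storage.end_writing()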