Source code for silx.io.h5py_utils

# /*##########################################################################
# Copyright (C) 2016-2023 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ############################################################################*/
"""
This module provides utility methods on top of h5py, mainly to handle
parallel writing and reading.
"""

__authors__ = ["W. de Nolf"]
__license__ = "MIT"
__date__ = "28/11/2023"


import os
import sys
import traceback
import logging
import h5py

from .._version import calc_hexversion
from ..utils import retry as retry_mod
from silx.utils.deprecation import deprecated_warning

_logger = logging.getLogger(__name__)

# True when the interpreter runs on Windows
IS_WINDOWS = sys.platform == "win32"

# (major, minor, micro) of h5py and of the libhdf5 it uses, packed with
# `calc_hexversion` so that versions can be compared as single values
H5PY_HEX_VERSION = calc_hexversion(*h5py.version.version_tuple[:3])
HDF5_HEX_VERSION = calc_hexversion(*h5py.version.hdf5_version_tuple[:3])

# Minimal libhdf5 version for SWMR: hard-coded for h5py >= 3.10,
# otherwise taken from the h5py configuration
if h5py.version.version_tuple < (3, 10):
    HDF5_SWMR_VERSION = h5py.get_config().swmr_min_hdf5_version[:3]
else:
    HDF5_SWMR_VERSION = (1, 9, 178)
HAS_SWMR = HDF5_HEX_VERSION >= calc_hexversion(*HDF5_SWMR_VERSION)

HAS_TRACK_ORDER = H5PY_HEX_VERSION >= calc_hexversion(2, 9, 0)

# The `locking` argument needs libhdf5 >= 1.10.7 within the 1.10 series and
# libhdf5 >= 1.12.1 otherwise, together with h5py >= 3.5.0
HDF5_HAS_LOCKING_ARGUMENT = HDF5_HEX_VERSION >= (
    calc_hexversion(1, 10, 7)
    if h5py.version.hdf5_version_tuple[:2] == (1, 10)
    else calc_hexversion(1, 12, 1)
)
H5PY_HAS_LOCKING_ARGUMENT = H5PY_HEX_VERSION >= calc_hexversion(3, 5, 0)
HAS_LOCKING_ARGUMENT = HDF5_HAS_LOCKING_ARGUMENT and H5PY_HAS_LOCKING_ARGUMENT

# libhdf5 older than 1.10.0
LATEST_LIBVER_IS_V108 = HDF5_HEX_VERSION < calc_hexversion(1, 10, 0)


def _libver_low_bound_is_v108(libver) -> bool:
    if libver is None:
        return True
    if LATEST_LIBVER_IS_V108:
        return True
    if isinstance(libver, str):
        low = libver
    else:
        low = libver[0]
    if low == "latest":
        return False
    return low == "v108"


def _hdf5_file_locking(mode="r", locking=None, swmr=None, libver=None, **_):
    """Concurrent access by disabling file locking is not supported
    in these cases:

        * mode != "r": causes file corruption
        * SWMR: does not work
        * libver > v108 and file already locked: does not work
        * windows and HDF5_HAS_LOCKING_ARGUMENT and file already locked: does not work

    :param str or None mode: read-only by default
    :param bool or None locking: by default it is disabled for `mode='r'`
                                 and `swmr=False` and enabled for all
                                 other modes.
    :param bool or None swmr: try both modes when `mode='r'` and `swmr=None`
    :param None or str or tuple libver:
    :returns bool:
    """
    if locking is None:
        locking = bool(mode != "r" or swmr)
    if not locking:
        if mode != "r":
            raise ValueError("Locking is mandatory for HDF5 writing")
        if swmr:
            raise ValueError("Locking is mandatory for HDF5 SWMR mode")
        if IS_WINDOWS and HDF5_HAS_LOCKING_ARGUMENT:
            _logger.debug(
                "Non-locking readers will fail when a writer has already locked the HDF5 file (this restriction applies to libhdf5 >= 1.12.1 or libhdf5 >= 1.10.7 on Windows)"
            )
        if not _libver_low_bound_is_v108(libver):
            _logger.debug(
                "Non-locking readers will fail when a writer has already locked the HDF5 file (this restriction applies to libver >= v110)"
            )
    return locking


def _is_h5py_exception(e):
    """
    :param BaseException e:
    :returns bool:
    """
    for frame in traceback.walk_tb(e.__traceback__):
        for namespace in (frame[0].f_locals, frame[0].f_globals):
            if namespace.get("__package__", None) == "h5py":
                return True
    return False


def _retry_h5py_error(e):
    """Decide whether an operation that failed with `e` should be retried.

    Exceptions that do not pass through h5py are retried only when they are
    a `RetryError`. Exceptions that pass through h5py are retried when they
    are an `OSError`, a `RuntimeError` or a `KeyError` whose message
    contains "Unable to open object".

    :param BaseException e:
    :returns bool:
    """
    if not _is_h5py_exception(e):
        return isinstance(e, retry_mod.RetryError)
    if isinstance(e, (OSError, RuntimeError)):
        return True
    if isinstance(e, KeyError):
        # For example this needs to be retried:
        # KeyError: 'Unable to open object (bad object header version number)'
        return "Unable to open object" in str(e)
    return False


def retry(**kw):
    r"""Decorator for a method that needs to be executed until it no longer
    fails on HDF5 IO. Mainly used for reading an HDF5 file that is being
    written.

    Unless specified by the caller, `retry_on_error` is `_retry_h5py_error`.

    :param \**kw: see `silx.utils.retry`
    """
    if "retry_on_error" not in kw:
        kw["retry_on_error"] = _retry_h5py_error
    return retry_mod.retry(**kw)
def retry_contextmanager(**kw):
    r"""Decorator to make a context manager from a method that needs to be
    entered until it no longer fails on HDF5 IO. Mainly used for reading
    an HDF5 file that is being written.

    Unless specified by the caller, `retry_on_error` is `_retry_h5py_error`.

    :param \**kw: see `silx.utils.retry_contextmanager`
    """
    if "retry_on_error" not in kw:
        kw["retry_on_error"] = _retry_h5py_error
    return retry_mod.retry_contextmanager(**kw)
def retry_in_subprocess(**kw):
    r"""Same as `retry` but it also retries segmentation faults.

    On Windows you cannot use this decorator with the "@" syntax:

    .. code-block:: python

        def _method(*args, **kw):
            ...

        method = retry_in_subprocess()(_method)

    Unless specified by the caller, `retry_on_error` is `_retry_h5py_error`.

    :param \**kw: see `silx.utils.retry_in_subprocess`
    """
    if "retry_on_error" not in kw:
        kw["retry_on_error"] = _retry_h5py_error
    return retry_mod.retry_in_subprocess(**kw)
def group_has_end_time(h5item):
    """Returns True when the HDF5 item is a Group with an "end_time" dataset.

    A reader can use this as an indication that the Group has been fully
    written (at least if the writer supports this). Anything that is not
    a Group yields False.

    :param Union[h5py.Group,h5py.Dataset] h5item:
    :returns bool:
    """
    return isinstance(h5item, h5py.Group) and "end_time" in h5item
@retry_contextmanager()
def open_item(filename, name, retry_invalid=False, validate=None, **open_options):
    r"""Yield an HDF5 dataset or group (retry until it can be instantiated).

    `None` is yielded when the item is missing or rejected by `validate`
    and `retry_invalid` is False.

    :param str filename:
    :param str name: name of the item in the file
    :param bool retry_invalid: retry when item is missing or not valid
    :param callable or None validate: called with the item, a false result
                                      marks the item as not valid
    :param \**open_options: see `File.__init__`
    :yields Dataset, Group or None:
    """
    with File(filename, **open_options) as h5file:
        try:
            item = h5file[name]
        except KeyError as e:
            # Only a missing item is handled here, any other KeyError propagates
            if "doesn't exist" not in str(e):
                raise
            if retry_invalid:
                raise retry_mod.RetryError
            item = None
        if item is not None and callable(validate) and not validate(item):
            if retry_invalid:
                raise retry_mod.RetryError
            item = None
        yield item
def _top_level_names(filename, include_only=group_has_end_time, **open_options):
    r"""Return all valid top-level HDF5 names.

    A `KeyError` while listing or filtering the names is converted into
    a `RetryError`.

    :param str filename:
    :param callable or None include_only: called with each top-level item,
                                          only names for which it returns
                                          a true value are kept
    :param \**open_options: see `File.__init__`
    :returns list(str):
    """
    with File(filename, **open_options) as h5file:
        try:
            root = h5file["/"]
            if not callable(include_only):
                return list(root)
            return [name for name in root if include_only(h5file[name])]
        except KeyError:
            raise retry_mod.RetryError


top_level_names = retry()(_top_level_names)

if getattr(sys, "frozen", False):
    # multiprocessing not working on frozen binaries
    safe_top_level_names = top_level_names
else:
    safe_top_level_names = retry_in_subprocess()(_top_level_names)
class Hdf5FileLockingManager:
    """Manage HDF5 file locking in the current process through the
    HDF5_USE_FILE_LOCKING environment variable.

    The value of the variable is saved before it is modified and restored
    verbatim when the number of open files drops to zero.
    """

    def __init__(self) -> None:
        # Raw value of HDF5_USE_FILE_LOCKING saved before the last modification
        # (None when the variable was not set)
        self._hdf5_file_locking = None
        # Number of files currently reported as open
        self._nfiles_open = 0

    def opened(self):
        """Report that a file has been opened."""
        self._add_nopen(1)

    def closed(self):
        """Report that a file has been closed. The environment variable is
        restored when no file is open anymore.
        """
        self._add_nopen(-1)
        if not self._nfiles_open:
            self._restore_locking_env()

    def set_locking(self, locking):
        """Request a locking state for the next file to be opened.

        :param bool or None locking: `None` unsets the environment variable
        :raises RuntimeError: files are open and the requested state differs
                              from the current one
        """
        if self._nfiles_open:
            self._check_locking_env(locking)
        else:
            self._set_locking_env(locking)

    def _add_nopen(self, v):
        # The counter never drops below zero
        self._nfiles_open = max(self._nfiles_open + v, 0)

    def _set_locking_env(self, enable):
        self._backup_locking_env()
        if enable:
            os.environ["HDF5_USE_FILE_LOCKING"] = "TRUE"
        elif enable is None:
            os.environ.pop("HDF5_USE_FILE_LOCKING", None)
        else:
            os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"

    def _get_locking_env(self):
        v = os.environ.get("HDF5_USE_FILE_LOCKING")
        if v == "TRUE":
            return True
        elif v is None:
            return None
        else:
            return False

    def _check_locking_env(self, enable):
        if enable != self._get_locking_env():
            if enable:
                raise RuntimeError(
                    "Close all HDF5 files before enabling HDF5 file locking"
                )
            else:
                raise RuntimeError(
                    "Close all HDF5 files before disabling HDF5 file locking"
                )

    def _backup_locking_env(self):
        # Keep the raw string: reducing it to a boolean would turn values
        # other than "TRUE" (for example "1" or "BEST_EFFORT") into "FALSE"
        # upon restoring.
        self._hdf5_file_locking = os.environ.get("HDF5_USE_FILE_LOCKING")

    def _restore_locking_env(self):
        previous = self._hdf5_file_locking
        self._hdf5_file_locking = None
        if previous is None:
            os.environ.pop("HDF5_USE_FILE_LOCKING", None)
        else:
            os.environ["HDF5_USE_FILE_LOCKING"] = previous
class File(h5py.File):
    """Takes care of HDF5 file locking and SWMR mode without the need
    to handle those explicitly.

    When file locking is managed through the HDF5_USE_FILE_LOCKING environment
    variable, you cannot open different files simultaneously with different modes.
    """

    # `libver` used when SWMR is requested without an explicit `libver`
    _SWMR_LIBVER = "latest"

    # Locking is managed through the environment variable only when the
    # `locking` argument is not available
    if HAS_LOCKING_ARGUMENT:
        _LOCKING_MGR = None
    else:
        _LOCKING_MGR = Hdf5FileLockingManager()

    def __init__(
        self,
        filename,
        mode=None,
        locking=None,
        enable_file_locking=None,
        swmr=None,
        libver=None,
        **kwargs,
    ):
        r"""The arguments `locking` and `swmr` should not be
        specified explicitly for normal use cases.

        :param str filename:
        :param str or None mode: read-only by default
        :param bool or None locking: by default it is disabled for `mode='r'`
                                     and `swmr=False` and enabled for all
                                     other modes.
        :param bool or None enable_file_locking: deprecated
        :param bool or None swmr: try both modes when `mode='r'` and `swmr=None`
        :param None or str or tuple libver:
        :param \**kwargs: see `h5py.File.__init__`
        :raises ValueError: invalid `mode`, or locking disabled for writing
                            or SWMR
        :raises OSError: the file cannot be opened
        """
        # File locking behavior has changed in recent versions of libhdf5
        if HDF5_HAS_LOCKING_ARGUMENT != H5PY_HAS_LOCKING_ARGUMENT:
            _logger.critical(
                "The version of libhdf5 ({}) used by h5py ({}) is not supported: "
                "Do not expect file locking to work.".format(
                    h5py.version.hdf5_version, h5py.version.version
                )
            )

        if mode is None:
            mode = "r"
        elif mode not in ("r", "w", "w-", "x", "a", "r+"):
            raise ValueError("invalid mode {}".format(mode))
        if not HAS_SWMR:
            swmr = False
        if swmr and libver is None:
            libver = self._SWMR_LIBVER

        if enable_file_locking is not None:
            deprecated_warning(
                type_="argument",
                name="enable_file_locking",
                replacement="locking",
                since_version="1.0",
            )
            if locking is None:
                locking = enable_file_locking
        locking = _hdf5_file_locking(
            mode=mode, locking=locking, swmr=swmr, libver=libver
        )
        if self._LOCKING_MGR is None:
            kwargs.setdefault("locking", locking)
        else:
            self._LOCKING_MGR.set_locking(locking)

        if HAS_TRACK_ORDER:
            kwargs.setdefault("track_order", True)
        try:
            super().__init__(filename, mode=mode, swmr=swmr, libver=libver, **kwargs)
        except OSError as e:
            #   wlock   wSWMR   rlock   rSWMR   OSError: Unable to open file (...)
            # 1 TRUE    FALSE   FALSE   FALSE   -
            # 2 TRUE    FALSE   FALSE   TRUE    -
            # 3 TRUE    FALSE   TRUE    FALSE   unable to lock file, errno = 11, error message = 'Resource temporarily unavailable'
            # 4 TRUE    FALSE   TRUE    TRUE    unable to lock file, errno = 11, error message = 'Resource temporarily unavailable'
            # 5 TRUE    TRUE    FALSE   FALSE   file is already open for write (may use <h5clear file> to clear file consistency flags)
            # 6 TRUE    TRUE    FALSE   TRUE    -
            # 7 TRUE    TRUE    TRUE    FALSE   file is already open for write (may use <h5clear file> to clear file consistency flags)
            # 8 TRUE    TRUE    TRUE    TRUE    -
            if not (
                mode == "r"
                and swmr is None
                and "file is already open for write" in str(e)
            ):
                raise
            # Try reading in SWMR mode (situation 5 and 7)
            swmr = True
            if libver is None:
                libver = self._SWMR_LIBVER
            super().__init__(filename, mode=mode, swmr=swmr, libver=libver, **kwargs)

        # The file is open at this point, whether directly or through the
        # SWMR fallback: always notify the locking manager so that the
        # notification done in `close` is balanced.
        self._file_open_callback()
        try:
            if mode != "r" and swmr:
                # Try setting writer in SWMR mode
                self.swmr_mode = True
        except Exception:
            self.close()
            raise

    def close(self):
        """Close the file and notify the locking manager (when there is one)."""
        super().close()
        self._file_close_callback()

    def _file_open_callback(self):
        if self._LOCKING_MGR is not None:
            self._LOCKING_MGR.opened()

    def _file_close_callback(self):
        if self._LOCKING_MGR is not None:
            self._LOCKING_MGR.closed()