# Source code for silx.io.fioh5

# /*##########################################################################
# Copyright (C) 2021 Timo Fuchs
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ############################################################################*/
"""This module provides a h5py-like API to access FioFile data.

API description
+++++++++++++++

Fiofile data structure exposed by this API:

::

  /
      n.1/
          title = "…"
          start_time = "…"
          comments = "…"
          instrument/
              fiofile/
                  comments = "…"
                  parameter = "…"
              comment = "…"
              parameter/
                  parameter_name = value

          measurement/
              colname0 = …
              colname1 = …



The top level scan number ``n.1`` is determined from the filename as in
``prefix_n.fio``. (e.g. ``eh1_sixc_00045.fio`` would give ``45.1``)
If no number can be parsed, the filename without its extension is used instead.

``comments`` and ``parameter`` in group ``fiofile`` are the raw headers as they
appear in the original file, as a string of lines separated by newline
(``\\n``) characters. ``comment`` holds the remaining comment lines,
i.e. those which were not parsed into the title, user or start time.



The title is the content of the first comment header line
(e.g. ``"ascan  ss1vo -4.55687 -0.556875  40 0.2"``).
The start_time is parsed from the second comment line.

Datasets are stored in the data format specified in the fio file header.

Scan data  (e.g. ``/1.1/measurement/colname0``) is accessed by column,
the dataset name ``colname0`` being the column label as defined in the
``Col …`` header line.

If a ``/`` character is present in a column label or in a parameter name in
the original FIO file, it will be substituted with a ``%`` character in the
corresponding dataset name.

MCA data is not yet supported.

This reader requires a fio file as defined in
src/sardana/macroserver/recorders/storage.py of the Sardana project
(https://github.com/sardana-org/sardana).


Accessing data
++++++++++++++

Data and groups are accessed in :mod:`h5py` fashion::

    from silx.io.fioh5 import FioH5

    # Open a FioFile
    fiofh5 = FioH5("test_00056.fio")

    # using FioH5 as a regular group to access scans
    scan1group = fiofh5["56.1"]
    instrument_group = scan1group["instrument"]

    # alternative: full path access
    measurement_group = fiofh5["/56.1/measurement"]

    # accessing a scan data column by name as a 1D numpy array
    data_array = measurement_group["Pslit HGap"]


:class:`FioH5` files and groups provide a :meth:`keys` method::

    >>> fiofh5.keys()
    ['56.1']
    >>> fiofh5['56.1'].keys()
    ['title', 'start_time', 'comments', 'instrument', 'measurement']

They can also be treated as iterators:

.. code-block:: python

    from silx.io import is_dataset

    for scan_group in FioH5("test_00056.fio").values():
        dataset_names = [item.name for item in scan_group["measurement"].values()
                         if is_dataset(item)]
        print("Found data columns in scan " + scan_group.name)
        print(", ".join(dataset_names))

You can test for existence of data or groups::

    >>> "/56.1/measurement/Pslit HGap" in fiofh5
    True
    >>> "parameter" in fiofh5["/56.1/instrument"]
    True
    >>> "spam" in fiofh5["56.1"]
    False

"""

# Module metadata (authors, license, date).
__authors__ = ["T. Fuchs"]
__license__ = "MIT"
__date__ = "09/04/2021"


import os

import datetime
import logging
import io

import h5py
import numpy

from silx import version as silx_version
from . import commonh5

from .spech5 import to_h5py_utf8

# Logger shared by everything in this module.
logger1 = logging.getLogger(__name__)

# dtype used for text columns: h5py major versions below 3 go through the old
# ``special_dtype`` API, newer ones accept the plain object dtype.
text_dtype = (
    h5py.special_dtype(vlen=str) if h5py.version.version_tuple[0] < 3 else "O"
)

# Line budget used both when sniffing a file (is_fiofile) and when looking
# for the header sections (FioFile).
ABORTLINENO = 5

# Type names found in the ``Col`` header lines, mapped to array dtypes.
dtypeConverter = dict(
    STRING=text_dtype,
    DOUBLE="f8",
    FLOAT="f4",
    INTEGER="i8",
    BOOLEAN="?",
)


def is_fiofile(filename):
    """Tell whether *filename* looks like a FIO file.

    The first 2500 bytes of the file are split into lines, which are scanned
    for a run of three consecutive lines starting with ``!``. Scanning stops
    once the line with index ``ABORTLINENO`` has been examined.

    :param str filename: File path
    :return: *True* if the file is recognised as a FIO file, *False* otherwise
        (including when *filename* is not an existing regular file)
    :rtype: bool
    """
    if not os.path.isfile(filename):
        return False

    with open(filename, "rb") as stream:
        head = stream.read(2500)

    consecutive = 0  # length of the current run of "!" lines
    for index, raw_line in enumerate(head.split(b"\n")):
        consecutive = consecutive + 1 if raw_line.startswith(b"!") else 0
        if consecutive >= 3:
            return True
        if index >= ABORTLINENO:
            break
    return False
class FioFile(object):
    """This class opens a FIO file and reads the data.

    After construction the following attributes are available:

    - ``scanno``: scan number parsed from the file name, or ``None``
    - ``commentsection``: raw text of the ``%c`` section (``""`` if absent)
    - ``parameterssection``: raw text of the ``%p`` section (``""`` if absent)
    - ``names``, ``dtypes``, ``datacols``: column names, column dtypes and
      ``(name, dtype)`` pairs read from the ``%d`` section
    - ``data``: structured array returned by :func:`numpy.loadtxt`, with one
      field per column
    - ``parameter``: dict mapping parameter names to their string values
    - ``comments``: comment lines left over once title and start time have
      been extracted
    - ``title``, ``user``, ``start_time``: only set when the comment section
      holds at least two lines
    """

    def __init__(self, filepath):
        """
        :param str filepath: Path to the FIO file
        :raises IOError: If more than ``ABORTLINENO`` consecutive lines are
            read without finding a header section or the data section
        """
        self.scanno = self._parse_scan_number(os.path.basename(filepath))

        # Defaults, so that a file lacking a section still yields
        # well-defined attributes instead of an AttributeError later on.
        self.commentsection = ""
        self.parameterssection = ""
        self.datacols = []
        self.names = []
        self.dtypes = []

        with open(filepath, "r") as fiof:
            self._read_header(fiof)
            # The stream is now positioned on the first data row.
            self.data = numpy.loadtxt(
                fiof,
                dtype={"names": tuple(self.names), "formats": tuple(self.dtypes)},
                comments="!",
            )
            # ToDo: read only last line of file,
            # which sometimes contains the end of acquisition timestamp.

        self._parse_parameters()
        self._parse_comments()

    @staticmethod
    def _parse_scan_number(filename):
        """Return the integer found between the last ``_`` and the first
        following ``.`` of *filename*, or ``None`` if it is not a number."""
        fnowithsuffix = filename.split("_")[-1]
        try:
            return int(fnowithsuffix.split(".")[0])
        except ValueError:
            logger1.warning("Cannot parse scan number of file %s", filename)
            return None

    @staticmethod
    def _read_section(fiof):
        """Collect lines up to the next line starting with ``%`` or ``!``.

        Reading also stops at end of file: ``readline`` then returns ``""``,
        which starts with neither marker and would otherwise loop forever.

        :return: ``(section_text, terminating_line)``
        """
        collected = []
        line = fiof.readline()
        while line and not line.startswith(("%", "!")):
            collected.append(line)
            line = fiof.readline()
        return "".join(collected), line

    def _read_header(self, fiof):
        """Consume the header of *fiof*, leaving it on the first data row."""
        line_counter = 0  # consecutive lines that were not recognised
        while True:
            line = fiof.readline()
            if line.startswith("!"):  # skip comments
                line_counter = 0
                continue
            if line.startswith("%c"):  # comment section
                line_counter = 0
                self.commentsection, line = self._read_section(fiof)
            if line.startswith("%p"):  # parameter section
                line_counter = 0
                self.parameterssection, line = self._read_section(fiof)
            if line.startswith("%d"):  # data type definitions
                self._read_column_definitions(fiof)
                return
            line_counter += 1
            if line_counter > ABORTLINENO:
                raise IOError(
                    "Invalid fio file: Found no data "
                    "after %s lines" % ABORTLINENO
                )

    def _read_column_definitions(self, fiof):
        """Read the `` Col …`` lines following ``%d`` and rewind *fiof* to the
        first line that is not a column definition."""
        self.datacols = []
        self.names = []
        self.dtypes = []
        # Position right after the last header line consumed so far; start
        # from the ``%d`` line itself so that a section without any column
        # definition does not rewind into the header.
        data_start = fiof.tell()
        line = fiof.readline()
        while line.startswith(" Col"):
            fields = line.split()
            name = fields[-2]
            dtype = dtypeConverter[fields[-1]]
            self.names.append(name)
            self.dtypes.append(dtype)
            self.datacols.append((name, dtype))
            data_start = fiof.tell()
            line = fiof.readline()
        fiof.seek(data_start)

    def _parse_parameters(self):
        """Fill ``self.parameter`` from the ``name = value`` lines of the
        parameter section; lines without a separator are skipped."""
        self.parameter = {}
        for line in self.parameterssection.splitlines():
            if not line.strip():
                continue
            # Split on the first separator only: values may contain " = ".
            param, separator, value = line.partition(" = ")
            if not separator:
                logger1.warning("Cannot parse parameter line %r", line)
                continue
            self.parameter[param] = value

    def _parse_comments(self):
        """Extract title, user and start time from the comment section and
        store the lines that were not interpreted in ``self.comments``."""
        acqui_marker = "acquisition started at"  # indicates timestamp
        commentlines = self.commentsection.splitlines()
        if len(commentlines) < 2:
            # Nothing to interpret: every line stays an unparsed comment.
            self.comments = "\n".join(commentlines)
            return

        second_line = commentlines[1]
        acqpos = second_line.lower().find(acqui_marker)
        if acqpos < 0:
            logger1.warning("Cannot parse default comment section")
            self.comments = self.commentsection
            self.user = ""
            self.start_time = ""
            self.title = ""
            return

        self.title = commentlines[0]
        # NOTE(review): the first four characters of the line are dropped,
        # presumably a leading "user" keyword - confirm against writer.
        self.user = second_line[:acqpos][4:].strip()
        self.start_time = second_line[acqpos + len(acqui_marker):].strip()
        # Only the two interpreted lines are removed from the comments.
        self.comments = "\n".join(commentlines[2:])
class FioH5NodeDataset(commonh5.Dataset):
    """Dataset node based on :class:`commonh5.Dataset`.

    On top of the base class it converts the supplied data to a numpy value
    and forwards unknown attribute lookups to that value, so the node can be
    used much like the numpy object it stores.
    """

    def __init__(self, name, data, parent=None, attrs=None):
        # The stored value is always a numpy-compatible object, so that
        # numpy attributes (dtype, shape, size) are available on it.
        super().__init__(name, self._as_numpy_value(data), parent, attrs)

    @staticmethod
    def _as_numpy_value(data):
        """Return *data* converted to the value actually stored."""
        if isinstance(data, str):
            # unicode (utf-8 when saved to HDF5 output)
            return to_h5py_utf8(data)
        if isinstance(data, float):
            # float scalars are stored on 32 bits
            return numpy.float32(data)
        if isinstance(data, int):
            return numpy.int_(data)

        # Anything else is forced into a numpy array
        as_array = numpy.array(data)
        if as_array.dtype.kind in ("S", "U"):
            return numpy.asarray(as_array, dtype=text_dtype)
        # numerical data already has the correct datatype
        return as_array

    def __getattr__(self, item):
        """Proxy to underlying numpy array methods."""
        missing = object()
        found = getattr(self[()], item, missing)
        if found is missing:
            raise AttributeError("FioH5NodeDataset has no attribute %s" % item)
        return found
class FioH5(commonh5.File):
    """This class reads a FIO file and exposes it as a *h5py.File*.

    It inherits :class:`silx.io.commonh5.Group` (via :class:`commonh5.File`),
    which implements most of its API.

    The file holds a single scan group, named ``"<scanno>.<order>"`` when a
    scan number could be parsed from the file name, and after the file name
    without its extension otherwise.
    """

    def __init__(self, filename, order=1):
        """
        :param filename: Path to FioFile in filesystem, or an open file
            object whose ``name`` attribute gives that path
        :type filename: str
        :param int order: Second component of the scan key
            (``"<scanno>.<order>"``)
        :raises IOError: If the file is not recognised as a FIO file or
            cannot be parsed
        """
        if isinstance(filename, io.IOBase):
            # see https://github.com/silx-kit/silx/issues/858
            filename = filename.name

        if not is_fiofile(filename):
            raise IOError("File %s is not a FIO file." % filename)

        try:
            fiof = FioFile(filename)  # reads complete file
        except Exception as e:
            # Any parsing failure is reported as IOError; the original
            # exception stays attached as the cause.
            raise IOError("FIO file %s cannot be read." % filename) from e

        attrs = {
            "NX_class": to_h5py_utf8("NXroot"),
            "file_time": to_h5py_utf8(datetime.datetime.now().isoformat()),
            "file_name": to_h5py_utf8(filename),
            "creator": to_h5py_utf8("silx fioh5 %s" % silx_version),
        }
        commonh5.File.__init__(self, filename, attrs=attrs)

        if fiof.scanno is not None:
            scan_key = "%s.%s" % (fiof.scanno, int(order))
        else:
            # No scan number available: fall back to the bare file name
            scan_key = os.path.splitext(os.path.basename(filename))[0]

        scan_group = FioScanGroup(scan_key, parent=self, scan=fiof)
        self.add_node(scan_group)
class FioScanGroup(commonh5.Group):
    """Group (``NXentry``) holding everything read from one FioFile."""

    def __init__(self, scan_key, parent, scan):
        """
        :param parent: parent Group
        :param str scan_key: Scan key (e.g. "1.1")
        :param scan: FioFile object
        """
        user_attr = to_h5py_utf8(getattr(scan, "user", ""))
        super().__init__(
            scan_key,
            parent=parent,
            attrs={"NX_class": to_h5py_utf8("NXentry"), "user": user_attr},
        )

        # 'title', 'start_time' and 'user' are defaults
        # in Sardana created files; the scan key stands in for a missing
        # title, a missing start time is simply left out.
        text_nodes = [("title", getattr(scan, "title", scan_key))]
        if hasattr(scan, "start_time"):
            text_nodes.append(("start_time", scan.start_time))
        text_nodes.append(("comments", scan.comments))

        for node_name, text in text_nodes:
            self.add_node(
                FioH5NodeDataset(
                    name=node_name, data=to_h5py_utf8(text), parent=self
                )
            )

        for group_class in (FioInstrumentGroup, FioMeasurementGroup):
            self.add_node(group_class(parent=self, scan=scan))


class FioMeasurementGroup(commonh5.Group):
    """Group ``measurement``: one dataset per data column of the scan."""

    def __init__(self, parent, scan):
        """
        :param parent: parent Group
        :param scan: FioFile object
        """
        super().__init__(
            name="measurement",
            parent=parent,
            attrs={"NX_class": to_h5py_utf8("NXcollection")},
        )
        for column in scan.names:
            # "/" cannot appear in a node name, it is replaced by "%"
            node = FioH5NodeDataset(
                name=column.replace("/", "%"), data=scan.data[column], parent=self
            )
            self.add_node(node)


class FioInstrumentGroup(commonh5.Group):
    """Group ``instrument``: parsed parameters, raw headers and the
    remaining comments."""

    def __init__(self, parent, scan):
        """
        :param parent: parent Group
        :param scan: FioFile object
        """
        super().__init__(
            name="instrument",
            parent=parent,
            attrs={"NX_class": to_h5py_utf8("NXinstrument")},
        )
        for group_class in (FioParameterGroup, FioFileGroup):
            self.add_node(group_class(parent=self, scan=scan))

        comment_node = FioH5NodeDataset(
            name="comment", data=to_h5py_utf8(scan.comments), parent=self
        )
        self.add_node(comment_node)


class FioFileGroup(commonh5.Group):
    """Group ``fiofile``: the comment and parameter sections as raw text."""

    def __init__(self, parent, scan):
        """
        :param parent: parent Group
        :param scan: FioFile object
        """
        super().__init__(
            name="fiofile",
            parent=parent,
            attrs={"NX_class": to_h5py_utf8("NXcollection")},
        )
        raw_sections = (
            ("comments", scan.commentsection),
            ("parameter", scan.parameterssection),
        )
        for node_name, text in raw_sections:
            self.add_node(
                FioH5NodeDataset(
                    name=node_name, data=to_h5py_utf8(text), parent=self
                )
            )


class FioParameterGroup(commonh5.Group):
    """Group ``parameter``: one text dataset per parsed parameter."""

    def __init__(self, parent, scan):
        """
        :param parent: parent Group
        :param scan: FioFile object
        """
        super().__init__(
            name="parameter",
            parent=parent,
            attrs={"NX_class": to_h5py_utf8("NXcollection")},
        )
        for label, value in scan.parameter.items():
            # "/" cannot appear in a node name, it is replaced by "%"
            node = FioH5NodeDataset(
                name=label.replace("/", "%"),
                data=to_h5py_utf8(value),
                parent=self,
            )
            self.add_node(node)