Skip to content

nabu.thirdparty.tomwer_load_flats_darks

[docs] module nabu.thirdparty.tomwer_load_flats_darks

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""
script embedding all function needed to read flats and darks.

For information calculation method is stored in the results/configuration
dictionary
"""

"""
IMPORTANT: this script is used as long as flat-fielding with "raw" flats/daks
is not implemented in nabu.
For now we load results from tomwer.
"""

import h5py
from silx.io.url import DataUrl
from tomoscan.io import HDF5File
from silx.io.utils import h5py_read_dataset
import logging

MAX_DEPTH = 2

logger = logging.getLogger(__name__)


def get_process_entries(root_node: h5py.Group, depth: int) -> tuple:
    """
    return the list of 'Nxtomo' entries at the root level

    :param str file_path:
    :return: list of valid Nxtomo node (ordered alphabetically)
    :rtype: tuple

    ..note: entries are sorted to insure consistency
    """

    def _get_entries(node, depth_):
        if isinstance(node, h5py.Dataset):
            return {}
        res_buf = {}
        if is_process_node(node) is True:
            res_buf[node.name] = int(node["sequence_index"][()])
        assert isinstance(node, h5py.Group)
        if depth_ >= 1:
            for sub_node in node.values():
                res_buf[node.name] = _get_entries(node=sub_node, depth_=depth_ - 1)
        return res_buf

    res = {}
    for node in root_node.values():
        res.update(_get_entries(node=node, depth_=depth - 1))

    return res


def is_process_node(node):
    return (
        node.name.split("/")[-1].startswith("tomwer_process_")
        and "NX_class" in node.attrs
        and node.attrs["NX_class"] == "NXprocess"
        and "program" in node
        and h5py_read_dataset(node["program"]) == "tomwer_dark_refs"
        and "version" in node
        and "sequence_index" in node
    )


def get_darks_frm_process_file(process_file, entry) -> None | dict:
    """

    :param process_file:
    :return:
    """
    if entry is None:
        with HDF5File(process_file, "r", swmr=True) as h5f:
            entries = get_process_entries(root_node=h5f, depth=MAX_DEPTH)
            if len(entries) == 0:
                logger.info("unable to find a DarkRef process in %s" % process_file)
                return None
            elif len(entries) > 0:
                raise ValueError("several entry found, entry should be " "specify")
            else:
                entry = list(entries.keys())[0]
                logger.info("take %s as default entry" % entry)

    with HDF5File(process_file, "r", swmr=True) as h5f:
        dark_nodes = get_process_entries(root_node=h5f[entry], depth=MAX_DEPTH - 1)
        index_to_path = {}
        for key, value in dark_nodes.items():
            index_to_path[key] = key

        if len(dark_nodes) == 0:
            return {}
        # take the last processed dark ref
        last_process_index = sorted(dark_nodes.keys())[-1]
        last_process_dark = index_to_path[last_process_index]
        if (len(index_to_path)) > 1:
            logger.warning(
                "several processing found for dark-ref,"
                "take the last one: %s" % last_process_dark
            )

        res = {}
        if "results" in h5f[last_process_dark].keys():
            results_node = h5f[last_process_dark]["results"]
            if "darks" in results_node.keys():
                darks = results_node["darks"]
                for index in darks:
                    res[int(index)] = DataUrl(
                        file_path=process_file,
                        data_path=darks[index].name,
                        scheme="silx",
                    )
        return res


def get_flats_frm_process_file(process_file, entry) -> None | dict:
    """

    :param process_file:
    :return:
    """
    if entry is None:
        with HDF5File(process_file, "r", swmr=True) as h5f:
            entries = get_process_entries(root_node=h5f, depth=MAX_DEPTH)
            if len(entries) == 0:
                logger.info("unable to find a DarkRef process in %s" % process_file)
                return None
            elif len(entries) > 0:
                raise ValueError("several entry found, entry should be " "specify")
            else:
                entry = list(entries.keys())[0]
                logger.info("take %s as default entry" % entry)

    with HDF5File(process_file, "r", swmr=True) as h5f:
        dkref_nodes = get_process_entries(root_node=h5f[entry], depth=MAX_DEPTH - 1)
        if len(dkref_nodes) == 0:
            return {}
        index_to_path = {}
        for key, value in dkref_nodes.items():
            index_to_path[key] = key

        # take the last processed dark ref
        last_process_index = sorted(dkref_nodes.keys())[-1]
        last_process_dkrf = index_to_path[last_process_index]
        if (len(index_to_path)) > 1:
            logger.warning(
                "several processing found for dark-ref,"
                "take the last one: %s" % last_process_dkrf
            )

        res = {}
        if "results" in h5f[last_process_dkrf].keys():
            results_node = h5f[last_process_dkrf]["results"]
            if "flats" in results_node.keys():
                flats = results_node["flats"]
                for index in flats:
                    res[int(index)] = DataUrl(
                        file_path=process_file,
                        data_path=flats[index].name,
                        scheme="silx",
                    )
        return res