import os
from nabu.misc.utils import rescale_data
from nabu.pipeline.params import files_formats
from tomoscan.volumebase import VolumeBase
from tomoscan.scanbase import TomoScanBase
from tomoscan.esrf.volume import (
EDFVolume,
HDF5Volume,
JP2KVolume,
MultiTIFFVolume,
TIFFVolume,
)
from tomoscan.io import HDF5File
from silx.io.utils import get_data
from silx.utils.enum import Enum as _Enum
import numpy
from silx.io.url import DataUrl
from typing import Optional
import logging
_logger = logging.getLogger(__name__)
# public API of this module
__all__ = ["get_default_output_volume", "cast_volume"]
# default sub-directory (relative to the input volume location) used to store cast volumes
_DEFAULT_OUTPUT_DIR = "vol_cast"
# default percentiles used to deduce data_min / data_max from a histogram when not provided
RESCALE_MIN_PERCENTILE = 10
RESCALE_MAX_PERCENTILE = 90
def get_default_output_volume(
    input_volume: VolumeBase, output_type: str, output_dir: str = _DEFAULT_OUTPUT_DIR
) -> VolumeBase:
    """
    For a given input volume and output type, return the default output volume (as an
    instance of VolumeBase) a cast would produce.

    :param VolumeBase input_volume: volume for which we want to get the resulting output volume for a cast
    :param str output_type: output type of the volume (edf, tiff, hdf5...)
    :param str output_dir: output dir to save the cast volume
    :raises TypeError: if input_volume is not a VolumeBase
    :raises ValueError: if output_type is not a known file format
    :raises NotImplementedError: if the input / output volume format combination is not handled
    """
    if not isinstance(input_volume, VolumeBase):
        raise TypeError(f"input_volume is expected to be an instance of {VolumeBase}")
    valid_file_formats = set(files_formats.values())
    if output_type not in valid_file_formats:
        raise ValueError(f"output_type is not a valid value ({output_type}). Valid values are {valid_file_formats}")

    # constructors of the folder-based (one frame per file) output formats
    folder_volume_constructors = {
        "tiff": TIFFVolume,
        "edf": EDFVolume,
        "jp2": JP2KVolume,
    }

    if isinstance(input_volume, (EDFVolume, TIFFVolume, JP2KVolume)):
        # folder-based input volume: data_url.file_path() is the folder containing the frames
        if output_type == "hdf5":
            file_path = os.path.join(
                input_volume.data_url.file_path(),
                output_dir,
                input_volume.get_volume_basename() + ".hdf5",
            )
            volume = HDF5Volume(
                file_path=file_path,
                data_path="/volume",
            )
            assert volume.get_identifier() is not None, "volume should be able to create an identifier"
            return volume
        elif output_type in folder_volume_constructors:
            # NOTE(review): this branch builds the output folder from the *parent* of the
            # input folder while the hdf5 branch above uses the input folder itself —
            # looks intentional but worth confirming
            return folder_volume_constructors[output_type](
                folder=os.path.join(
                    os.path.dirname(input_volume.data_url.file_path()),
                    output_dir,
                ),
                volume_basename=input_volume.get_volume_basename(),
            )
        else:
            raise NotImplementedError(f"output volume format {output_type} is not handled")
    elif isinstance(input_volume, (HDF5Volume, MultiTIFFVolume)):
        # single-file input volume: data_url.file_path() is the data file itself
        if output_type == "hdf5":
            data_file_parent_path, data_file_name = os.path.split(input_volume.data_url.file_path())
            # replace the file extension by ".hdf5"
            data_file_name = os.path.splitext(data_file_name)[0] + ".hdf5"
            if isinstance(input_volume, HDF5Volume):
                data_data_path = input_volume.data_url.data_path()
                metadata_data_path = input_volume.metadata_url.data_path()
                try:
                    # reuse the common root of the data / metadata paths as output data path.
                    # NOTE(review): commonprefix works on raw strings, so the prefix can end in
                    # the middle of a path component or be empty — TODO confirm this is acceptable
                    data_path = os.path.commonprefix([data_data_path, metadata_data_path])
                except Exception:
                    data_path = "volume"
            else:
                # MultiTIFFVolume input: no HDF5 data path to reuse, fall back to a default
                data_path = "volume"
            volume = HDF5Volume(
                file_path=os.path.join(data_file_parent_path, output_dir, data_file_name),
                data_path=data_path,
            )
            assert volume.get_identifier() is not None, "volume should be able to create an identifier"
            return volume
        elif output_type in folder_volume_constructors:
            file_parent_path, file_name = os.path.split(input_volume.data_url.file_path())
            # file name without extension is used as the output sub-folder name
            file_name = os.path.splitext(file_name)[0]
            return folder_volume_constructors[output_type](
                folder=os.path.join(
                    file_parent_path,
                    output_dir,
                    file_name,
                )
            )
        else:
            raise NotImplementedError(f"output volume format {output_type} is not handled")
    else:
        raise NotImplementedError(f"input volume format {input_volume} is not handled")
def cast_volume(
    input_volume: VolumeBase,
    output_volume: VolumeBase,
    output_data_type: numpy.dtype,
    data_min=None,
    data_max=None,
    scan: Optional[TomoScanBase] = None,
    rescale_min_percentile=RESCALE_MIN_PERCENTILE,
    rescale_max_percentile=RESCALE_MAX_PERCENTILE,
    save=True,
    store=False,
) -> VolumeBase:
    """
    cast given volume to output_volume of 'output_data_type' type
    :param VolumeBase input_volume: volume to cast
    :param VolumeBase output_volume: volume receiving the cast data
    :param numpy.dtype output_data_type: output data type
    :param number data_min: `data` min value to clamp to new_min. Any lower value will also be clamped to new_min.
    :param number data_max: `data` max value to clamp to new_max. Any higher value will also be clamped to new_max.
    :param TomoScanBase scan: source scan that produced input_volume. Can be used to find histogram for example.
    :param rescale_min_percentile: if `data_min` is None will set data_min to 'rescale_min_percentile'
    :param rescale_max_percentile: if `data_max` is None will set data_max to 'rescale_max_percentile'
    :param bool save: if True dump the slices on disk (one by one)
    :param bool store: if True once the volume is cast then set `output_volume.data`
    :return: output_volume with data and metadata set
    .. warning::
        metadata is only set on the returned volume, it is not written to disk by
        this function. If you want to save the cast volume's metadata you must do it yourself.
    .. note::
        if you want to tune compression ratios (for jp2k) then please update the `cratios` attributes of the output volume
    """
    if not isinstance(input_volume, VolumeBase):
        raise TypeError(f"input_volume is expected to be a {VolumeBase}. {type(input_volume)} provided")
    if not isinstance(output_volume, VolumeBase):
        raise TypeError(f"output_volume is expected to be a {VolumeBase}. {type(output_volume)} provided")
    try:
        output_data_type = numpy.dtype(
            output_data_type
        )  # User friendly API in case user provides np.uint16 e.g. (see issue #482)
    except Exception:
        pass
    if not isinstance(output_data_type, numpy.dtype):
        raise TypeError(f"output_data_type is expected to be a {numpy.dtype}. {type(output_data_type)} provided")
    # start processing
    # check for data_min and data_max: try first to deduce them from an existing nabu
    # histogram, fall back to recomputing min / max over the whole input volume
    if data_min is None or data_max is None:
        found_data_min, found_data_max = _try_to_find_min_max_from_histo(
            input_volume=input_volume,
            scan=scan,
            rescale_min_percentile=rescale_min_percentile,
            rescale_max_percentile=rescale_max_percentile,
        )
        if found_data_min is None or found_data_max is None:
            _logger.warning("couldn't find histogram, recompute volume min and max values")
            data_min, data_max = input_volume.get_min_max()
            _logger.info(f"min and max found ({data_min} ; {data_max})")
        # user-provided values always take precedence over deduced ones
        data_min = data_min if data_min is not None else found_data_min
        data_max = data_max if data_max is not None else found_data_max
    data = []
    # process slice by slice; frame_dumper writes each cast slice to disk when `save` is True
    for input_slice, frame_dumper in zip(
        input_volume.browse_slices(),
        output_volume.data_file_saver_generator(
            input_volume.get_volume_shape()[0],
            data_url=output_volume.data_url,
            overwrite=output_volume.overwrite,
        ),
    ):
        if numpy.issubdtype(output_data_type, numpy.integer):
            # integer output: rescale the data dynamic to the full range of the target type
            new_min = numpy.iinfo(output_data_type).min
            new_max = numpy.iinfo(output_data_type).max
            output_slice = clamp_and_rescale_data(
                data=input_slice,
                new_min=new_min,
                new_max=new_max,
                data_min=data_min,
                data_max=data_max,
                rescale_min_percentile=rescale_min_percentile,
                rescale_max_percentile=rescale_max_percentile,
            ).astype(output_data_type)
        else:
            # floating-point output: a plain cast is enough
            output_slice = input_slice.astype(output_data_type)
        if save:
            frame_dumper[:] = output_slice
        if store:
            # keep the cast slices in memory so output_volume.data can be set below
            data.append(output_slice)
    if store:
        output_volume.data = numpy.asarray(data)
    # try also to append some metadata to it
    try:
        output_volume.metadata = input_volume.metadata or input_volume.load_metadata()
    except (OSError, KeyError):
        # if no metadata provided and or saved in disk or if some key are missing
        pass
    return output_volume
def clamp_and_rescale_data(
    data: numpy.ndarray,
    new_min,
    new_max,
    data_min=None,
    data_max=None,
    rescale_min_percentile=RESCALE_MIN_PERCENTILE,
    rescale_max_percentile=RESCALE_MAX_PERCENTILE,
):
    """
    Rescale `data` from the [data_min, data_max] range to [new_min, new_max] and clamp
    any value falling outside the new range.

    :param numpy.ndarray data: data to be rescaled
    :param new_min: rescaled data new min (clamp min value)
    :param new_max: rescaled data new max (clamp max value)
    :param data_min: `data` min value to clamp to new_min. Any lower value will also be clamp to new_min.
    :param data_max: `data` max value to clamp to new_max. Any hight value will also be clamp to new_max.
    :param rescale_min_percentile: if `data_min` is None will set data_min to 'rescale_min_percentile'
    :param rescale_max_percentile: if `data_max` is None will set data_min to 'rescale_max_percentile'
    """
    # deduce missing bounds from the data percentiles
    low = data_min if data_min is not None else numpy.percentile(data, rescale_min_percentile)
    high = data_max if data_max is not None else numpy.percentile(data, rescale_max_percentile)
    # rescale then clamp to [new_min, new_max]
    rescaled = rescale_data(data, new_min=new_min, new_max=new_max, data_min=low, data_max=high)
    return numpy.clip(rescaled, new_min, new_max)
def find_histogram(volume: VolumeBase, scan: Optional[TomoScanBase] = None) -> Optional[DataUrl]:
    """
    Look for histogram of the provided url. If found one return the DataUrl of the nabu histogram

    :param VolumeBase volume: volume for which we look for an associated nabu histogram
    :param TomoScanBase scan: scan that produced the volume; its entry name is used as data
        path for file formats that do not carry one themselves
    :return: DataUrl of the histogram dataset, or None if it could not be located
    :raises TypeError: if volume is not a VolumeBase
    :raises NotImplementedError: for unhandled volume types
    """
    if not isinstance(volume, VolumeBase):
        raise TypeError(f"volume is expected to be an instance of {VolumeBase} not {type(volume)}")
    elif isinstance(volume, HDF5Volume):
        # HDF5 volume: histogram expected in the same file as the volume data
        histogram_file = volume.data_url.file_path()
        if volume.url is not None:
            data_path = volume.url.data_path()
            if data_path.endswith("reconstruction"):
                # replace the trailing "reconstruction" group by the histogram results path
                data_path = "/".join(
                    [
                        *data_path.split("/")[:-1],
                        "histogram/results/data",
                    ]
                )
            else:
                data_path = "/".join((volume.url.data_path(), "histogram/results/data"))
        else:
            # TODO: FIXME: in some case (if the users provides the full data_url and if the 'DATA_DATASET_NAME' is not used we
            # will endup with an invalid data_path. Hope this case will not happen. Anyway this is a case that we can't handle.)
            # if trouble: check if data_path exists. If not raise an error saying this we can't find an histogram for this volume
            data_path = volume.data_url.data_path().replace(HDF5Volume.DATA_DATASET_NAME, "histogram/results/data")
    elif isinstance(volume, (EDFVolume, JP2KVolume, TIFFVolume, MultiTIFFVolume)):
        if isinstance(volume, (EDFVolume, JP2KVolume, TIFFVolume)):
            # folder-based volume: histogram file expected inside the frames folder.
            # NOTE(review): no separator between the basename and "histogram" — confirm
            # this matches the actual histogram file naming policy
            # TODO: check with pierre what is the policy of histogram files names
            histogram_file = os.path.join(
                volume.data_url.file_path(),
                volume.get_volume_basename() + "histogram.hdf5",
            )
        else:
            # multi-tiff volume: histogram file expected next to the data file
            # TODO: check with pierre what is the policy of histogram files names
            file_path, _ = os.path.splitext(volume.data_url.file_path())
            histogram_file = os.path.join(file_path + "histogram.hdf5")
        if scan is not None:
            # presumably the scan entry name is the group containing the histogram — TODO confirm
            data_path = getattr(scan, "entry", "entry")
        else:
            # TODO: FIXME: how to get the entry name in every case ?
            # possible solutions are:
            # * look at the different entries and check for histogram: will work if only one histogram in the file
            # * Add a histogram request so the user can provide it (can be done at tomoscan level or nabu if we think this is specific to nabu)
            _logger.info("histogram file found but unable to find relevant histogram")
            return None
    else:
        raise NotImplementedError(f"volume {type(volume)} not handled")
    if not os.path.exists(histogram_file):
        _logger.info(f"{histogram_file} not found")
        return None
    with HDF5File(histogram_file, mode="r") as h5f:
        if not data_path in h5f:
            _logger.info(f"{data_path} in {histogram_file} not found")
            return None
        else:
            _logger.info(f"Found histogram {histogram_file}::/{data_path}")
            return DataUrl(
                file_path=histogram_file,
                data_path=data_path,
                scheme="silx",
            )
def _get_hst_saturations(hist, bins, rescale_min_percentile: numpy.float32, rescale_max_percentile: numpy.float32):
hist_cum = numpy.cumsum(hist)
bin_index_min = numpy.searchsorted(hist_cum, numpy.percentile(hist_cum, rescale_min_percentile))
bin_index_max = numpy.searchsorted(hist_cum, numpy.percentile(hist_cum, rescale_max_percentile))
return bins[bin_index_min], bins[bin_index_max]
def _try_to_find_min_max_from_histo(
    input_volume: VolumeBase, rescale_min_percentile, rescale_max_percentile, scan=None
) -> tuple:
    """
    Util to interpret a nabu histogram and deduce the (data_min, data_max) to be used for
    rescaling a volume. Returns (None, None) when no histogram can be found.
    """
    histogram_url = find_histogram(input_volume, scan=scan)
    if histogram_url is None:
        return None, None
    return _min_max_from_histo(
        url=histogram_url,
        rescale_min_percentile=rescale_min_percentile,
        rescale_max_percentile=rescale_max_percentile,
    )
def _min_max_from_histo(url: DataUrl, rescale_min_percentile: int, rescale_max_percentile: int) -> tuple:
    """
    Load the histogram pointed to by `url` and deduce (data_min, data_max) from it.
    Returns (None, None) if the histogram cannot be loaded.
    """
    try:
        histogram = get_data(url)
    except Exception as e:
        _logger.error(f"Fail to load histogram from {url.path()}. Reason is {e}")
        return None, None
    # histogram is expected as (counts, bin values) — presumably as written by nabu; TODO confirm
    hist, bins = histogram[0], histogram[1]
    return _get_hst_saturations(
        hist, bins, numpy.float32(rescale_min_percentile), numpy.float32(rescale_max_percentile)
    )