Source code for nabu.processing.histogram

from math import log2, ceil
import numpy as np
from silx.math import Histogramnd
from tomoscan.io import HDF5File
from ..utils import check_supported
from ..resources.logger import LoggerOrPrint


[docs] class PartialHistogram: """ A class for computing histogram progressively. In certain cases, it is cumbersome to compute a histogram directly on a big chunk of data (ex. data not fitting in memory, disk access too slow) while some parts of the data are readily available in-memory. """ histogram_methods = ["fixed_bins_width", "fixed_bins_number"] bin_width_policies = ["uint16"] backends = ["numpy", "silx"] def __init__(self, method="fixed_bins_width", bin_width="uint16", num_bins=None, min_bins=None, backend="silx"): """ Initialize a PartialHistogram class. Parameters ---------- method: str, optional Partial histogram computing method. Available are: - `fixed_bins_width`: all the histograms are computed with the same bin width. The class adapts to the data range and computes the number of bins accordingly. - `fixed_bins_number`: all the histograms are computed with the same number of bins. The class adapts to the data range and computes the bin width accordingly. Default is "fixed_bins_width" bin_width: str or float, optional Policy for histogram bins when method="fixed_bins_width". Available are: - "uint16": The bin width is computed so that floating-point elements `f1` and `f2` satisfying `|f1 - f2| < bin_width` implies `f1_converted - f2_converted < 1` once cast to uint16. - A number: all the bins have this fixed width. Default is "uint16" num_bins: int, optional Number of bins when method = 'fixed_bins_number'. min_bins: int, optional Minimum number of bins when method = 'fixed_bins_width'. backend: str, optional Which histogram backend to use for computations. Available are "silx", "numpy". Fastest is "silx". """ check_supported(method, self.histogram_methods, "histogram computing method") self.method = method check_supported(backend, self.backends, "histogram backend") self.backend = backend self._set_bin_width(bin_width) self._set_num_bins(num_bins) self.min_bins = min_bins self._set_histogram_methods() def _set_bin_width(self, bin_width): if self.method == "fixed_bins_number": self.bin_width = None return if isinstance(bin_width, str): check_supported(bin_width, self.bin_width_policies, "bin width policy") self._fixed_bw = False else: bin_width = float(bin_width) self._fixed_bw = True self.bin_width = bin_width def _set_num_bins(self, num_bins): if self.method == "fixed_bins_width": self.num_bins = None return if self.method == "fixed_bins_number" and num_bins is None: raise ValueError("Need to specify num_bins for method='fixed_bins_number'") self.num_bins = int(num_bins) def _set_histogram_methods(self): self._histogram_methods = { "fixed_bins_number": { "compute": self._compute_histogram_fixed_nbins, "merge": self._merge_histograms_fixed_nbins, }, "fixed_bins_width": { "compute": self._compute_histogram_fixed_bw, "merge": self._merge_histograms_fixed_bw, }, } assert set(self._histogram_methods.keys()) == set(self.histogram_methods) @staticmethod def _get_histograms_and_bins(histograms, center=False, dont_truncate_bins=False): histos = [h[0] for h in histograms] if dont_truncate_bins: bins = [h[1] for h in histograms] else: if center: bins = [0.5 * (h[1][1:] + h[1][:-1]) for h in histograms] else: bins = [h[1][:-1] for h in histograms] return histos, bins # # Histogram with fixed number of bins # def _compute_histogram_fixed_nbins(self, data, data_range=None): if data.ndim > 1: data = data.ravel() dmin, dmax = data.min(), data.max() if data_range is None else data_range if self.backend == "numpy": res = np.histogram(data, bins=self.num_bins) elif self.backend == "silx": histogrammer = Histogramnd(data, n_bins=self.num_bins, histo_range=(dmin, dmax), last_bin_closed=True) res = histogrammer.histo, histogrammer.edges[0] # pylint: disable=E1136 return res def _merge_histograms_fixed_nbins(self, histograms, dont_truncate_bins=False): histos, bins = self._get_histograms_and_bins(histograms, dont_truncate_bins=dont_truncate_bins) res = np.histogram( np.hstack(bins), weights=np.hstack(histos), bins=self.num_bins, ) return res # # Histogram with fixed bin width # def _bin_width_u16(self, dmin, dmax): return (dmax - dmin) / 65535.0 def _bin_width_fixed(self, dmin, dmax): return self.bin_width
[docs] def get_bin_width(self, dmin, dmax): if self._fixed_bw: return self._bin_width_fixed(dmin, dmax) elif self.bin_width == "uint16": return self._bin_width_u16(dmin, dmax) else: raise ValueError()
def _compute_histogram_fixed_bw(self, data, data_range=None): dmin, dmax = data.min(), data.max() if data_range is None else data_range min_bins = self.min_bins or 1 bw_max = self.get_bin_width(dmin, dmax) nbins = 0 bw_factor = 1 while nbins < min_bins: bw = 2 ** round(log2(bw_max)) / bw_factor nbins = int((dmax - dmin) / bw) bw_factor *= 2 res = np.histogram(data, bins=nbins) return res def _merge_histograms_fixed_bw(self, histograms, **kwargs): histos, bins = self._get_histograms_and_bins(histograms, center=False) dmax = max([b[-1] for b in bins]) dmin = min([b[0] for b in bins]) bw_max = max([b[1] - b[0] for b in bins]) res = np.histogram(np.hstack(bins), weights=np.hstack(histos), bins=int((dmax - dmin) / bw_max)) return res # # Dispatch methods #
[docs] def compute_histogram(self, data, data_range=None): compute_hist_func = self._histogram_methods[self.method]["compute"] return compute_hist_func(data, data_range=data_range)
[docs] def merge_histograms(self, histograms, **kwargs): merge_hist_func = self._histogram_methods[self.method]["merge"] return merge_hist_func(histograms, **kwargs)
[docs] class VolumeHistogram: """ A class for computing the histogram of an entire volume. Unless explicitly specified, histogram is computed in several passes so that not all the volume is loaded in memory. """ def __init__(self, data_url, chunk_size_slices=100, chunk_size_GB=None, nbins=1e6, logger=None): """ Initialize a VolumeHistogram object. Parameters ---------- fname: DataUrl DataUrl to the HDF5 file. chunk_size_slices: int, optional Compute partial histograms of groups of slices. This is the default behavior, where the groups size is 100 slices. This parameter is mutually exclusive with 'chunk_size_GB'. chunk_size_GB: float, optional Maximum memory (in GB) to use when computing the histogram by group of slices. This parameter is mutually exclusive with 'chunk_size_slices'. nbins: int, optional Histogram number of bins. Default is 1e6. """ self.data_url = data_url self.logger = LoggerOrPrint(logger) self._get_data_info() self._set_chunk_size(chunk_size_slices, chunk_size_GB) self.nbins = int(nbins) self._init_histogrammer() def _get_data_info(self): self.fname = self.data_url.file_path() self.data_path = self.data_url.data_path() with HDF5File(self.fname, "r") as fid: try: data_ptr = fid[self.data_path] except KeyError: msg = str( "Could not access HDF5 path %s in file %s. Please check that this file \ actually contains a reconstruction and that the HDF5 path is correct" % (self.data_path, self.fname) ) self.logger.fatal(msg) raise ValueError(msg) if data_ptr.ndim != 3: msg = "Expected data to have 3 dimensions, got %d" % data_ptr.ndim raise ValueError(msg) self.data_shape = data_ptr.shape self.data_dtype = data_ptr.dtype self.data_nbytes_GB = np.prod(data_ptr.shape) * data_ptr.dtype.itemsize / 1e9 def _set_chunk_size(self, chunk_size_slices, chunk_size_GB): if not ((chunk_size_slices is not None) ^ (chunk_size_GB is not None)): raise ValueError("Please specify either chunk_size_slices or chunk_size_GB") if chunk_size_slices is None: chunk_size_slices = int(chunk_size_GB / (np.prod(self.data_shape[1:]) * self.data_dtype.itemsize / 1e9)) self.chunk_size = chunk_size_slices self.logger.debug("Computing histograms by groups of %d slices" % self.chunk_size) def _init_histogrammer(self): self.histogrammer = PartialHistogram(method="fixed_bins_number", num_bins=self.nbins) def _compute_histogram(self, data): return self.histogrammer.compute_histogram(data.ravel()) # 1D
[docs] def compute_volume_histogram(self): n_z = self.data_shape[0] histograms = [] n_steps = ceil(n_z / self.chunk_size) with HDF5File(self.fname, "r") as fid: for chunk_id in range(n_steps): self.logger.debug("Computing histogram %d/%d" % (chunk_id + 1, n_steps)) z_slice = slice(chunk_id * self.chunk_size, (chunk_id + 1) * self.chunk_size) images_stack = fid[self.data_path][z_slice, :, :] hist = self._compute_histogram(images_stack) histograms.append(hist) res = self.histogrammer.merge_histograms(histograms) return res
[docs] def hist_as_2Darray(hist, center=True, dtype="f"): hist, bins = hist if bins.size != hist.size: # assert bins.size == hist.size +1 if center: bins = 0.5 * (bins[1:] + bins[:-1]) else: bins = bins[:-1] res = np.zeros((2, hist.size), dtype=dtype) res[0] = hist res[1] = bins.astype(dtype) return res
[docs] def add_last_bin(histo_bins): """ Add the last bin (max value) to a list of bin edges. """ res = np.zeros(histo_bins.size + 1, dtype=histo_bins.dtype) res[:-1] = histo_bins[:] res[-1] = res[-2] + (res[1] - res[0]) return res