Source code for nabu.pipeline.datadump

from os import path
from ..resources.logger import LoggerOrPrint
from .utils import get_subregion
from .writer import WriterManager
from ..io.reader import get_hdf5_dataset_shape

try:
    import pycuda.gpuarray as garray

    __has_pycuda__ = True
except ImportError:
    __has_pycuda__ = False


class DataDumpManager:
    """
    A helper class for managing data dumps, with the aim of saving/resuming the processing from a given step.
    """

    def __init__(self, process_config, sub_region, margin=None, logger=None):
        """
        Initialize a DataDumpManager object.

        Parameters
        ----------
        process_config: ProcessConfig
            ProcessConfig object
        sub_region: tuple of int
            Series of integers defining the sub-region being processed.
            The form is ((start_angle, end_angle), (start_z, end_z), (start_x, end_x))
        margin: tuple of int, optional
            Margin, used when processing data, in the form ((up, down), (left, right)).
            Each item can be None. Using a margin means that a given chunk of data will
            eventually be cropped as `data[:, up:-down, left:-right]`
        logger: Logger, optional
            Logging object
        """
        self.process_config = process_config
        self.processing_steps = process_config.processing_steps
        self.processing_options = process_config.processing_options
        self.dataset_info = process_config.dataset_info
        self._set_subregion_and_margin(sub_region, margin)
        self.logger = LoggerOrPrint(logger)
        self._configure_data_dumps()

    def _set_subregion_and_margin(self, sub_region, margin):
        self.sub_region = get_subregion(sub_region)
        self._z_sub_region = self.sub_region[1]
        self.z_min = self._z_sub_region[0]
        self.margin = get_subregion(margin, ndim=2)  # ((U, D), (L, R))
        self.margin_up = self.margin[0][0]
        self.start_index = self.z_min + self.margin_up
        self.delta_z = self._z_sub_region[-1] - self._z_sub_region[-2]
        self._grouped_processing = False
        iangle1, iangle2 = self.sub_region[0]
        if iangle1 != 0 or iangle2 < len(self.process_config.rotation_angles(subsampling=False)):
            self._grouped_processing = True
            self.start_index = self.sub_region[0][0]

    def _configure_dump(self, step_name, force_dump_to_fname=None):
        if force_dump_to_fname is not None:
            # Shortcut
            fname_full = force_dump_to_fname
        elif step_name in self.processing_steps:
            # Standard case
            if not self.processing_options[step_name].get("save", False):
                return
            fname_full = self.processing_options[step_name]["save_steps_file"]
        elif step_name == "sinogram" and self.process_config.dump_sinogram:
            # "sinogram" is a special keyword
            fname_full = self.process_config.dump_sinogram_file
        else:
            return

        # "fname_full" is the path to the final master file.
        # We also need to create partial files (in a sub-directory)
        fname, ext = path.splitext(fname_full)
        dirname, file_prefix = path.split(fname)
        self.data_dump[step_name] = WriterManager(
            dirname,
            file_prefix,
            file_format="hdf5",
            overwrite=True,
            start_index=self.start_index,
            logger=self.logger,
            metadata={
                "process_name": step_name,
                "processing_index": 0,
                "config": {
                    "processing_options": self.processing_options,  # slow!
                    "nabu_config": self.process_config.nabu_config,
                },
                "entry": getattr(self.dataset_info.dataset_scanner, "entry", "entry"),
            },
        )

    def _configure_data_dumps(self):
        self.data_dump = {}
        for step_name in self.processing_steps:
            self._configure_dump(step_name)
        # "sinogram" is a special keyword: it is not in processing_steps,
        # but the dump is guaranteed to happen right before sinogram generation
        if self.process_config.dump_sinogram:
            self._configure_dump("sinogram")
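    # Worked example (values illustrative, not from any real dataset): with
    # sub_region=((0, n_angles), (100, 164), (0, 2048)) and margin=((5, 5), (None, None)),
    # we get z_min=100, margin_up=5, delta_z=64 and start_index=105, i.e. the absolute
    # index of the first slice actually kept after cropping the margin.
    # When processing a subset of the angles ("grouped processing"), start_index is
    # instead the index of the first angle of the group.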
    def get_data_dump(self, step_name):
        """
        Get information on where to write a given processing step.

        Parameters
        ----------
        step_name: str
            Name of the processing step

        Returns
        -------
        writer: WriterManager
            An object with information on where to write the data for the given processing step.
        """
        return self.data_dump.get(step_name, None)
    def get_read_dump_subregion(self):
        read_opts = self.processing_options["read_chunk"]
        if read_opts.get("process_file", None) is None:
            return None
        dump_start_z, dump_end_z = read_opts["dump_start_z"], read_opts["dump_end_z"]
        relative_start_z = self.z_min - dump_start_z
        relative_end_z = relative_start_z + self.delta_z
        # When using binning, every step after "read" results in smaller-sized data.
        # Therefore dumped data has shape (ceil(n_angles/subsampling), n_z//binning_z, n_x//binning_x)
        relative_start_z //= self.process_config.binning_z
        relative_end_z //= self.process_config.binning_z
        # (n_angles, n_z, n_x)
        subregion = (None, None, relative_start_z, relative_end_z, None, None)
        return subregion
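    # Illustrative numbers (assumed, not taken from a real dataset): if the current chunk
    # starts at z_min=120, the dump was written from dump_start_z=100, delta_z=64 and the
    # vertical binning factor is 2, then relative_start_z = 120 - 100 = 20 and
    # relative_end_z = 20 + 64 = 84; after binning these become 20 // 2 = 10 and 84 // 2 = 42,
    # so the sub-region read back from the dump is (None, None, 10, 42, None, None).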
    def _check_resume_from_step(self):
        read_opts = self.processing_options["read_chunk"]
        expected_radios_shape = get_hdf5_dataset_shape(
            read_opts["process_file"],
            read_opts["process_h5_path"],
            sub_region=self.get_read_dump_subregion(),
        )
        # TODO check
    def dump_data_to_file(self, step_name, data):
        if step_name not in self.data_dump:
            return
        writer = self.data_dump[step_name]
        self.logger.info("Dumping data to %s" % writer.fname)
        if __has_pycuda__:
            if isinstance(data, garray.GPUArray):
                data = data.get()
        writer.write_data(data)
    def __repr__(self):
        res = "%s(%s, margin=%s)" % (self.__class__.__name__, str(self.sub_region), str(self.margin))
        if len(self.data_dump) > 0:
            for step_name, writer_configurator in self.data_dump.items():
                res += "\n- Dump %s to %s" % (step_name, writer_configurator.fname)
        return res
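Illustrative usage sketch (not part of the module): it assumes a ProcessConfig instance named process_config has already been built, and that n_angles, n_x and radios are provided by the surrounding pipeline; the sub-region, margin and step name below are arbitrary.

    dump_manager = DataDumpManager(
        process_config,
        sub_region=((0, n_angles), (100, 164), (0, n_x)),  # ((angles), (z rows), (x columns))
        margin=((5, 5), (None, None)),  # chunk will later be cropped as data[:, 5:-5, :]
    )
    # dump_data_to_file() silently does nothing for steps that were not configured for saving
    dump_manager.dump_data_to_file("flatfield", radios)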