Source code for nabu.io.reader_helical

from .reader import *


[docs] class ChunkReaderHelical(ChunkReader): """implements reading by projections subsets""" def _set_subregion(self, sub_region): super()._set_subregion(sub_region) ########### # undo the chun_size setting of the base class # to avoid allocation of Tera bytes in the helical case self.chunk_shape = (1,) + self.shape
[docs] def set_data_buffer(self, data_buffer, pre_allocate=False): if data_buffer is not None: # overwrite out_dtype self.files_data = data_buffer self.out_dtype = data_buffer.dtype if data_buffer.shape[1:] != self.shape: raise ValueError("Expected shape %s but got %s" % (self.shape, data_buffer.shape)) if pre_allocate: self.files_data = np.zeros(self.chunk_shape, dtype=self.out_dtype) if (self.binning is not None) and (np.dtype(self.out_dtype).kind in ["u", "i"]): raise ValueError( "Output datatype cannot be integer when using binning. Please set the 'convert_float' parameter to True or specify a 'data_buffer'." )
[docs] def get_binning(self): if self.binning is None: return 1, 1 else: return self.binning
def _load_single(self, sub_total_prange_slice=slice(None, None)): if sub_total_prange_slice == slice(None, None): sorted_files_indices = self._sorted_files_indices else: sorted_files_indices = self._sorted_files_indices[sub_total_prange_slice] for i, fileidx in enumerate(sorted_files_indices): file_url = self.files[fileidx] self.files_data[i] = self.get_data(file_url) self._fileindex_to_idx[fileidx] = i def _apply_subsample_fact(self, t): if t is not None: t = t * self.dataset_subsampling return t def _load_multi(self, sub_total_prange_slice=slice(None, None)): if sub_total_prange_slice == slice(None, None): files_to_be_compacted_dict = self.files sorted_files_indices = self._sorted_files_indices else: if self.dataset_subsampling > 1: start, stop, step = list( map( self._apply_subsample_fact, [sub_total_prange_slice.start, sub_total_prange_slice.stop, sub_total_prange_slice.step], ) ) sub_total_prange_slice = slice(start, stop, step) sorted_files_indices = self._sorted_files_indices[sub_total_prange_slice] files_to_be_compacted_dict = dict( zip(sorted_files_indices, [self.files[idx] for idx in sorted_files_indices]) ) urls_compacted = get_compacted_dataslices(files_to_be_compacted_dict, subsampling=self.dataset_subsampling) loaded = {} start_idx = 0 for idx in sorted_files_indices: url = urls_compacted[idx] url_str = str(url) is_loaded = loaded.get(url_str, False) if is_loaded: continue ds = url.data_slice() delta_z = ds.stop - ds.start if ds.step is not None and ds.step > 1: delta_z //= ds.step end_idx = start_idx + delta_z self.files_data[start_idx:end_idx] = self.get_data(url) start_idx += delta_z loaded[url_str] = True
[docs] def load_files(self, overwrite: bool = False, sub_total_prange_slice=slice(None, None)): """ Load the files whose links was provided at class instantiation. Parameters ----------- overwrite: bool, optional Whether to force reloading the files if already loaded. """ if self._loaded and not (overwrite): raise ValueError("Radios were already loaded. Call load_files(overwrite=True) to force reloading") if self.file_reader.multi_load: self._load_multi(sub_total_prange_slice) else: if self.dataset_subsampling > 1: assert ( False ), " in helica pipeline, load file _load_single has not yet been adapted to angular subsampling " self._load_single(sub_total_prange_slice) self._loaded = True
load_data = load_files @property def data(self): return self.files_data