Source code for nabu.app.composite_cor

import logging
import os
import sys
import numpy as np
import re

from nabu.resources.dataset_analyzer import HDF5DatasetAnalyzer
from nabu.pipeline.estimators import CompositeCOREstimator, estimate_cor
from nabu.resources.nxflatfield import update_dataset_info_flats_darks
from nabu.resources.utils import extract_parameters
from nxtomo.application.nxtomo import NXtomo
from .. import version
from .cli_configs import CompositeCorConfig
from .utils import parse_params_values
from ..utils import DictToObj
import json


[docs] class NumpyArrayEncoder(json.JSONEncoder):
[docs] def default(self, obj): if isinstance(obj, np.ndarray): return obj.tolist() return json.JSONEncoder.default(self, obj)
[docs] def main(user_args=None): "Application to extract with the composite cor finder the center of rotation for a scan or a series of scans" if user_args is None: user_args = sys.argv[1:] args_dict = parse_params_values( CompositeCorConfig, parser_description=main.__doc__, program_version="nabu " + version, user_args=user_args, ) composite_cor_entry_point(args_dict) # here we have been called by the cli. The return value 0 means OK return 0
[docs] def composite_cor_entry_point(args_dict): args = DictToObj(args_dict) if len(os.path.dirname(args.filename_template)) == 0: # To make sure that other utility routines can succesfully deal with path within the current directory args.filename_template = os.path.join(".", args.filename_template) args.filename_template = os.path.abspath(args.filename_template) if args.first_stage is not None: if args.num_of_stages is None: args.num_of_stages = 1 # if the first_stage parameter has been given then # we are using numbers to form the names of the files. # The filename must containe a XX..X substring which will be replaced pattern = re.compile("[X]+") ps = pattern.findall(args.filename_template) if len(ps) == 0: message = f""" You have specified the "first_stage" parameter, with an integer. Therefore the "filename_template" parameter is expected to containe a XX..X subsection but none was found in the passed parameter which is {args.filename_template} """ raise ValueError(message) ls = list(map(len, ps)) idx = np.argmax(ls) args.filename_template = args.filename_template.replace(ps[idx], "{i_stage:" + "0" + str(ls[idx]) + "d}") if args.num_of_stages is None: # this way it works also in the simple case where # only the filename is provided together with the cor options num_of_stages = 1 first_stage = 0 else: num_of_stages = args.num_of_stages first_stage = args.first_stage cor_list = [] for iz in range(first_stage, first_stage + num_of_stages): if args.num_of_stages is not None: nexus_name = args.filename_template.format(i_stage=iz) else: nexus_name = args.filename_template dataset_info = HDF5DatasetAnalyzer(nexus_name, extra_options={"h5_entry": args.entry_name}) update_dataset_info_flats_darks(dataset_info, flatfield_mode=1) ### JL start ### # Extract CoR parameters from configuration file try: cor_options = extract_parameters(args.cor_options, sep=";") except Exception as exc: msg = "Could not extract parameters from cor_options: %s" % (str(exc)) raise ValueError(msg) ### JL end ### # JL start ### #: next bit will be done in estimate # if "near_pos" not in args.cor_options: # scan = NXtomo() # scan.load(file_path=nexus_name, data_path=args.entry_name) # estimated_near = scan.instrument.detector.estimated_cor_from_motor # # cor_options = args.cor_options + f" ; near_pos = {estimated_near} " # # else: # cor_options = args.cor_options ### JL end ### cor_finder = CompositeCOREstimator( dataset_info, oversampling=args.oversampling, theta_interval=args.theta_interval, n_subsampling_y=args.n_subsampling_y, take_log=True, spike_threshold=0.04, cor_options=cor_options, norm_order=1, ) cor_position = cor_finder.find_cor() cor_list.append(cor_position) cor_list = np.array(cor_list).T if args.output_file is not None: output_name = args.output_file else: output_name = os.path.splitext(args.filename_template)[0] + "_cors.txt" if output_name.endswith(".json"): with open(output_name, "w") as fp: json.dump( dict(rotation_axis_position=cor_list[0], rotation_axis_position_list=cor_list), fp, indent=4, cls=NumpyArrayEncoder, ) else: np.savetxt( output_name, cor_list, )