Source code for exosim.recipes.createNDRs

import os
from collections import OrderedDict
from typing import List, Tuple, Union

import astropy.units as u
import h5py
import numpy as np

import exosim.log as log
import exosim.tasks.detector as detector
from exosim.models.signal import Counts
from exosim.output import HDF5OutputGroup, SetOutput
from exosim.output.hdf5.utils import copy_file, recursively_read_dict_contents
from exosim.utils.ascii_arts import astronomer3
from exosim.utils.checks import look_for_key
from exosim.utils.klass_factory import find_and_run_task
from exosim.utils.prepare_recipes import (
    clean_config_files,
    copy_input_files,
    load_options,
)
from exosim.utils.runConfig import RunConfig
from exosim.utils.timed_class import TimedClass


[docs]class CreateNDRs(TimedClass, log.Logger): """ Pipeline to create the observation NDRs. This pipeline loads the configuration file and the Sub-Exposures. Examples -------- >>> import exosim.recipes as recipes >>> recipes.CreateNDRs(input_file='./input_file.h5', >>> output_file = 'ndrs_file.h5', >>> options_file='main _configuration.xml') """ def __init__( self, input_file: str, output_file: str, options_file: Union[str, dict] ) -> None: """ Parameters ---------- input_file : str Sub-Exposure ExoSim product output_file : str output file name options_file: str or dict input configuration file """ super().__init__() self.graphics(astronomer3) RunConfig.stats() self.announce("started") clean_config_files() self.input = input_file self.mainConfig, self.payloadConfig = load_options(options_file) copy_input_files(os.path.dirname(os.path.abspath(output_file))) output = SetOutput(output_file) with h5py.File(self.input, "r") as f: ch_list = list(f["channels"].keys()) ch_list.sort() with output.use(append=True, cache=True) as out: output_grp = out.create_group("channels") for ch in ch_list: ch_description = ( self.payloadConfig["channel"][ch] if isinstance(self.payloadConfig["channel"], OrderedDict) else self.payloadConfig["channel"] ) # prepare output self.announce("iterating over {}".format(ch)) ch_grp = output_grp.create_group(ch) # import exposures data ( spectral, spatial, mid_freq_time_line, integration_times, number_of_exposures, n_subexposures_per_groups, n_groups_per_ramp, ) = self.load_subexposure_data(ch) # preparing outputs sub_ndrs, sim_grp = self.prepare_output( spectral, spatial, mid_freq_time_line, integration_times, ch, ch_grp, ) # add dark current if look_for_key( ch_description["detector"], "dark_current", True ): dc_instance = find_and_run_task( ch_description["detector"], "dc_task", detector.AddConstantDarkCurrent, ) dc_instance( subexposures=sub_ndrs, parameters=ch_description, integration_times=integration_times, output=sim_grp, ) # add shot noise if look_for_key( ch_description["detector"], "shot_noise", True ): addShotNoise = detector.AddShotNoise() addShotNoise(subexposures=sub_ndrs, output=sim_grp) # add cosmic rays if look_for_key( ch_description["detector"], "cosmic_rays", True ): cosmic_instance = find_and_run_task( ch_description["detector"], "cosmic_rays_task", detector.AddCosmicRays, ) cosmic_instance( subexposures=sub_ndrs, parameters=ch_description, integration_times=integration_times, output=sim_grp, ) # prepare accumulation n_subexposures_per_ramp = ( sub_ndrs.dataset.shape[0] // number_of_exposures ) base = np.arange(0, number_of_exposures, 1) state_machine = np.repeat(base, n_subexposures_per_ramp) # accumulate ndrs accumulateSubExposures = detector.AccumulateSubExposures() accumulateSubExposures( subexposures=sub_ndrs, state_machine=state_machine ) # add reset bias if look_for_key( ch_description["detector"], "ktc_offset", True ): reset_instance = find_and_run_task( ch_description["detector"], "ktc_offset_task", detector.AddKTC, ) reset_instance( subexposures=sub_ndrs, parameters=ch_description, state_machine=state_machine, output=sim_grp, ) # apply dead pixels map if look_for_key( ch_description["detector"], "dead_pixels", True ): dp_map_instance = find_and_run_task( ch_description["detector"], "dp_map_task", detector.ApplyDeadPixelsMap, ) dp_map_instance( subexposures=sub_ndrs, parameters=ch_description, output=sim_grp, ) # apply pixel non linearity if look_for_key( ch_description["detector"], "pixel_non_linearity", True ): pnl_map_instance = find_and_run_task( ch_description["detector"], "pnl_map_task", detector.LoadPixelsNonLinearityMap, ) pnl_map = pnl_map_instance( parameters=ch_description, output=sim_grp, ) pnl_apply_instance = find_and_run_task( ch_description["detector"], "pnl_task", detector.ApplyPixelsNonLinearity, ) pnl_apply_instance( subexposures=sub_ndrs, parameters=pnl_map ) # apply pixel saturation if look_for_key( ch_description["detector"], "saturation", True ): sat_instance = find_and_run_task( ch_description["detector"], "sat_task", detector.ApplySimpleSaturation, ) sat_instance( subexposures=sub_ndrs, parameters=ch_description ) # add read noise if look_for_key( ch_description["detector"], "read_noise", True ): read_instance = find_and_run_task( ch_description["detector"], "read_noise_task", detector.AddNormalReadNoise, ) read_instance( subexposures=sub_ndrs, parameters=ch_description, output=sim_grp, ) # add gain drift if look_for_key( ch_description["detector"], "gain_drift", True ): gain_instance = find_and_run_task( ch_description["detector"], "gain_drift_task", detector.addGainDrift, ) gain_instance( subexposures=sub_ndrs, parameters=ch_description, output=sim_grp, ) # converting to digital if look_for_key(ch_description["detector"], "ADC", True): analogToDigital = detector.AnalogToDigital() sub_ndrs = analogToDigital( subexposures=sub_ndrs, parameters=ch_description, output=sim_grp, ) # merging groups' ndrs mergeGroups = detector.MergeGroups() merged_ndrs = mergeGroups( subexposures=sub_ndrs, n_groups=n_groups_per_ramp * number_of_exposures, n_ndrs=n_subexposures_per_groups, output=ch_grp, ) merged_ndrs.metadata.update( {"n_groups_per_ramp": n_groups_per_ramp} ) merged_ndrs.write() # clean the output from redundant information self.clean_output_tree(ch_grp, ["simulation_data/sub_ndrs"]) self.info("{} completed.".format(ch)) self.log_runtime("{} ended in".format(ch), "info") self.refactor_output(output_file) self.info("output {} size: {:.3f}".format(output_file, out.getsize())) self.log_runtime_complete("recipe ended", "info") self.announce("ended")
[docs] def load_subexposure_data( self, ch: str ) -> Tuple[u.Quantity, u.Quantity, u.Quantity, u.Quantity, int, int, int]: """It loads the sub-exposures data from file Parameters ---------- ch : str channel name Returns ------- :class:`astropy.units.Quantity` spectral axes array :class:`astropy.units.Quantity` spatial axes array :class:`astropy.units.Quantity` temporal array :class:`astropy.units.Quantity` integration times int number of exposures int number of NDRs per ramp int number of NDRs per group """ with h5py.File(self.input, "r") as f: file_path = os.path.join("channels", ch) se = f[os.path.join(file_path, "SubExposures")] spectral = se["spectral"] * u.Unit( se["spectral_units"][()].decode("utf-8") ) spatial = se["spatial"] * u.Unit( se["spatial_units"][()].decode("utf-8") ) time_line = se["time"] * u.Unit( se["time_units"][()].decode("utf-8") ) integration_times = se["metadata"]["integration_times"]["value"][ () ] * u.Unit( se["metadata"]["integration_times"]["unit"][()].decode("utf-8") ) params = dict( f[os.path.join(file_path, "instantaneous_readout_params")] ) try: number_of_exposures = params["number_of_exposures"][()] except TypeError: number_of_exposures = params["number_of_exposures"]["value"][ () ] params = dict(f[os.path.join(file_path, "reading_scheme_params")]) n_subexposures_per_groups = params["n_NRDs_per_group"][()] n_groups_per_ramp = params["n_GRPs"][()] return ( spectral, spatial, time_line, integration_times, number_of_exposures, n_subexposures_per_groups, n_groups_per_ramp, )
[docs] def prepare_output( self, spectral: u.Quantity, spatial: u.Quantity, time_line: u.Quantity, integration_times: u.Quantity, ch: str, ch_grp: HDF5OutputGroup, ) -> Tuple[Counts, HDF5OutputGroup]: """It produces the NDRs output per channel. Parameters ---------- spectral : u.Quantity spectral axes array spatial : u.Quantity spatial axes array time_line : u.Quantity temporal array integration_times : u.Quantity integration times ch : str channel name ch_grp : h5py.Group output group Returns ------- :class:`exosim.models.signal.Counts` NDRs signal class :class:`exosim.output.hdf5.h5df.HDF5OutputGroup` output group """ sim_grp = ch_grp.create_group("simulation_data") with h5py.File(self.input, "r") as f: file_path = os.path.join("channels", ch) # copy simulation data sim_data = os.path.join(file_path, "simulation_data") for key in f[sim_data].keys(): f.copy(os.path.join(sim_data, key), sim_grp._entry) f.copy("info", sim_grp._entry, name="sub_exposures_info") inst_rdo_data = os.path.join( file_path, "instantaneous_readout_params" ) keys_to_keep = [ "spe_jit_averaged", "spa_jit_averaged", "y_jit", "x_jit", "jit_indexes", "y_jit_averaged", "x_jit_averaged", "effective_osf", ] for key in f[inst_rdo_data].keys(): if key in keys_to_keep: f.copy(os.path.join(inst_rdo_data, key), sim_grp._entry) if "pointing_jitter" in f.keys(): for key in f["pointing_jitter"]: f.copy( os.path.join("pointing_jitter", key), sim_grp._entry ) self.debug("simulation data copied to output") se = f[os.path.join(file_path, "SubExposures")] subexposures = se["data"] metadata = recursively_read_dict_contents(se["metadata"]) # create sub_ndrs data sub_ndrs = Counts( spectral=spectral, time=time_line, data=None, spatial=spatial, shape=( subexposures.shape[0], subexposures.shape[1], subexposures.shape[2], ), cached=True, output=sim_grp, dataset_name="sub_ndrs", output_path=None, metadata={"integration_times": integration_times}, dtype=np.float64, ) sub_ndrs.metadata.update(metadata) for chunk in sub_ndrs.dataset.iter_chunks(): sub_ndrs.dataset[chunk] = subexposures[chunk] sub_ndrs.output.flush() sub_ndrs.write() return sub_ndrs, sim_grp
[docs] def clean_output_tree( self, out: HDF5OutputGroup, key_list: List[str] ) -> None: """ Remove specified keys and their associated data from the output object. This method iterates through a list of keys and deletes each key-value pair from the output object. Useful for cleaning up temporary or unnecessary data from the output. Parameters ---------- out : HDF5OutputGroup The output object where key-value pairs are stored. key_list : list of str A list of keys that need to be removed from the output object. Examples -------- >>> out = Output({'key1': 'value1', 'key2': 'value2', 'key3': 'value3'}) >>> clean_output(out, ['key1', 'key3']) >>> out.data {'key2': 'value2'} Notes ----- The method assumes that the keys specified in the `key_list` are present in the output object. If a key is not found, the `delete_data` method should handle it gracefully. """ self.info("Cleaning the output tree") for key in key_list: out.delete_data(key)
[docs] def refactor_output(self, output_file: str) -> None: """ Renames the original output file and creates a new one with its contents. This method takes the original output file, renames it by appending '_unrefactored' to its name, and then copies its content into a new file with the original name. The refactored (old version) file is deleted after the operation. Parameters ---------- output_file : str The path of the original output file. """ # Log the refactoring process self.info("Refactoring output") # Split the output_file into filename and extension filename, file_extension = os.path.splitext(output_file) # Create a new name for the original file by appending '_unrefactored' new_name = "{}_unrefactored{}".format(filename, file_extension) # Rename the original file os.rename(output_file, new_name) # Open the renamed file and create a new output file # Then copy the contents from the renamed file to the new file with h5py.File(new_name, "r") as f: with h5py.File(output_file, "w") as g: copy_file(f, g) # Remove the renamed (old version) file os.remove(new_name)