Source code for exosim.recipes.createSubExposures

import os
from collections import OrderedDict
from typing import Tuple, Union

import h5py

import exosim.log as log
from exosim.models.signal import CountsPerSecond
from exosim.output import HDF5OutputGroup, SetOutput
from exosim.output.hdf5.utils import load_signal
from exosim.tasks.astrosignal import (
    ApplyAstronomicalSignal,
    FindAstronomicalSignals,
)
from exosim.tasks.subexposures import (
    AddForegrounds,
    ApplyQeMap,
    EstimatePointingJitter,
    InstantaneousReadOut,
    LoadILS,
    LoadQeMap,
    PrepareInstantaneousReadOut,
)
from exosim.utils.ascii_arts import astronomer2
from exosim.utils.iterators import iterate_over_chunks
from exosim.utils.klass_factory import find_and_run_task
from exosim.utils.prepare_recipes import (
    clean_config_files,
    copy_input_files,
    load_options,
)
from exosim.utils.runConfig import RunConfig
from exosim.utils.timed_class import TimedClass


[docs]class CreateSubExposures(TimedClass, log.Logger): """ Pipeline to create the focal planes sub-exposures. This pipeline loads the configuration file and the produced focal planes to produce the sub-exposures. It prepares the pointing jitter first, then it scales it to the instrument focal planes pixels. It estimates the detector reading scheme, considering instantaneous readout, and it produces the sub-exposures by jittering the focal planes. Attributes ------------ input: str input file mainConfig: dict This is parsed from :class:`~exosim.tasks.load.loadOptions.LoadOptions` payloadConfig: dict payload configuration dictionary extracted from mainConfig jitter_spa: :class:`~astropy.units.Quantity` pointing jitter in the spatial direction expressed in units of :math:`deg`. jitter_spe: :class:`~astropy.units.Quantity` pointing jitter in the spectral direction expressed in units of :math:`deg`. Examples -------- >>> import exosim.recipes as recipes >>> recipes.CreateSubExposures(input_file='./input_file.h5', >>> output_file = 'se_file.h5', >>> options_file='main _configuration.xml') """ def __init__( self, input_file: str, output_file: str, options_file: Union[str, dict] ) -> None: """ Parameters ---------- input_file: str input file output_file: str output file options_file: str or dict input configuration file """ super().__init__() self.graphics(astronomer2) RunConfig.stats() self.announce("started") clean_config_files() self.input = input_file self.mainConfig, self.payloadConfig = load_options(options_file) copy_input_files(os.path.dirname(os.path.abspath(output_file))) self.jitter_spa, self.jitter_spe, self.jitter_time = None, None, None output = SetOutput(output_file) # prepare channel list with h5py.File(self.input, "r") as f: ch_list = list(f["channels"].keys()) ch_list.sort() with output.use(append=True, cache=True) as out: # if jitter is requested, run the jitter task use_slicing = False if "jitter" in self.mainConfig.keys(): jitter_instance = find_and_run_task( self.mainConfig["jitter"], "jitter_task", EstimatePointingJitter, ) ( self.jitter_spa, self.jitter_spe, self.jitter_time, ) = jitter_instance( parameters=self.mainConfig, output_file=out ) if "slicing" in self.mainConfig["jitter"].keys(): use_slicing = self.mainConfig["jitter"]["slicing"] output_grp = out.create_group("channels") instantaneousReadOut = InstantaneousReadOut() prepareInstantaneousReadOut = PrepareInstantaneousReadOut() addForegrounds = AddForegrounds() for ch in ch_list: self.announce("computing sub-exposure for {}".format(ch)) # prepare the output group ch_grp = output_grp.create_group(ch) # copy the simulation data from the input file to the output file sim_grp = self.copy_simulation_data(ch, ch_grp) # load the focal plane fp, frg_fp, bkg_fp = self.load_focal_plane(ch) # determine if only a channel is provided or a list of channels if isinstance(self.payloadConfig["channel"], OrderedDict): ch_param = self.payloadConfig["channel"][ch] else: ch_param = self.payloadConfig["channel"] # prepare the readout task ( readout_parameters, integration_time, ) = prepareInstantaneousReadOut( main_parameters=self.mainConfig, parameters=ch_param, focal_plane=fp, pointing_jitter=( self.jitter_spa, self.jitter_spe, self.jitter_time, ), output_file=ch_grp, ) # run the readout task out_se = instantaneousReadOut( parameters=ch_param, focal_plane=fp, readout_parameters=readout_parameters, pointing_jitter=( self.jitter_spa, self.jitter_spe, self.jitter_time, ), slicing=use_slicing, output_file=ch_grp, ) # introducing the astronomical signal ils, timeline = None, None findAstronomicalSignals = FindAstronomicalSignals() applyAstronomicalSignal = ApplyAstronomicalSignal() astrosignals = findAstronomicalSignals( sky_parameters=self.mainConfig["sky"], ) for source, signals in astrosignals.items(): astro_grp = ch_grp.create_group("Astrosignal") source_grp = astro_grp.create_group(source) # TODO what if different signals are found for the same source with the same name? for signal, params in signals.items(): signal_grp = source_grp.create_group(signal) self.info( "{}-{} astronomical signal found.".format( source, signal ) ) # load the instrument line shape ils_instance = find_and_run_task( ch_param["detector"], "ils_task", LoadILS ) ils = ( ils_instance( input_file=self.input, ch_param=ch_param, wl_grid=fp.spectral * fp.spectral_units, ) if ils is None else ils ) # prepare the timeline for the astronomical signal as the middle of each sub-exposure timeline = ( ( out_se.time * out_se.time_units + integration_time / 2 ) if timeline is None else timeline ) # estimating the signal model signal_task = params["task"]() model_timeline, signal_model = signal_task( timeline=timeline, wl_grid=fp.spectral * fp.spectral_units, ch_parameters=ch_param, source_parameters=params["parsed_parameters"], output=signal_grp, ) # load the source flux to weight the signal model with h5py.File(self.input, "r") as f: source_dic = f[f"channels/{ch}/sources/{source}"] flux = source_dic["data"][0, 0] # apply the signal model to the sub-exposures out_se = applyAstronomicalSignal( model=signal_model, subexposures=out_se, focal_plane=fp, ils=ils, timeline=model_timeline, source=params, source_flux=flux, ch_parameters=ch_param, pointing=( self.mainConfig["pointing"]["ra"], self.mainConfig["pointing"]["dec"], ), ) # adding background stars if any add_background_to_se = False if "add_background_to_se" in ch_param["detector"].keys(): if ch_param["detector"]["add_background_to_se"]: add_background_to_se = True else: if bkg_fp: self.warning( "No indication for adding background found. Adding them by default." ) add_background_to_se = True if bkg_fp and add_background_to_se: bkg_se = instantaneousReadOut( parameters=ch_param, focal_plane=bkg_fp, readout_parameters=readout_parameters, pointing_jitter=( self.jitter_spa, self.jitter_spe, self.jitter_time, ), dataset_name="sub_exposures_bkg", output_file=ch_grp, ) for chunk in iterate_over_chunks( out_se.dataset, desc="adding background stars" ): out_se.dataset[chunk] += bkg_se.dataset[chunk] out_se.output.flush() # add the foregrounds to the sub-exposures try: if ch_param["detector"]["add_foregrounds_to_se"]: out_se = addForegrounds( subexposures=out_se, frg_focal_plane=frg_fp, integration_time=integration_time, ) except KeyError: self.warning( "No indication for adding foregrounds found. Adding them by default." ) out_se = addForegrounds( subexposures=out_se, frg_focal_plane=frg_fp, integration_time=integration_time, ) # apply the QE map if present try: qe_map_instance = find_and_run_task( ch_param["detector"], "qe_map_task", LoadQeMap ) qe_map = qe_map_instance( parameters=ch_param, time=fp.time * fp.time_units, output=sim_grp, ) applyQEMap = ApplyQeMap() out_se = applyQEMap(subexposures=out_se, qe_map=qe_map) except KeyError: self.warning( "No quantum efficiency variation map detected" ) # store the results in output out_se.write() self.log_runtime("{} ended in".format(ch), "info") self.info( "output {} size: {:.3f}".format(output_file, out.getsize()) ) self.log_runtime_complete("recipe ended", "info") self.announce("ended")
[docs] def load_focal_plane( self, ch: str ) -> Tuple[CountsPerSecond, CountsPerSecond]: """ It loads the channel focal plane from the input file: Parameters ---------- ch: str channel name Returns ------- :class:`~exosim.models.signal.CountsPerSecond` focal plane :class:`~exosim.models.signal.CountsPerSecond` foreground focal plane :class:`~exosim.models.signal.CountsPerSecond` bkg focal plane""" with h5py.File(self.input, "r") as f: file_path = os.path.join("channels", ch) fp = load_signal(f[os.path.join(file_path, "focal_plane")]) frg_fp = load_signal(f[os.path.join(file_path, "frg_focal_plane")]) try: bkg_fp = load_signal( f[os.path.join(file_path, "bkg_focal_plane")] ) except KeyError: bkg_fp = None self.debug("focal planes loaded from {}".format(ch)) return fp, frg_fp, bkg_fp
[docs] def load_source(self, ch: str, source: str) -> Tuple[CountsPerSecond]: """ It loads the channel focal plane from the input file: Parameters ---------- ch: str channel name source: str source name Returns ------- :class:`~exosim.models.signal.CountsPerSecond` source """ with h5py.File(self.input, "r") as f: file_path = os.path.join("channels", ch) path = os.path.join(file_path, "sources") source = load_signal(f[os.path.join(path, source)]) self.debug("{} source loaded from {}".format(source, ch)) return source
[docs] def copy_simulation_data( self, ch: str, ch_grp: HDF5OutputGroup ) -> HDF5OutputGroup: """ It copies relevant data from the input file into the output one. The copied data are: `info`, `qe_map`, `efficiency` and `responsivity`. Parameters ---------- ch: str channel name ch_grp: :class:`~exosim.output.hdf5.hdf5.HDF5OutputGroup` output file Returns -------- :class:`~exosim.output.hdf5.hdf5.HDF5OutputGroup` output group """ sim_grp = ch_grp.create_group("simulation_data") with h5py.File(self.input, "r") as f: # copy the original data info f.copy("info", sim_grp._entry, name="focal_plane_info") # copy simulation data file_path = os.path.join("channels", ch) for info in ["efficiency", "responsivity"]: try: f.copy(os.path.join(file_path, info), sim_grp._entry) self.debug("{} data copied to output".format(info)) except KeyError: continue return sim_grp