import os
from collections import OrderedDict
from typing import Union
import astropy.units as u
import numpy as np
from astropy.table import QTable, hstack, vstack
import exosim.log as log
import exosim.recipes as recipes
import exosim.tasks.radiometric as radiometric
from exosim.output import SetOutput
from exosim.utils.ascii_arts import astronomer4
from exosim.utils.klass_factory import find_task
from exosim.utils.prepare_recipes import clean_config_files, load_options
from exosim.utils.runConfig import RunConfig
from exosim.utils.timed_class import TimedClass
[docs]class RadiometricModel(TimedClass, log.Logger):
"""
Pipeline to create the radiometric model.
This pipeline has three working modes:
- it can load an already produced focal plane and use it to estimate a radiometric model;
- it can produce a single sourec focal planet and estimate the radiometric model;
- it can load a target list and produce the radiometric model for each target of the target list.
Attributes
------------
mainConfig: dict
This is parsed from :class:`~exosim.tasks.load.loadOptions.LoadOptions`
input: :class:`~exosim.output.output.Output`
input/output file
payloadConfig: dict
payload configuration dictionary extracted from mainConfig`
table: :class:`~astropy.table.QTable`
table for the radiometric estimations
Examples
--------
If the user wants to estimate the radiometric model af an existing focal plane
>>> import exosim.recipes as recipes
>>> rm = recipes.RadiometricModel(options_file= 'main _configuration.xml',
>>> input_file = 'focal_plane.h5')
Otherwhise, if a focal planet has not been produced yet, this recipy can produce it,
if a destination not existing file is provided:
>>> import exosim.recipes as recipes
>>> rm = recipes.RadiometricModel(options_file= 'main _configuration.xml',
>>> input_file = 'desired_output.h5')
In both cases, to store the produced table into the output file,
the :func:`~exosim.recipes.radiometricModel.RadiometricModel.write`
is to be used:
>>> rm.write()
"""
def __init__(
self, options_file: Union[str, dict], input_file: str
) -> None:
"""
Parameters
----------
options_file: str or dict
input configuration file
input_file: str
input file
"""
super().__init__()
self.graphics(astronomer4)
RunConfig.stats()
self.announce("started")
clean_config_files()
self.mainConfig, self.payloadConfig = load_options(options_file)
self.table = None
# if focal plane already exists it loads it
if os.path.isfile(input_file):
self.input = SetOutput(input_file, replace=False)
self.info("Running radiometric model on existing focal plane")
self.single_file_pipeline()
# if focal plane doesn't exist it create it for a single step
elif input_file.endswith("h5"):
self.info(
"Focal plane not found: running CreateFocalPlane pipeline"
)
# single time step
self.mainConfig["time_grid"] = {
"start_time": 0 * u.hr,
"end_time": 1 * u.hr,
"low_frequencies_resolution": 1 * u.hr,
}
self._isolate_every_opt()
recipes.CreateFocalPlane(
options_file=self.mainConfig, output_file=input_file
)
self.input = SetOutput(input_file, replace=False)
self.single_file_pipeline()
# if input is a target list, it creates the focal plane only for
# instruments and foregrounds
elif input_file.endswith(".csv"):
raise NotImplementedError
# # single time step
# self.mainConfig['time_grid'] = {'start_time': 0 * u.hr,
# 'end_time': 1 * u.hr,
# 'low_frequencies_resolution': 1 * u.hr}
# # remove source
# if 'source' in self.mainConfig['sky'].keys():
# self.mainConfig['sky'].pop('source')
# self._isolate_every_opt()
# recipes.CreateFocalPlane(options_file=self.mainConfig,
# output_file=input_file)
self.common_pipeline()
self.write()
self.log_runtime_complete("recipe ended", "info")
self.announce("ended")
def _isolate_every_opt(self) -> None:
"""
it iterates over the optical elements to isolate them and craete sub
focal planes
"""
from exosim.utils.iterators import iterate_over_opticalElements
self.mainConfig["sky"] = iterate_over_opticalElements(
self.mainConfig["sky"], "foregrounds", "isolate", True
)
self.payloadConfig = iterate_over_opticalElements(
self.payloadConfig, "Telescope", "isolate", True
)
self.payloadConfig = iterate_over_opticalElements(
self.payloadConfig, "channel", "isolate", True
)
[docs] def single_file_pipeline(self) -> None:
"""
Radiometric pipeline to run for a single target with an already
produced focal plane. The involved steps are:
1. creation of the wavelength table with :func:`~exosim.recipes.radiometricModel.RadiometricModel.create_table`;
2. estimation of the apertures sizes and number of pixels involved with :func:`~exosim.recipes.radiometricModel.RadiometricModel.compute_apertures`;
3. estimation of the signals in the apertures for the sub foregrounds, if any: :func:`~exosim.recipes.radiometricModel.RadiometricModel.compute_sub_foregrounds_signals`;
4. estimation of the total foreground signal in the apertures: :func:`~exosim.recipes.radiometricModel.RadiometricModel.compute_foreground_signals`;
5. estimation of the source focal plane signal in the aperture: :func:`~exosim.recipes.radiometricModel.RadiometricModel.compute_source_signals`;
6. estimation of the saturation time in the channel: :func:`~exosim.recipes.radiometricModel.RadiometricModel.compute_saturation`;
The pipeline will update the `table` attribute.
"""
self.table = self.create_table()
self.compute_apertures()
self.compute_sub_foregrounds_signals()
self.compute_foreground_signals()
self.compute_source_signals()
self.compute_saturation()
[docs] def common_pipeline(self) -> None:
"""
Radiometric pipeline to run starting from a radiometric table with already estimated signals.
It computes the noise.
1. estimation of the multiaccum factors :func:`~exosim.recipes.radiometricModel.RadiometricModel.compute_multiaccum`;
2. estimation shot noise :func:`~exosim.recipes.radiometricModel.RadiometricModel.compute_photon_noise`;
3. update total noise :func:`~exosim.recipes.radiometricModel.RadiometricModel.update_total_noise`
The pipeline will update the `table` attribute.
"""
self.compute_multiaccum()
self.compute_photon_noise()
self.update_total_noise()
[docs] def create_table(self) -> QTable:
"""
Produces the starting radiometric table with the spectral bins and their edges.
It is based on :class:`~exosim.tasks.radiometric.estimateSpectralBinning.EstimateSpectralBinning` by default.
Returns
-------
:class:`~astropy.table.QTable`
table for the radiometric estimations with wavelength grid.
"""
self.info("Computing radiometric signals")
estimateSpectralBinning_task = (
find_task(
self.payloadConfig["channel"]["spectral_binning_task"],
radiometric.EstimateSpectralBinning,
)
if "spectral_binning_task" in self.payloadConfig["channel"].keys()
else radiometric.EstimateSpectralBinning
)
estimateSpectralBinning = estimateSpectralBinning_task()
if isinstance(self.payloadConfig["channel"], OrderedDict):
table_list = [
estimateSpectralBinning(
parameters=self.payloadConfig["channel"][ch]
)
for ch in self.payloadConfig["channel"].keys()
]
else:
table_list = [
estimateSpectralBinning(
parameters=self.payloadConfig["channel"]
)
]
return vstack(table_list)
[docs] def compute_apertures(self) -> QTable:
"""
Estimates the photometric aperture for each spectral bin
using :class:`~exosim.tasks.radiometric.estimateApertures.EstimateApertures` by default.
Returns
-------
:class:`~astropy.table.QTable`
table with the apertures for each channel and bin
"""
if isinstance(self.payloadConfig["channel"], OrderedDict):
table_list = []
for ch in self.payloadConfig["channel"].keys():
self.info("estimating apertures on {}".format(ch))
description = self.payloadConfig["channel"][ch]
estimateApertures_task = (
find_task(
description["radiometric"]["aperture_photometry"][
"apertures_task"
],
radiometric.EstimateApertures,
)
if "apertures_task"
in description["radiometric"]["aperture_photometry"].keys()
else radiometric.EstimateApertures
)
estimateApertures = estimateApertures_task()
with self.input.open() as f:
f = f["channels"]
self.debug("extracting wavelength grid")
osf = f[ch]["focal_plane"]["metadata"]["oversampling"][()]
focal_plane = f[ch]["focal_plane"]["data"][
0, osf // 2 :: osf, osf // 2 :: osf
]
wl_grid = f[ch]["focal_plane"]["spectral"][
osf // 2 :: osf
] * u.Unit(f[ch]["focal_plane"]["spectral_units"][()])
table_list += [
estimateApertures(
table=self.table[self.table["ch_name"] == ch],
focal_plane=focal_plane,
description=description["radiometric"][
"aperture_photometry"
],
wl_grid=wl_grid,
)
]
else:
description = self.payloadConfig["channel"]
self.info(
"estimating apertures on {}".format(description["value"])
)
estimateApertures_task = (
find_task(
description["radiometric"]["aperture_photometry"][
"apertures_task"
],
radiometric.EstimateApertures,
)
if "apertures_task"
in description["radiometric"]["aperture_photometry"].keys()
else radiometric.EstimateApertures
)
estimateApertures = estimateApertures_task()
with self.input.open() as f:
f = f["channels"]
ch = list(f.keys())[0]
self.debug("extracting wavelength grid")
osf = f[ch]["focal_plane"]["metadata"]["oversampling"][()]
focal_plane = f[ch]["focal_plane"]["data"][
0, osf // 2 :: osf, osf // 2 :: osf
]
wl_grid = f[ch]["focal_plane"]["spectral"][
osf // 2 :: osf
] * u.Unit(f[ch]["focal_plane"]["spectral_units"][()])
table_list = [
estimateApertures(
table=self.table,
focal_plane=focal_plane,
wl_grid=wl_grid,
description=description["radiometric"][
"aperture_photometry"
],
)
]
stack = vstack(table_list)
self.table = hstack((self.table, stack))
return hstack((self.table["ch_name", "Wavelength"], stack))
[docs] def write(self, output_file: str = None) -> None:
"""
It adds the radiometric table to the output.
If the table exists already in the output file, it replaces it.
Parameters
----------
output_file: str (optional)
output file. Default is `input`
"""
output = (
self.input
if output_file is None
else SetOutput(output_file, replace=False)
)
with output.use(append=True) as out:
self.info("radiometric table stored in {}".format(output.fname))
rad_out = out.create_group("radiometric")
rad_out.write_table("table", self.table, replace=True)
[docs] def compute_sub_foregrounds_signals(self) -> QTable:
"""
It estimates the radiometric signals on the foreground sub focal planes for all the
channels and returns a table with all the contributions.
It uses :class:`~exosim.tasks.radiometric.computeSubFrgSignalsChannel.ComputeSubFrgSignalsChannel` by default.
Returns
-------
astropy.table.QTable:
signal table
"""
table_list = []
if isinstance(self.payloadConfig["channel"], OrderedDict):
for ch in self.payloadConfig["channel"].keys():
self.info(
"estimating sub-foreground radiometric signal for {}".format(
ch
)
)
computeFrgSignalsChannel_task = (
find_task(
self.payloadConfig["channel"][ch]["radiometric"][
"sub_frg_signal_task"
],
radiometric.ComputeSubFrgSignalsChannel,
)
if "sub_frg_signal_task"
in self.payloadConfig["channel"][ch]["radiometric"].keys()
else radiometric.ComputeSubFrgSignalsChannel
)
computeFrgSignalsChannel = computeFrgSignalsChannel_task()
table_list += [
computeFrgSignalsChannel(
table=self.table[self.table["ch_name"] == ch],
ch_name=ch,
input_file=self.input,
parameters=self.payloadConfig["channel"],
)
]
else:
self.info(
"estimating sub-foreground radiometric signal for {}".format(
self.payloadConfig["channel"]["value"]
)
)
computeFrgSignalsChannel_task = (
find_task(
self.payloadConfig["channel"]["radiometric"][
"sub_frg_signal_task"
],
radiometric.ComputeSubFrgSignalsChannel,
)
if "sub_frg_signal_task"
in self.payloadConfig["channel"]["radiometric"].keys()
else radiometric.ComputeSubFrgSignalsChannel
)
computeFrgSignalsChannel = computeFrgSignalsChannel_task()
table_list = [
computeFrgSignalsChannel(
table=self.table,
ch_name=self.payloadConfig["channel"]["value"],
input_file=self.input,
)
]
stack = vstack(table_list)
self.table = hstack((self.table, stack))
for k in self.table.keys():
if hasattr(self.table[k], "filled"):
self.table[k] = self.table[k].filled(0.0)
ret_k = ["ch_name", "Wavelength"] + list(stack.keys())
return self.table[ret_k], stack
[docs] def compute_foreground_signals(self) -> QTable:
"""
It estimates the radiometric signals on the foreground focal plane for all the
channels and returns a table with all the contributions.
It uses :class:`~exosim.tasks.radiometric.computeSignalsChannel.ComputeSignalsChannel` by default.
Returns
-------
astropy.table.QTable:
signal table
"""
phot = np.array([])
with self.input.open() as f:
if isinstance(self.payloadConfig["channel"], OrderedDict):
for ch in self.payloadConfig["channel"].keys():
self.info(
"estimating foreground radiometric signal for {}".format(
ch
)
)
computeSignalsChannel_task = (
find_task(
self.payloadConfig["channel"][ch]["radiometric"][
"signal_task"
],
radiometric.ComputeSignalsChannel,
)
if "signal_task"
in self.payloadConfig["channel"][ch][
"radiometric"
].keys()
else radiometric.ComputeSignalsChannel
)
computeSignalsChannel = computeSignalsChannel_task()
focal_plane = f["channels"][ch]["frg_focal_plane"]
phot_ = computeSignalsChannel(
table=self.table[self.table["ch_name"] == ch],
focal_plane=focal_plane,
)
phot = np.concatenate((phot, phot_))
else:
self.info(
"estimating foreground radiometric signal for {}".format(
self.payloadConfig["channel"]["value"]
)
)
computeSignalsChannel_task = (
find_task(
self.payloadConfig["channel"]["radiometric"][
"signal_task"
],
radiometric.ComputeSignalsChannel,
)
if "signal_task"
in self.payloadConfig["channel"]["radiometric"].keys()
else radiometric.ComputeSignalsChannel
)
computeSignalsChannel = computeSignalsChannel_task()
ch = list(f["channels"].keys())[0]
focal_plane = f["channels"][ch]["frg_focal_plane"]
phot_ = computeSignalsChannel(
table=self.table, focal_plane=focal_plane
)
phot = np.concatenate((phot, phot_))
self.table["foreground_signal_in_aperture"] = phot
return self.table[
"ch_name", "Wavelength", "foreground_signal_in_aperture"
]
[docs] def compute_source_signals(self) -> QTable:
"""
It estimates the radiometric signals on the source focal plane for all the
channels and returns a table with all the contributions.
It uses :class:`~exosim.tasks.radiometric.computeSignalsChannel.ComputeSignalsChannel` by default.
Returns
-------
astropy.table.QTable:
signal table
"""
phot = np.array([])
with self.input.open() as f:
if isinstance(self.payloadConfig["channel"], OrderedDict):
for ch in self.payloadConfig["channel"].keys():
self.info(
"estimating source radiometric signal for {}".format(
ch
)
)
computeSignalsChannel_task = (
find_task(
self.payloadConfig["channel"][ch]["radiometric"][
"signal_task"
],
radiometric.ComputeSignalsChannel,
)
if "signal_task"
in self.payloadConfig["channel"][ch][
"radiometric"
].keys()
else radiometric.ComputeSignalsChannel
)
computeSignalsChannel = computeSignalsChannel_task()
focal_plane = f["channels"][ch]["focal_plane"]
phot_ = computeSignalsChannel(
table=self.table[self.table["ch_name"] == ch],
focal_plane=focal_plane,
)
phot = np.concatenate((phot, phot_))
else:
self.info(
"estimating source radiometric signal for {}".format(
self.payloadConfig["channel"]["value"]
)
)
computeSignalsChannel_task = (
find_task(
self.payloadConfig["channel"]["radiometric"][
"signal_task"
],
radiometric.ComputeSignalsChannel,
)
if "signal_task"
in self.payloadConfig["channel"]["radiometric"].keys()
else radiometric.ComputeSignalsChannel
)
computeSignalsChannel = computeSignalsChannel_task()
ch = list(f["channels"].keys())[0]
focal_plane = f["channels"][ch]["focal_plane"]
phot_ = computeSignalsChannel(
table=self.table, focal_plane=focal_plane
)
phot = np.concatenate((phot, phot_))
self.table["source_signal_in_aperture"] = phot
return self.table["ch_name", "Wavelength", "source_signal_in_aperture"]
[docs] def compute_saturation(self) -> QTable:
"""
It computes and adds the saturation time to the radiometric table
Returns
-------
astropy.table.QTable:
saturation table
"""
saturations, integration_time, max_signal_in_bin, min_signal_in_bin = (
[],
[],
[],
[],
)
saturationChannel = radiometric.SaturationChannel()
if isinstance(self.payloadConfig["channel"], OrderedDict):
for ch in self.payloadConfig["channel"].keys():
sat, t_int, max_, min_ = saturationChannel(
table=self.table,
description=self.payloadConfig["channel"][ch],
input_file=self.input,
)
saturations += sat
integration_time += t_int
max_signal_in_bin += max_
min_signal_in_bin += min_
else:
sat, t_int, max_, min_ = saturationChannel(
table=self.table,
description=self.payloadConfig["channel"],
input_file=self.input,
)
saturations += sat
integration_time += t_int
max_signal_in_bin += max_
min_signal_in_bin += min_
self.table["saturation_time"] = saturations
self.table["integration_time"] = integration_time
self.table["max_signal_in_bin"] = max_signal_in_bin
self.table["min_signal_in_bin"] = min_signal_in_bin
return self.table[
"ch_name",
"Wavelength",
"saturation_time",
"integration_time",
"max_signal_in_bin",
"min_signal_in_bin",
]
[docs] def compute_multiaccum(self) -> QTable:
"""
It estimates the multiaccum gain factors using :class:`~exosim.tasks.radiometric.multiaccum.Multiaccum`. The
Returns
-------
astropy.table.QTable:
multiaccum factors
"""
read_gain, shot_gain = [], []
multiaccum = radiometric.Multiaccum()
if isinstance(self.payloadConfig["channel"], OrderedDict):
for ch in self.payloadConfig["channel"].keys():
if (
"multiaccum"
in self.payloadConfig["channel"][ch]["radiometric"].keys()
):
read, shot = multiaccum(
parameters=self.payloadConfig["channel"][ch][
"radiometric"
]["multiaccum"]
)
read_gain += [read] * len(
self.table[self.table["ch_name"] == ch]
)
shot_gain += [shot] * len(
self.table[self.table["ch_name"] == ch]
)
else:
read_gain += [1] * len(
self.table[self.table["ch_name"] == ch]
)
shot_gain += [1] * len(
self.table[self.table["ch_name"] == ch]
)
else:
if (
"multiaccum"
in self.payloadConfig["channel"]["radiometric"].keys()
):
read, shot = multiaccum(
parameters=self.payloadConfig["channel"]["radiometric"][
"multiaccum"
]
)
read_gain += [read] * len(self.table)
shot_gain += [shot] * len(self.table)
else:
read_gain += [1] * len(self.table)
shot_gain += [1] * len(self.table)
self.table["multiaccum_read_gain"] = read_gain
self.table["multiaccum_shot_gain"] = shot_gain
return self.table[
"ch_name",
"Wavelength",
"multiaccum_read_gain",
"multiaccum_shot_gain",
]
[docs] def compute_photon_noise(self) -> QTable:
"""
It computes and adds the photon noise to the radiometric table using :class:`~exosim.tasks.radiometric.computePhotonNoise.ComputePhotonNoise`.
Returns
-------
astropy.table.QTable:
photon noise
"""
computePhotonNoise = radiometric.ComputePhotonNoise()
signals = [k for k in self.table.keys() if "_signal_in_aperture" in k]
for sig in signals:
phot_noise = []
for i in range(len(self.table["ch_name"])):
ch_description = (
self.payloadConfig["channel"][self.table["ch_name"][i]]
if isinstance(self.payloadConfig["channel"], OrderedDict)
else self.payloadConfig["channel"]
)
phot_noise += [
computePhotonNoise(
signal=self.table[sig][i],
description=ch_description,
multiaccum_gain=self.table["multiaccum_shot_gain"][i],
)
]
name = sig.replace("_signal_in_aperture", "")
phot_noise = (
np.array([p.value for p in phot_noise]) * phot_noise[0].unit
)
self.table["{}_photon_noise".format(name)] = phot_noise
return self.table[
"ch_name",
"Wavelength",
"source_photon_noise",
"foreground_photon_noise",
]
[docs] def update_total_noise(self) -> QTable:
"""Updates the total noise column in the radiometric table."""
computeTotalNoise = radiometric.ComputeTotalNoise()
total_noise = computeTotalNoise(table=self.table)
self.table["total_noise"] = total_noise
return self.table["ch_name", "Wavelength", "total_noise"]