Source code for exosim.tasks.subexposures.computeReadingScheme
import astropy.units as u
import numpy as np
import exosim.output as output
from exosim.tasks.task import Task
from exosim.utils.checks import check_units
[docs]class ComputeReadingScheme(Task):
"""
It computes the reading scheme for the sub-exposures given the instrument parameters and the focal planes.
Returns
-------
:class:`astropy.units.Quantity`
simulation frequency.
:class:`numpy.ndarray`
state machine for the reading operation on the ramp.
:class:`numpy.ndarray`
state machine of the group ends sampling the ramp.
:class:`numpy.ndarray`
state machine of the group starts sampling the ramp.
:class:`numpy.ndarray`
full list of simulation stapes for each steps on the ramp repeated by the number of ramps.
int
number of exposures needed to sample the full observation using ramps of the exposure time size.
"""
def __init__(self):
"""
Parameters
----------
parameters: dict
channel parameters dict
main_parameters: dict
main parameters dict
output_file: :class:`~exosim.output.output.Output` (optional)
output file
"""
self.add_task_param("parameters", "channel parameters dict")
self.add_task_param("main_parameters", "main parameters dict")
self.add_task_param(
"readout_oversampling", "readout oversampling factor", 1
)
self.add_task_param("output_file", "output file")
[docs] def execute(self):
parameters = self.get_task_param("parameters")
main_parameters = self.get_task_param("main_parameters")
readout_oversampling = self.get_task_param("readout_oversampling")
output_file = self.get_task_param("output_file")
# load reading scheme parameters and convert to cycle clocks
total_observing_time = main_parameters["time_grid"]["end_time"]
self.info("total observing time: {}".format(total_observing_time))
clock_freq = check_units(
parameters["readout"]["readout_frequency"], u.Hz
)
clock_freq *= readout_oversampling
clock = check_units(clock_freq, u.s)
n_NRDs_per_group = parameters["readout"]["n_NRDs_per_group"]
n_GRPs = parameters["readout"]["n_groups"]
n_clk_GND = (
parameters["readout"]["n_sim_clocks_Ground"] * readout_oversampling
)
n_clk_NDR0 = (
parameters["readout"]["n_sim_clocks_first_NDR"]
* readout_oversampling
)
n_clk_NDR = (
parameters["readout"]["n_sim_clocks_NDR"]
if "n_sim_clocks_NDR" in parameters["readout"].keys()
else 1
)
n_clk_NDR *= readout_oversampling
n_clk_RST = (
parameters["readout"]["n_sim_clocks_Reset"] * readout_oversampling
)
n_clk_GRP = (
parameters["readout"]["n_sim_clocks_groups"] * readout_oversampling
)
# define exposures time
exposure_time = (
n_clk_GND
+ n_clk_NDR0
+ n_clk_NDR * (n_NRDs_per_group - 1)
+ (n_clk_GRP + n_clk_NDR * (n_NRDs_per_group - 1)) * (n_GRPs - 1)
+ n_clk_RST
) * clock
self.info("exposure time: {}".format(exposure_time))
# define number of exposures
if "n_exposures" in parameters["readout"].keys():
number_of_exposures = parameters["readout"]["n_exposures"]
self.debug(
"number of exposures manually set to {}".format(
number_of_exposures
)
)
else:
number_of_exposures = np.ceil(
total_observing_time.to(u.s) / exposure_time
).astype(int)
self.debug(
"number of exposures automatically set to {}".format(
number_of_exposures
)
)
self.info("number of exposures: {}".format(number_of_exposures))
# prepare scheme output
read_dict = {
"n_NRDs_per_group": n_NRDs_per_group,
"n_GRPs": n_GRPs,
"n_clk_GND": n_clk_GND,
"n_clk_RST": n_clk_RST,
"n_clk_ndr": n_clk_NDR,
"n_clk_GRP": n_clk_GRP,
"exposure_time": exposure_time,
"number_of_exposures": number_of_exposures,
}
if issubclass(output_file.__class__, output.Output):
output_file.store_dictionary(read_dict, "reading_scheme_params")
# build state machines
base = (
[n_clk_GND]
+ [n_clk_NDR0]
+ [n_clk_NDR] * (n_NRDs_per_group - 1)
+ ([n_clk_GRP] + [n_clk_NDR] * (n_NRDs_per_group - 1))
* (n_GRPs - 1)
+ [n_clk_RST]
)
base_mask = np.array([0] + [1] * (n_NRDs_per_group) * (n_GRPs) + [0])
self.info(
"number of NDRs: {}".format(number_of_exposures * base_mask.sum())
)
frame_sequence = np.tile(base, number_of_exposures)
self.set_output(
[clock, base_mask, frame_sequence, number_of_exposures]
)