Source code for exosim.tasks.detector.addKTC

import numpy as np
from numba import jit

from exosim.output import Output
from exosim.tasks.task import Task
from exosim.utils import RunConfig
from exosim.utils.checks import check_units
from exosim.utils.iterators import iterate_over_chunks


[docs]class AddKTC(Task): """ It adds the ktc bias to the sub-exposures. This Task produces a new random offset map for each ramp, and it adds it to the sub-exposures of the same ramp. If an output group is provided, it saves all the random seeds used. """ def __init__(self): """ Parameters ---------- subexposures: :class:`~exosim.models.signal.Counts` sub-exposures cached signal parameters: dict channel parameters dictionary state_machine: :class:`numpy.ndarray` array indicating the exposures number of each sub-exposure. output: :class:`~exosim.output.output.Output` (optional) output file """ self.add_task_param("subexposures", "sub-exposures cached signal") self.add_task_param("parameters", "channel parameters dictionary") self.add_task_param("state_machine", "ramp state machine") self.add_task_param("output", "output file", None)
[docs] def execute(self): self.info("adding reset bias") subexposures = self.get_task_param("subexposures") parameters = self.get_task_param("parameters") state_machine = self.get_task_param("state_machine") output = self.get_task_param("output") ktc_std = parameters["detector"]["ktc_sigma"].astype(np.float64) ktc_std = check_units(ktc_std, "ct").value offset = RunConfig.random_generator.normal( 0, ktc_std, (subexposures.dataset.shape[1], subexposures.dataset.shape[2]), ) random_seeds = [] for chunk in iterate_over_chunks( subexposures.dataset, desc="adding ktc noise" ): if ( state_machine[chunk[0].start - 1] != state_machine[chunk[0].start] ): offset = RunConfig.random_generator.normal( 0, ktc_std, ( subexposures.dataset.shape[1], subexposures.dataset.shape[2], ), ) subexposures.dataset[chunk], offset = self.add_offset( subexposures.dataset[chunk], state_machine[chunk[0]], offset, ktc_std, ) subexposures.output.flush() random_seeds.append(RunConfig.random_seed) if output: if issubclass(output.__class__, Output): out_grp = output.create_group("kTC noise") out_grp.write_list("random_seed", random_seeds) out_grp.write_array( "chunks_index", np.arange(len(random_seeds)) )
@staticmethod @jit(nopython=True)
[docs] def add_offset( dset: np.ndarray, state_machine: np.ndarray, offset: np.ndarray, bias_std: float, ): dset = dset.astype(np.float64) state_machine = state_machine.astype(np.int64) offset = offset.astype(np.float64) dset[0] += offset for i in range(1, dset.shape[0]): if state_machine[i] == state_machine[i - 1]: dset[i] += offset else: offset = np.random.normal(0.0, bias_std, dset[i].shape).astype( np.float64 ) dset[i] += offset return dset, offset