Source code for exosim.tasks.detector.addGainDrift

from copy import deepcopy
from typing import Optional

import astropy.units as u
import numpy as np

from exosim.models.signal import Signal
from exosim.output import Output
from exosim.tasks.task import Task
from exosim.utils import RunConfig
from exosim.utils.checks import check_units
from exosim.utils.iterators import iterate_over_chunks
from exosim.utils.types import ArrayType


[docs]class AddGainDrift(Task): r""" It adds a gain noise map to the array. The gain Drift is modeled as a polynomial model for a spectral and time dependent modulation. Notes ----- This is a default class with standardised inputs and outputs. The user can load this class and overwrite the "model" method to implement a custom Task to replace this. """ def __init__(self): """ Parameters ---------- subexposures: :class:`~exosim.models.signal.Counts` sub-exposures cached signal parameters: dict channel parameters dictionary time: :class:`~astropy.units.Quantity` sub-exposures times integration_times: :class:`~astropy.units.Quantity` sub-exposures integration times outputs: :class:`~exosim.output.output.Output` (optional) output file """ self.add_task_param("subexposures", "sub-exposures cached signal") self.add_task_param("parameters", "channel parameters dictionary") self.add_task_param("output", "output file", None)
[docs] def execute(self): self.info("adding gain noise") subexposures = self.get_task_param("subexposures") parameters = self.get_task_param("parameters") output = self.get_task_param("output") self.model(subexposures, parameters, output)
[docs] def model( self, subexposures: Signal, parameters: dict, output: Optional[Output] = None, ) -> None: """ Apply a gain drift model to the subexposures. This method models the gain drift as a polynomial trend based on time and wavelength. Parameters ---------- subexposures : Signal Sub-exposures cached signal. parameters : dict A dictionary containing parameters for the noise model. Expected keys and sub-keys include: - 'readout': dict with 'readout_frequency' - 'detector': dict with keys 'gain_w0', 'gain_f0', 'gain_coeff_order_t', 'gain_coeff_t_min', 'gain_coeff_t_max', 'gain_coeff_order_w', 'gain_coeff_w_min', and 'gain_coeff_w_max'. integration_times : np.ndarray Sub-exposures integration times. output : Optional[Output] Output file to write information, defaults to None. Returns ------- None This method does not return a value but updates the subexposures dataset with applied gain noise. Notes ----- The method computes Brownian noise based on the frequency parameters in 'parameters' and applies a polynomial trend to it. The trend's coefficients are randomly generated within specified ranges. The resulting gain noise is then rebinned and applied to the subexposures. """ if output and issubclass(output.__class__, Output): out_grp = output.create_group("gain noise") else: out_grp = None time = subexposures.time * u.Unit(subexposures.time_units) integration_times = subexposures.metadata["integration_times"] if isinstance(integration_times, dict): integration_times = integration_times["value"] * u.Unit( integration_times["unit"] ) integration_times.to(u.s) # Calculating polynomial trend order_t = parameters["detector"]["gain_coeff_order_t"] coeff_t_min = parameters["detector"]["gain_coeff_t_min"] coeff_t_max = parameters["detector"]["gain_coeff_t_max"] coeff_t = RunConfig.random_generator.uniform( low=coeff_t_min, high=coeff_t_max, size=order_t + 1 ) if out_grp is not None: out_grp.write_list("gain coeff_t", coeff_t) y_t = self._pol_t(time, coeff_t) wl_ax = np.arange(0, subexposures.dataset.shape[2], 1) order_w = parameters["detector"]["gain_coeff_order_w"] coeff_w_min = parameters["detector"]["gain_coeff_w_min"] coeff_w_max = parameters["detector"]["gain_coeff_w_max"] coeff_w = RunConfig.random_generator.uniform( low=coeff_w_min, high=coeff_w_max, size=order_w + 1 ) if out_grp is not None: out_grp.write_list("gain coeff_w", coeff_w) y_w = self._pol_w(wl_ax, coeff_w) z = np.zeros([y_t.size, y_w.size]) for i, y in enumerate(y_t): z[i] = y * y_w if "gain_drift_amplitude" in parameters["detector"].keys(): amplitude = parameters["detector"]["gain_drift_amplitude"] elif "gain_drift_amplitude_range_min" in parameters["detector"].keys(): amplitude_min = parameters["detector"][ "gain_drift_amplitude_range_min" ] amplitude_max = parameters["detector"][ "gain_drift_amplitude_range_max" ] amplitude = RunConfig.random_generator.uniform( amplitude_min, amplitude_max ) if out_grp is not None: out_grp.write_list( "gain amplitude random_seed", RunConfig.random_seed ) else: self.error("missing amplitude definition for gain") dinamic_range = z.max() - z.min() z *= amplitude / dinamic_range z += 1 if out_grp is not None: out_grp.write_array("gain drift", z) out_grp.write_array("coeff_t", coeff_t) out_grp.write_scalar("amplitude", amplitude) out_grp.write_array("coeff_w", coeff_w) # Applying the multiplicative noise for chunk in iterate_over_chunks( subexposures.dataset, desc="applying gain noise" ): chunk_data = deepcopy(subexposures.dataset[chunk]) subexposures.dataset[chunk] = ( chunk_data * z[chunk[0].start : chunk[0].stop, np.newaxis, :] ) subexposures.output.flush()
@staticmethod def _pol_t(tt: np.ndarray, coef: np.ndarray) -> np.ndarray: """ Polynomial trend calculation for time. Parameters ---------- tt : np.ndarray Time array. coef : np.ndarray Coefficients for the polynomial trend. Returns ------- np.ndarray Calculated polynomial trend for time. """ x = ((tt - tt.min()) / (tt.max() - tt.min())).si.value y = np.zeros(tt.size) for i, c in enumerate(coef): y += c * x**i return y @staticmethod def _pol_w(wl: np.ndarray, coef: np.ndarray) -> np.ndarray: """ Polynomial trend calculation for wavelength. Parameters ---------- wl : np.ndarray spectral axis array [pixel unit]. coef : np.ndarray Coefficients for the polynomial trend. Returns ------- np.ndarray Calculated polynomial trend for wavelength. """ x = (wl - wl.min()) / (wl.max() - wl.min()) y = np.zeros(wl.size) for i, c in enumerate(coef): y += c * x**i return y @staticmethod def _psd(f: np.ndarray, w0: float, f0: float) -> np.ndarray: """ Calculate the power spectral density. Parameters ---------- f : np.ndarray Frequency array. w0 : float Parameter for PSD calculation. f0 : float Parameter for PSD calculation. Returns ------- np.ndarray Calculated power spectral density. """ psd = np.zeros_like(f) psd[1:] = w0 * np.sqrt((f0 / f[1:]) ** 2 + 1) return psd @staticmethod def _noise_generator( f: np.ndarray, psd: np.ndarray, out=None ) -> np.ndarray: """ Generate noise based on provided frequency and power spectral density. Parameters ---------- f : np.ndarray Array of frequencies. psd : np.ndarray Power spectral density corresponding to the frequencies. out : object, optional Output object to write information, defaults to None. Returns ------- np.ndarray Generated noise array. """ # Randomize amplitude noise = RunConfig.random_generator.normal( 0, psd * np.sqrt(f[-1]), psd.size ) if out is not None: out.write_list("gain amplitude random_seed", RunConfig.random_seed) # Randomize phase phi = RunConfig.random_generator.normal(-0.5, 1, psd.size) * np.pi if out is not None: out.write_list("gain phase random_seed", RunConfig.random_seed) noisef = noise * np.exp(1j * phi) noise = np.fft.irfft(noisef) * np.sqrt(2 * noisef.size - 1) return noise