Source code for exosim.tasks.subexposures.addForegrounds

from copy import deepcopy

import astropy.units as u
import numpy as np

from exosim.tasks.task import Task
from exosim.utils.iterators import iterate_over_chunks, searchsorted


[docs]class AddForegrounds(Task): """ It adds the foregrounds the sub exposures. Returns -------- :class:`~exosim.models.signal.Counts` sub-exposure cached signal class """ def __init__(self): """ Parameters ---------- subexposures: :class:`~exosim.models.signal.Counts` sub-exposures cached signal class frg_focal_plane: :class:`~exosim.models.signal.Signal` channel foreground focal plane integration_time: :class:`~astropy.units.Quantity` sub-exposures integration times """ self.add_task_param( "subexposures", "sub-exposures cached signal class" ) self.add_task_param( "frg_focal_plane", "channel foreground focal plane", ) self.add_task_param( "integration_time", "sub-exposures integration times", )
[docs] def execute(self): self.info("adding foregrounds") subexposures = self.get_task_param("subexposures") frg_focal_plane = self.get_task_param("frg_focal_plane") integration_time = self.get_task_param("integration_time") osf = frg_focal_plane.metadata["oversampling"] ndrs_time = (subexposures.time * subexposures.time_units).to(u.hr) frg = deepcopy(frg_focal_plane.data.astype(np.float64)) frg_time = (frg_focal_plane.time * frg_focal_plane.time_units).to(u.hr) index = searchsorted(frg_time, ndrs_time) self.debug("time indexes: {}".format(index)) # bin down the frg to the subexposure size (integrate over pixels) frg = frg.reshape( (frg.shape[0], int(frg.shape[1] / osf), osf, frg.shape[2]) ).sum(axis=2) frg = frg.reshape( ( frg.shape[0], frg.shape[1], int(frg.shape[2] / osf), osf, ) ).sum(axis=3) for chunk in iterate_over_chunks( subexposures.dataset, desc="adding foregrounds" ): subexposures.dataset[chunk] = self.add_frg( subexposures.dataset[chunk], frg, index[chunk[0]], integration_time[chunk[0]].to(u.s).value, ) subexposures.output.flush() subexposures.metadata["frg_focal_plane_time_indexes"] = index self.set_output(subexposures)
@staticmethod
[docs] def add_frg(ndrs, frg, index, integration_time): for t in range(ndrs.shape[0]): ndrs[t] = ndrs[t] + frg[index[t]] * integration_time[t] return ndrs