Source code for exosim.tasks.detector.mergeGroups
from tqdm.auto import tqdm
from exosim.models.signal import Adu
from exosim.tasks.task import Task
from exosim.utils.iterators import iterate_over_chunks
[docs]class MergeGroups(Task):
"""
It averages the NDRs of the same group.
"""
def __init__(self):
"""
Parameters
----------
subexposures: :class:`~exosim.models.signal.Counts`
sub-exposures cached signal
n_groups: int
number of groups in the same exposure
n_ndrs: int
number of ndrs to merge in the same group
ndrs: :class:`~exosim.models.signal.Counts`
ndrs cached signal'
"""
self.add_task_param("subexposures", " ")
self.add_task_param("n_groups", " ")
self.add_task_param("n_ndrs", " ")
self.add_task_param("output", "output file", None)
[docs] def execute(self):
self.info("merging groups")
subexposures = self.get_task_param("subexposures")
n_groups = self.get_task_param("n_groups")
n_ndrs = self.get_task_param("n_ndrs")
output = self.get_task_param("output")
merged_ndrs = Adu(
spectral=subexposures.spectral,
time=subexposures.time[0::n_ndrs],
data=None,
spatial=subexposures.spatial,
shape=(
subexposures.dataset.shape[0] // n_ndrs,
subexposures.dataset.shape[1],
subexposures.dataset.shape[2],
),
cached=True,
output=output,
dataset_name="NDRs",
output_path=None,
metadata=subexposures.metadata,
dtype=subexposures.dataset[0, 0, 0].dtype,
)
if n_ndrs > 1:
j = 0
for i in tqdm(range(n_groups), desc="merging groups"):
merged_ndrs.dataset[i] = (
subexposures.dataset[j : j + n_ndrs].sum(axis=0) / n_ndrs
)
j += n_ndrs
merged_ndrs.output.flush()
else:
for chunk in iterate_over_chunks(
subexposures.dataset, desc="merging groups"
):
merged_ndrs.dataset[chunk] = subexposures.dataset[chunk]
self.set_output(merged_ndrs)