from collections import OrderedDict
from typing import Tuple
import astropy.units as u
import numpy as np
import exosim.log as log
import exosim.models.signal as signal
import exosim.tasks.instrument as instrument
import exosim.tasks.parse as parse
from exosim.utils.klass_factory import find_and_run_task
from exosim.utils.types import ArrayType, OutputType, ValueType
[docs]class Channel(log.Logger):
"""
It handles the channel gven the description
Attributes
----------
ch_name: str
channel name
path: dict
dictionary of :class:`~exosim.models.signal.Radiance` and :class:`~exosim.models.signal.Dimensionless`,
represeting the radiance and efficiency of the path.
responsivity: :class:`~exosim.models.signal.Signal`
channel responsivity
sources: dict
dictionary containing :class:`~exosim.models.signal.Signal`
time: :class:`~astropy.units.Quantity`
time grid.
parameters: dict
dictionary contained the optical element parameters. This is usually parsed from :class:`~exosim.tasks.load.loadOptions.LoadOptions`
wavelength: :class:`~numpy.ndarray` or :class:`~astropy.units.Quantity`
wavelength grid. If no units are attached is considered as expressed in `um`.
focal_plane: :class:`~exosim.models.signal.Signal`
source focal plane
frg_focal_plane: :class:`~exosim.models.signal.Signal`
foreground focal plane
frg_sub_focal_planes: dict
dictionary of :class:`~exosim.models.signal.Signal`. It contains the sub focal planes produced by the radiances.
This dictionary is produced only if at least one optical surface has ``isolate=True``.
The sum of the sub focal planes returns the frg_focal_plane. If not surface has ``isolate=True``, the dictionary is empty.
output: :class:`~exosim.output.output.Output`
output file
target_source: str
name of the target source
"""
def __init__(
self,
parameters: dict,
wavelength: ArrayType,
time: ArrayType,
output: OutputType = None,
) -> None:
"""
Parameters
__________
parameters: dict
dictionary contained the optical element parameters. This is usually parsed from :class:`~exosim.tasks.load.loadOptions.LoadOptions`
wavelength: :class:`~numpy.ndarray` or :class:`~astropy.units.Quantity`
wavelength grid. If no units are attached is considered as expressed in `um`.
time: :class:`~astropy.units.Quantity`
time grid.
output: :class:`~exosim.output.output.Output` (optional)
output file
"""
self.set_log_name()
self.parameters = parameters
self.wavelength = wavelength
self.time = time
self.ch_name = parameters["value"]
self.output = output.create_group(self.ch_name) if output else None
# init to None
self.path, self.responsivity, self.sources, self.psf = (
None,
None,
None,
None,
)
(
self.focal_plane,
self.bkg_focal_plane,
self.frg_focal_plane,
self.frg_sub_focal_planes,
) = (
None,
None,
None,
None,
)
[docs] def parse_path(self, light_path: OrderedDict) -> dict:
"""
It applies :class:`~exosim.tasks.parse.parsePath.ParsePath`
Parameters
----------
light_path: `~collections.OrderedDict` (optional)
dictionary of contributes
Returns
-------
dict
dictionary of :class:`~exosim.models.signal.Radiance` and :class:`~exosim.models.signal.Dimensionless`,
represeting the radiance and efficiency of the path.
Note
----
The resulting information is also stored in the class under `path` attribute.
"""
parsePath = parse.ParsePath()
self.path = parsePath(
parameters=self.parameters["optical_path"],
wavelength=self.wavelength,
time=self.time,
output=self.output,
light_path=light_path,
group_name="path",
)
self.path["efficiency"].write(output=self.output, name="efficiency")
return self.path
[docs] def estimate_responsivity(self) -> signal.Signal:
"""
It estimates the responsivity using the indicated :class:`~exosim.tasks.instrument.loadResponsivity.LoadResponsivity`
Returns
-------
:class:`~exosim.models.signal.Signal`
channel responsivity
Note
----
The resulting information is also stored in the class under `responsivity` attribute.
"""
responsivity_instance = find_and_run_task(
self.parameters["qe"],
"responsivity_task",
instrument.LoadResponsivity,
)
self.responsivity = responsivity_instance(
parameters=self.parameters,
wavelength=self.wavelength,
time=self.time,
)
self.responsivity.write(output=self.output, name="responsivity")
return self.responsivity
[docs] def propagate_foreground(self) -> dict:
"""
It multiplies each radiance in the path by the solid angle.
Returns
-------
dict
dictionary of :class:`~exosim.models.signal.Radiance` and :class:`~exosim.models.signal.Dimensionless`,
represeting the radiance and efficiency of the path.
Note
----
it updates the `path` attribute of this class
"""
propagateForegrounds = instrument.PropagateForegrounds()
self.path = propagateForegrounds(
light_path=self.path,
parameters=self.parameters,
responsivity=self.responsivity,
)
return self.path
[docs] def propagate_sources(self, sources: OrderedDict, Atel: ValueType) -> dict:
"""
It propagates the sources though the channel,
by applying :class:`~exosim.tasks.instrument.propagateSources.PropagateSources`
Parameters
__________
sources: dict
dictionary containing :class:`~exosim.models.signal.Sed`
Atel: :class:`~astropy.units.Quantity`
effective telescope Area
Returns
-------
dict
dictionary containing :class:`~exosim.models.signal.Signal`
"""
out_sources = {}
for source in sources.keys():
out_sources[source] = signal.Sed(
data=sources[source].data,
spectral=sources[source].spectral,
time=sources[source].time,
metadata=sources[source].metadata,
)
propagateSources = instrument.PropagateSources()
self.sources = propagateSources(
sources=out_sources,
Atel=Atel,
efficiency=self.path["efficiency"],
responsivity=self.responsivity,
)
return self.sources
[docs] def create_focal_planes(self) -> signal.Signal:
"""
It produces the empty focal planes
Returns
-------
:class:`~exosim.models.signal.Signal`
focal plane array (with time evolution)
"""
createFocalPlane = instrument.CreateFocalPlane()
focal_plane = createFocalPlane(
parameters=self.parameters,
efficiency=self.path["efficiency"],
time=self.time,
output=self.output,
group_name="focal_plane",
)
self.focal_plane = focal_plane
self.frg_focal_plane = focal_plane.copy(dataset_name="frg_focal_plane")
if len(self.sources) > 1:
self.bkg_focal_plane = focal_plane.copy(
dataset_name="bkg_focal_plane"
)
return focal_plane
[docs] def rescale_contributions(self) -> None:
"""
It updated the contributions (sources and path)
by rebinning them to the wavelength solution grid
and multipling them by the wl solution gradient
"""
def wl_gradient(x_wav_osr):
d_x_wav_osr = np.zeros_like(x_wav_osr)
idx = np.where(x_wav_osr > 0.0)
d_x_wav_osr[idx] = np.gradient(x_wav_osr[idx])
if np.any(d_x_wav_osr < 0):
d_x_wav_osr *= -1.0
return d_x_wav_osr
d_spectral_wl = (
wl_gradient(self.focal_plane.spectral)
* self.focal_plane.spectral_units
)
# multiply sources by gradient
if self.sources:
for source in self.sources.keys():
self.sources[source].spectral_rebin(self.focal_plane.spectral)
self.sources[source] *= d_spectral_wl
# multiply radiances by gradient
if self.path:
for rad in [k for k in self.path.keys() if "radiance" in k]:
self.path[rad].spectral_rebin(self.focal_plane.spectral)
self.path[rad] *= d_spectral_wl
@property
[docs] def target_source(self):
# TODO test this
if len(self.sources) == 1:
return list(self.sources.keys())[0]
target = [
source
for source, param in self.sources.items()
if "source_target" in param.metadata["parsed_parameters"].keys()
and param.metadata["parsed_parameters"]["source_target"] == True
]
if len(target) > 1:
self.error(
"More than one target source found. Please check your input file."
)
self.debug(f"Target source is {target[0]}")
return target[0]
[docs] def populate_focal_plane(
self, pointing: Tuple[u.Quantity, u.Quantity] = None
) -> signal.Signal:
"""
It populates the empty focal plane with monocromatic PSFs.
Parameters
-----------
pointing: (:class:`astropy.units.Quantity`, :class:`astropy.units.Quantity`) (optional)
telescope pointing direction, expressed ad a tuple of RA and DEC in degrees. Default is ``None``
Returns
-------
:class:`~exosim.models.signal.Signal`
focal plane array populated
"""
# selecting the target source
target = self.target_source
# storing binned target source
sources_out = self.output.create_group("sources")
self.sources[target].write(sources_out, name=target)
# populates the focal plane
populateFocalPlane = instrument.PopulateFocalPlane()
focal_plane, psf = populateFocalPlane(
parameters=self.parameters,
focal_plane=self.focal_plane,
sources={target: self.sources[target]},
pointing=pointing,
output=self.output,
)
self.psf = psf
self.focal_plane = focal_plane
return focal_plane
[docs] def populate_bkg_focal_plane(
self, pointing: Tuple[u.Quantity, u.Quantity] = None
) -> signal.Signal:
"""
It populates the empty background focal plane with monocromatic PSFs for each of the background sources.
Parameters
-----------
pointing: (:class:`astropy.units.Quantity`, :class:`astropy.units.Quantity`) (optional)
telescope pointing direction, expressed ad a tuple of RA and DEC in degrees. Default is ``None``
Returns
-------
:class:`~exosim.models.signal.Signal`
background focal plane array populated
"""
if len(self.sources) > 1:
# TODO test this
# removing target source from source dictionary
target = self.target_source
self.sources.pop(target)
# storing binned sources
sources_out = self.output.create_group("sources")
for source in self.sources.keys():
self.sources[source].write(sources_out, name=source)
# populates the focal plane
populateFocalPlane = instrument.PopulateFocalPlane()
bkg_focal_plane, self.psf = populateFocalPlane(
parameters=self.parameters,
focal_plane=self.bkg_focal_plane,
sources=self.sources,
pointing=pointing,
output=self.output,
psf=self.psf,
)
self.bkg_focal_plane = bkg_focal_plane
return self.bkg_focal_plane
[docs] def apply_irf(self) -> signal.Signal:
"""
It applies the intra pixel response function (IRF) to the focal plane
Returns
-------
:class:`~exosim.models.signal.Signal`
focal plane array
"""
irf_instance = find_and_run_task(
self.parameters["detector"],
"irf_task",
instrument.CreateIntrapixelResponseFunction,
)
self.parameters["psf_shape"] = self.psf.shape[1:]
kernel, delta_kernel = irf_instance(
parameters=self.parameters, output=self.output
)
if "convolution_method" in self.parameters["detector"]:
convolution_method = self.parameters["detector"][
"convolution_method"
]
else:
convolution_method = "fftconvolve"
applyIntraPixelResponseFunction = find_and_run_task(
self.parameters["detector"],
"apply_irf_task",
instrument.ApplyIntraPixelResponseFunction,
)
# apply IRF to target focal plane
focal_plane = applyIntraPixelResponseFunction(
focal_plane=self.focal_plane,
irf_kernel=kernel,
irf_kernel_delta=delta_kernel,
convolution_method=convolution_method,
)
if self.bkg_focal_plane is not None:
# apply IRF to background focal plane
bkg_focal_plane = applyIntraPixelResponseFunction(
focal_plane=self.bkg_focal_plane,
irf_kernel=kernel,
irf_kernel_delta=delta_kernel,
convolution_method=convolution_method,
)
else:
bkg_focal_plane = None
return focal_plane, bkg_focal_plane
[docs] def populate_foreground_focal_plane(
self,
) -> Tuple[signal.Signal, signal.Signal]:
"""
It adds the foreground contribution to the foreground focal plane
Returns
-------
:class:`~exosim.models.signal.Signal`
focal plane array
"""
# populates the focal plane
foregroundToFocalPlane = instrument.ForegroundsToFocalPlane()
frg_focal_plane, sub_focal_planes = foregroundToFocalPlane(
parameters=self.parameters,
focal_plane=self.frg_focal_plane,
path=self.path,
)
self.frg_focal_plane = frg_focal_plane
self.frg_sub_focal_planes = sub_focal_planes
return frg_focal_plane, sub_focal_planes