import gc
import astropy.units as u
import numpy as np
import exosim.tasks.instrument as instrument
from exosim.models.signal import Signal
from exosim.tasks.task import Task
from exosim.utils.klass_factory import find_task
[docs]class CreateFocalPlaneArray(Task):
"""
It produces the focal plane array
Returns
-------
:class:`~exosim.models.signal.Signal`
focal plane array (no time evolution)
"""
def __init__(self):
"""
Parameters
__________
parameters: dict
channel parameter dictionary. This is usually parsed from :class:`~exosim.tasks.load.loadOptions.LoadOptions`
efficiency: :class:`~exosim.models.signal.Dimensionless`
channel efficiency
"""
self.add_task_param("parameters", "channel parameters dict")
self.add_task_param("efficiency", "channel efficiency")
[docs] def execute(self):
parameters = self.get_task_param("parameters")
focal_plane_dimension = (
parameters["detector"]["spatial_pix"],
parameters["detector"]["spectral_pix"],
)
self.debug("focal plane dimensions: {}".format(focal_plane_dimension))
if "oversampling" in parameters["detector"]:
if "irf_task" in parameters["detector"]:
oversampling = parameters["detector"]["oversampling"]
else:
self.warning(
"The oversampling factor is set but irf_task is not provided in detector configuration. Oversampling factor is set to 1."
)
oversampling = 1
else:
oversampling = 1
self.debug("pixel over sampling factor: {}".format(oversampling))
focal_plane_dimension = tuple(
[d * oversampling for d in focal_plane_dimension]
)
focal_plane_array = np.zeros(focal_plane_dimension)
focal_plane_delta = parameters["detector"]["delta_pix"] / oversampling
if parameters["type"].lower() == "spectrometer":
# get wavelength solution
wl_solution_task = find_task(
parameters["wl_solution"]["wl_solution_task"],
instrument.LoadWavelengthSolution,
)
wl_instance = wl_solution_task()
wl_solution = wl_instance(parameters=parameters)
# estimate spectral dispersion law
spectral_pix_osr = (
np.arange(focal_plane_array.shape[1]) * focal_plane_delta
).to(u.um)
spectral_wav_osr = self._wav_osr(
wl_solution, "spectral", parameters, spectral_pix_osr
)
# estimate spatial dispersion law
spatial_pix_osr = (
np.arange(focal_plane_array.shape[0]) * focal_plane_delta
).to(u.um)
spatial_wav_osr = self._wav_osr(
wl_solution, "spatial", parameters, spatial_pix_osr
)
elif parameters["type"].lower() == "photometer":
efficiency = self.get_task_param("efficiency")
# if we select efficiency > eff_max/e we underestimate the
# total flux excluding to many efficiency data
idx = np.where(
efficiency.data[0, 0] > 0
) # efficiency.data[0, 0].max() / np.e)
x_wav_osr = (
np.linspace(
efficiency.spectral[idx].min().item(),
efficiency.spectral[idx].max().item(),
32 * oversampling,
)
* efficiency.spectral_units
)
# 32 is the minimum number of data we want to compute the
# derivative. For less data points we have a wavelength
# dependent effect if photometers efficiency
# x_wav_center = (efficiency.spectral[idx] *
# efficiency.data[0, 0, idx]).sum() / \
# efficiency.data[0, 0, idx].sum()
spectral_wav_osr = x_wav_osr
spatial_wav_osr = x_wav_osr
focal_plane_array = Signal(
spectral=spectral_wav_osr,
spatial=spatial_wav_osr,
data=focal_plane_array,
metadata={
"focal_plane_delta": focal_plane_delta,
"oversampling": oversampling,
},
)
try:
focal_plane_array.metadata["wl_min"] = parameters["wl_min"]
focal_plane_array.metadata["wl_max"] = parameters["wl_max"]
except KeyError:
pass
self.set_output(focal_plane_array)
del focal_plane_array
gc.collect()
def _wav_osr(self, wl_solution, key, parameters, pix_osr):
if wl_solution[key].data == np.zeros_like(wl_solution["wavelength"]):
# wavelength on each x pixel
wav_osr = np.zeros(pix_osr.size) * u.um
else:
# estimate dispersion law
par = np.polyfit(
wl_solution[key].to(u.um).value,
wl_solution["wavelength"].to(u.um).value,
2,
)
spatial_dispersion_law = np.poly1d(par)
# estimate center
if any("center" in k for k in parameters["wl_solution"].keys()):
pix_osr = self._centering(
parameters, wl_solution, pix_osr, key
)
# walength on each x pixel
wav_osr = spatial_dispersion_law(pix_osr.to(u.um).value) * u.um
self.debug("{} wavelength solution: {}".format(key, wav_osr))
return wav_osr
def _centering(
self, parameters, wl_solution, spectral_pix_osr, key="spectral"
):
"""
Shift the pixel array. If "auto" it sets the central wavelength of
the channel in the center of the pixel array. If a wavelength is indicated,
it centers the wl solution on that wavelength.
Else, it shifts the pixel array by the indicated number of pixels.
"""
if "center" in parameters["wl_solution"].keys():
if parameters["wl_solution"]["center"] == "auto":
par = np.polyfit(
wl_solution["wavelength"].to(u.um).value,
wl_solution[key].to(u.um).value,
2,
)
spectral_dispersion_law_inv = np.poly1d(par)
first_pixel = (
spectral_dispersion_law_inv(
parameters["wl_min"].to(u.um).value
)
* u.um
)
last_pixel = (
spectral_dispersion_law_inv(
parameters["wl_max"].to(u.um).value
)
* u.um
)
spectral_pix_osr_center = (first_pixel + last_pixel) / 2
# spectral_pix_osr_center = spectral_dispersion_law_inv(
# central_wl.to(u.um).value) * u.um
offset = (
spectral_pix_osr[0] + spectral_pix_osr[-1]
) / 2 - spectral_pix_osr_center
self.debug(
"wl solution auto-centering mode. offset {}".format(offset)
)
spectral_pix_osr -= offset
elif isinstance(parameters["wl_solution"]["center"], u.Quantity):
par = np.polyfit(
wl_solution["wavelength"].to(u.um).value,
wl_solution[key].to(u.um).value,
2,
)
spectral_dispersion_law_inv = np.poly1d(par)
central_pixel = (
spectral_dispersion_law_inv(
parameters["wl_solution"]["center"].to(u.um).value
)
* u.um
)
offset = (
spectral_pix_osr[0] + spectral_pix_osr[-1]
) / 2 - central_pixel
self.debug(
"wl solution auto-centering mode. offset {}".format(offset)
)
spectral_pix_osr -= offset
else:
center = parameters["wl_solution"]["{}_center".format(key)]
self.debug(
"wl solution manual-centering mode. offset {}".format(center)
)
spectral_pix_osr -= center
return spectral_pix_osr