import gc
import os
import re
from copy import deepcopy
import astropy.units as u
import h5py
import numpy as np
from scipy.interpolate import RectBivariateSpline, interp1d
import exosim.tasks.instrument.loadPsf as loadPsf
from exosim.utils.aperture import find_rectangular_aperture
[docs]class LoadPsfPaos(loadPsf.LoadPsf):
"""
It loads the PSFs from a PAOS file.
Each PAOS file contains a PSF vs wavelength.
The task loads the PSF cube provided by the `filename` parameter.
The PSF are then interpolated over a grid matching the one used to produce the focal planes,
to convert them into the physical units.
Then the total volume of the interpolated PSF is rescaled to the total volume of the original one.
This allow to take into account for loss in the transmission due to the optical path.
The PSF are then interpolated over a wavelength grid matching the one used to for the focal plane, producing the cube.
Then, the PSF cube is repeated on the temporal axis, because no temporal variation is considered in this Task.
Returns
-------
:class:`~numpy.ndarray`
cube of psfs. axis=0 is time, axis=1 is wavelength, axis=2 is spatial direction, axis=3 is spectral direction.
"""
[docs] def model(self, filename, parameters, wavelength, time):
self.debug("loading PAOS psf")
if (
"psf" in parameters
and "time_dependence" in parameters["psf"].keys()
):
timeDependence = parameters["psf"]["time_dependence"]
else:
timeDependence = True
if "oversampling" in parameters["detector"].keys():
oversampling = parameters["detector"]["oversampling"]
else:
oversampling = 1
delta_pix = parameters["detector"]["delta_pix"] / oversampling
with h5py.File(os.path.expanduser(filename), "r") as data:
# find the sampled wl
temp = re.compile(r"\d+(?:\.\d*)")
wl_sampled_h5 = [ele for ele in data.keys() if temp.match(ele)]
# load psf interpolated into the window
psf_cube = [
self.load_imac(
wl,
data,
parameters,
oversampling,
delta_pix,
)
for wl in wl_sampled_h5
]
wl_sampled = [
self.load_wl_sampled(wl, data) for wl in wl_sampled_h5
]
wl_sampled = np.array(wl_sampled) * u.m
psf_cube = np.array(psf_cube)
# wl_interpolate to the output wl grid
psf_out = self.wl_interpolate(
psf_cube, wavelength, wl_sampled.to(u.um)
)
del psf_cube
gc.collect()
# repeat over time
psf_cube_out = psf_out[np.newaxis, ...]
del psf_out
gc.collect()
if timeDependence:
psf_cube_out = np.repeat(psf_cube_out, time.size, axis=0)
return psf_cube_out
@staticmethod
[docs] def load_wl_sampled(wl, data):
"""
extract the wavelength from the data stored in the surfaces
Parameters
----------
wl: str
wavelength
data: :class:`h5py.File`
opened HDF5 file
Returns
-------
float:
wavelength
"""
data_wl_surfaces = list(data[wl].keys())
data_wl_surfaces.sort()
data_cube = data[wl][data_wl_surfaces[-1]]
return float(data_cube["wl"][()])
[docs] def load_imac(self, wl, data, parameters, oversampling, delta_pix):
"""
Returns the PSF interpolated to the detector pixel size.
The PSF is normalized to the initial volume after the interpolation.
Parameters
----------
wl: str
wavelength
data: :class:`h5py.File`
opened HDF5 file
parameters: dict
detector description
oversampling: int
oversampling factor
delta_pix: float or :class:`astropy.units.Quantity`
sub-pixel size
Returns
-------
:class:`numpy.ndarray`
PSF scaled to physical size
"""
data_wl_surfaces = list(data[wl].keys())
data_wl_surfaces.sort()
data_cube = data[wl][data_wl_surfaces[-1]]
# load data
dx = data_cube["dy"][()] * u.m
dy = data_cube["dx"][()] * u.m
ima = data_cube["amplitude"][()] * data_cube["amplitude"][()]
# ima = ima.T
vol = ima.sum()
# scale the psf image
paos_x, paos_y = np.arange(0, ima.shape[0]) * dx.to(u.um), np.arange(
0, ima.shape[1]
) * dy.to(u.um)
size = min(
parameters["detector"]["spatial_pix"],
parameters["detector"]["spectral_pix"],
)
spatial_size = size * oversampling
spectral_size = size * oversampling
self.debug("windows sizes: {}, {}".format(spatial_size, spectral_size))
exosim_x = np.arange(0, spatial_size - 1) * delta_pix.to(u.um)
exosim_y = np.arange(0, spectral_size - 1) * delta_pix.to(u.um)
exosim_x -= (
exosim_x[int(spatial_size // 2)] - paos_x[ima.shape[0] // 2]
)
exosim_y -= (
exosim_y[int(spectral_size // 2)] - paos_y[ima.shape[1] // 2]
)
f = RectBivariateSpline(paos_x, paos_y, ima, kx=1, ky=1)
imac = f(exosim_x.value, exosim_y.value)
imac -= imac.min()
# normalise
norm = imac.sum()
imac /= norm
imac *= vol
return np.array(imac)
[docs] def crop_image_stack(self, psf_out, ene=0.99, time_iteration=False):
"""
It crops the image stack.
It takes the PSF for the longest wavelength, as it is expected to be the largest PSF,
and then selects the smallest aperture which collect at least the desired ene,
using :func:`~exosim.utils.psf.find_rectangular_aperture`.
Then it remove these area from all the image in the psf data cube.
Parameters
----------
psf_out: :class:`numpy.ndarray`
output PSF cube
ene: float
encircled energy desired. Default is 99%.
time_iteration: bool
if time iteration is True then it perform
Returns
-------
:class:`numpy.ndarray`
PSF cube cropped
"""
h, w = 0, 0
if time_iteration or psf_out.ndims > 3:
self.debug("looking for best aperture: iterating over time")
for t in range(psf_out.shape[0]):
sizes, surf, ene = find_rectangular_aperture(
psf_out[t, -1, :, :], ene
)
h = sizes[0] if sizes[0] > h else h
w = sizes[1] if sizes[1] > w else w
crop_h = (psf_out.shape[2] - h) // 2
crop_w = (psf_out.shape[3] - w) // 2
return psf_out[
:,
:,
int(crop_h) : int(crop_h + h),
int(crop_w) : int(crop_w + w),
]
else:
(h, w), surf, ene = find_rectangular_aperture(
psf_out[-1, :, :], ene
)
crop_h = (psf_out.shape[1] - h) // 2
crop_w = (psf_out.shape[2] - w) // 2
return psf_out[
:, int(crop_h) : int(crop_h + h), int(crop_w) : int(crop_w + w)
]
@staticmethod
[docs] # @jit(nopython=True, parallel=True)
def wl_interpolate(psf_cube, wl, wl_sampled):
"""
This function interpolates the PSF to the desired wavelength grid
Parameters
----------
psf_out: :class:`numpy.ndarray`
output PSF cube
psf_cube: :class:`numpy.ndarray`
input PSF cube
wl: :class:`numpy.ndarray`
desired wavelength grid
wl_sampled: :class:`numpy.ndarray`
input wavelength grid
Returns
-------
:class:`numpy.ndarray`
output PSF cube
"""
# take into account different ordering
if wl_sampled.size > 1:
wl_unsorted = deepcopy(wl)
wl.sort()
psf_cube_resh = psf_cube.reshape(
psf_cube.shape[0], psf_cube.shape[1] * psf_cube.shape[2]
)
f = interp1d(
wl_sampled,
psf_cube_resh,
axis=0,
bounds_error=False,
fill_value="extrapolate",
)
psf_out = f(wl)
psf_out = psf_out.reshape(
wl.size, psf_cube.shape[1], psf_cube.shape[2]
)
idx = [int(np.where(wl == wl_)[0][0]) for wl_ in wl_unsorted]
psf_out = psf_out[idx, ...]
else:
psf_out = np.repeat(psf_cube, wl.size, axis=0)
# psf_out[k, ...] /= psf_out[k, ...].sum()
return psf_out