Source code for exosim.tasks.instrument.loadPsfPaosTimeInterp

import gc
import os
import re
from copy import deepcopy

import astropy.units as u
import h5py
import numpy as np

import exosim.tasks.instrument.loadPsfPaos as loadPsfPaos


[docs]class LoadPsfPaosTimeInterp(loadPsfPaos.LoadPsfPaos): """ It loads the PSFs from two PAOS files and interpolates over time between them according to the given parameters. Each PAOS file contains a PSF vs wavelength. The task loads the PSF cubes provided by the `filename` parameter. The PSFs are then interpolated over a grid matching the one used to produce the focal planes, to convert them into the physical units. Then the total volume of the interpolated PSF is rescaled to the total volume of the original one. This allow to take into account for loss in the transmission due to the optical path. The PSF are then interpolated over a wavelength grid matching the one used to for the focal plane, producing the cube. Then an example of time dependency is implemented in the model method. The PSF cube is then cropped to removed unused edges, where the signal is less than 1/100 of the PSF peak. This would fasten up the successive `ExoSim` step. Returns ------- :class:`~numpy.ndarray` cube of psfs. axis=0 is time, axis=1 is wavelength, axis=2 is spatial direction, axis=3 is spectral direction. Note ----- `filename` must be a string of filenames separated by a comma and a space. To write your version, define a new class that inherits from this one in a dedicated python file. Then copy the `model` method and replace the indicated section (where `REPLACE THIS WITH YOUR MODE` is written). """
[docs] def model(self, filename, parameters, wavelength, time): self.debug("loading PAOS psf with time dependency") psfs = self.load_psfs(filename, wavelength, parameters) # prepare output ipercube psf_out = np.empty( (time.size, wavelength.size, psfs[0].shape[1], psfs[0].shape[2]) ) ################################ # time evolution example. # REPLACE THIS WITH YOUR MODEL t0 = parameters["psf"]["t0"] for i, t in enumerate(time): psf_out[i] = (t / t0) * psfs[0] + (1 - t / t0) * psfs[1] ################################ # clean memory del psfs gc.collect() # crop image stack psf_out_cropped = self.crop_image_stack(psf_out, time_iteration=True) # re-normalize output psf_out_cropped = self.normalise(psf_out, psf_out_cropped) return psf_out_cropped
@staticmethod
[docs] def normalise(psf_out, psf_out_cropped): old_norms = psf_out.sum(axis=-1).sum(axis=-1) new_norms = psf_out_cropped.sum(axis=-1).sum(axis=-1) dim_array = np.ones((1, psf_out.ndim), int).ravel() dim_array[0] = psf_out.shape[0] dim_array[1] = psf_out.shape[1] del psf_out # clean memory gc.collect() old_norms_reshaped = old_norms.reshape(dim_array) new_norms_reshaped = new_norms.reshape(dim_array) psf_out_cropped_norm_ = psf_out_cropped / new_norms_reshaped del psf_out_cropped # clean memory gc.collect() psf_out_cropped = psf_out_cropped_norm_ * old_norms_reshaped return psf_out_cropped
[docs] def load_psfs(self, filename, wavelength, parameters): """ Parameters ---------- parameters: dict dictionary containing the parameters. This is usually parsed from :class:`~exosim.tasks.load.loadOptions.LoadOptions` wavelength: :class:`~astropy.units.Quantity` wavelength grid. filename: str string containing PSF input files separated by a comma Returns ------- list list of psf cubes. axis=0 is wavelength, axis=1 is spatial direction, axis=2 is spectral direction. """ if "oversampling" in parameters["detector"].keys(): oversampling = parameters["detector"]["oversampling"] else: oversampling = 1 delta_pix = parameters["detector"]["delta_pix"] / oversampling filename = filename.split(", ") psfs = [] for file in filename: with h5py.File(os.path.expanduser(file), "r") as data: # find the sampled wl temp = re.compile(r"\d+(?:\.\d*)") wl_sampled_h5 = [ele for ele in data.keys() if temp.match(ele)] psf_cube = [ self.load_imac( wl, data, parameters, oversampling, delta_pix ) for wl in wl_sampled_h5 ] wl_sampled = [ self.load_wl_sampled(wl, data) for wl in wl_sampled_h5 ] wl_sampled = np.array(wl_sampled) * u.m psf_cube = np.array(psf_cube) # wl_interpolate to the output wl grid wl_unsorted = deepcopy(wavelength) wavelength.sort() psf_cube_interp = self.wl_interpolate( psf_cube, wavelength, wl_sampled.to(u.um) ) # reorder the array idx = [int(np.where(wavelength == wl_)[0]) for wl_ in wl_unsorted] psf_cube_interp = psf_cube_interp[idx, ...] psfs.append(psf_cube_interp) return psfs