Source code for exosim.models.signal

from __future__ import annotations

import copy

import astropy.units as u
import numpy as np

import exosim.log as log
import exosim.utils.binning as binning
import exosim.utils.checks as checks
from exosim.models.utils.cachedData import CachedData
from exosim.output.output import Output
from exosim.utils.types import ArrayType, UnitType, ValueType

_invalid_units = "invalid units"


[docs]class Signal(log.Logger): """ This class handles data cubes. The cube axes are time (0), spatial (1) and spectral (2) directions. Attributes ----------- spectral: :class:`~numpy.ndarray` spectral direction grid. data: :class:`~numpy.ndarray` data table. It has 3 axes: 0. time axis, 1. spatial axis, 2. spectral axis. dataset: :class:`h5py.Dataset` If cached mode is enabled, it contains the data table. It has 3 axes: 0. time axis, 1. spatial axis, 2. spectral axis. time: :class:`~numpy.ndarray` time grid. spatial: :class:`~numpy.ndarray` spatial direction grid. spectral_units: str define the spectral direction units. Default is `um` data_units: str define the data units. time_units: str define the time direction units. Default is `hr`. spatial_units: str define the spatial direction units. Default is `um`. cached: bool it tells if cache mode is enabled. output: str or :class:`~exosim.output.hdf5.hdf5.HDF5Output` h5py open file used for caching dataset_name: :class:`h5py.Dataset` h5py dataset used to store the data output_path: str path where is stored the dataset inside the output file. metadata: dict signal metadata attached to the class Notes ----- To understand caching mode, please look to :class:`~exosim.models.utils.cachedData.CachedData` """ def __init__( self, spectral: ArrayType = [0] * u.um, data: ArrayType = None, time: ArrayType = [0] * u.hr, data_units: UnitType = None, spatial: ArrayType = [0] * u.um, shape: tuple[int, int, int] = None, cached: bool = False, output: str = None, output_path: str = None, dataset_name: str = None, metadata: dict = None, dtype: np.dtype = np.float64, ) -> None: """ Parameters ---------- spectral: :class:`~numpy.ndarray` or :class:`~astropy.units.Quantity` (optional) wavelength grid. Must have a single axes. Default is [0] um. If the input data is a not :class:`~astropy.units.Quantity`, it's assumed to be expressed in microns, otherwhise is converted into microns or pixels. data: :class:`~numpy.ndarray` or :class:`~astropy.units.Quantity` data table. It must have 3 axes: 0. time axis, 1. spatial axis, 2. spectral axis. If data is :class:`~astropy.units.Quantity`, data_units are parsed automatically. time: :class:`~numpy.ndarray` or :class:`~astropy.units.Quantity` (optional) time grid. Must have a single axes. Default is [0] hr. spatial: :class:`~numpy.ndarray` or :class:`~astropy.units.Quantity` (optional) spatial direction grid. Must have a single axes. If the input data is a not :class:`~astropy.units.Quantity`, it's assumed to be expressed in micron. Default is [0] um. data_units: str or :class:`~astropy.units.Quantity` (optional) define the data units. If data is :class:`~astropy.units.Quantity`, data_units are parsed automatically. shape: tuple (optional) data shape. If data is None, it's used to create a data table with the desired shape. cached: bool (optional) it enable the cache mode. Default is `False`. output: str or :class:`~exosim.output.hdf5.hdf5.HDF5Output` (optional) h5 file to use for caching. If `None` a temporary file will be generated. Default is `None`. output_path: str (optional) path where to store the dataset inside the output file. Deafault is `None`. dataset_name: str (optional) name to use to store the Dataset into the h5 file named after 'output'. If `None` a random name will be generated. Default is `None`. metadata: dict (optional) dictionary of metadata. dtype: :class:`~numpy.dtype` (optional) data type of the data table. Default is `np.float64`. """ self.set_log_name() try: self.spectral = checks.check_units( spectral, "um", self, force=True ).value self.spectral_units = self._normalize_units(u.um) except u.UnitConversionError: self.spectral = checks.check_units(spectral, "pix", self).value self.spectral_units = self._normalize_units(u.pix) self.time = checks.check_units(time, "hr", self, force=True).value self.time_units = self._normalize_units(u.hr) try: self.spatial = checks.check_units( spatial, "um", self, force=True ).value self.spatial_units = self._normalize_units(u.um) except u.UnitConversionError: self.spatial = checks.check_units(spatial, "pix", self).value self.spatial_units = u.pix # TODO set up the use of a standard Hdf5Output instead of a common file: # we can use the same file use for output instead of creating a new one self.cached = cached self.shape = shape self.output = output self.output_path = output_path self.dataset_name = dataset_name if data_units is not None: self.data_units = self._normalize_units(u.Unit(data_units)) elif hasattr(data, "unit"): self.data_units = self._normalize_units(data.unit) else: self.data_units = self._normalize_units(u.Unit("")) if cached: if shape is not None: self._data = self._prepare_cached_dataset( fname=output, def_name=dataset_name, data=data, shape=shape, dtype=dtype, ) elif data is not None: data = checks.check_units( data, self.data_units, self, force=True ).value self._data = self._prepare_cached_dataset( fname=output, def_name=dataset_name, data=data, shape=data.shape, dtype=data.dtype, ) else: self._data = self._check_data_size(data) self._data = checks.check_units( self._data, self.data_units, self, force=True ).value if metadata: self.metadata = metadata else: self.metadata = {} self.data = self._data.chunked_dataset[:] if cached else self._data self.dataset = self._data.chunked_dataset if cached else None
[docs] def to(self, units: u.Unit) -> None: """ It converts the Signal data into the desired units. Parameters ---------- units: :class:`~astropy.units.Unit` desired unit Examples --------- >>> import numpy as np >>> import astropy.units as u >>> from exosim.models.signal import Signal >>> wl = np.linspace(0.1, 1, 10) * u.m >>> time_grid = np.linspace(1, 5, 10) * u.s >>> data = np.random.random_sample((10, 1, 10))*u.m**2 >>> signal = Signal(spectral=wl, data=data, time=time_grid) >>> signal.to(u.cm**2) """ self.data *= self.data_units.to(units) self.debug("converted data: {}".format(self.data)) self.data_units = units
def _check_data_size(self, data: ArrayType) -> ArrayType: while data.ndim < 3: data = np.expand_dims(data, axis=0) return data def _prepare_cached_dataset( self, fname: str, def_name: str, data: ArrayType, shape: tuple[int, int, int], dtype, ) -> CachedData: # type: ignore # noqa: F821 """ If cached option is enable, this function initializes a :class:`~exosim.modules.utils.cachedData.CachedData` class. """ cached_data = CachedData( axis0=shape[0], axis1=shape[1], axis2=shape[2], output=fname, output_path=self.output_path, dataset_name=def_name, dtype=dtype, ) if data is not None: cached_data.chunked_dataset[:] = data return cached_data @staticmethod def _normalize_units(unit: u.Unit) -> u.Unit: """Normalize units to a canonical form for consistent comparison.""" return u.Unit(unit.to_string(format="console"))
[docs] def spectral_rebin( self, new_wavelength: ArrayType, fill_value: ValueType = 0.0, **kwargs ) -> None: """ It bins the class data over the spectral direction and changes the wavelegth attributes. This method is based on :func:`~exosim.utils.binning.rebin`. Parameters ---------- new_wavelength: :class:`~numpy.ndarray` or :class:`~astropy.units.Quantity` new wavelength grid. If no units are attached is considered expressed in 'um' fill_value: float or :class:`~astropy.units.Quantity` filla value for empry bins. If no units are attached is considered expressed in 'um'. Examples ---------- >>> from exosim.models.signal import Signal >>> import numpy as np >>> import astropy.units as u We first define the initial values: >>> wavelength = np.linspace(0.1, 1, 10) * u.um >>> data = np.ones((10, 1, 10)) >>> time_grid = np.linspace(1, 5, 10) * u.hr >>> signal = Signal(spectral=wavelength, data=data, time=time_grid) >>> print(signal.data.shape) (10,1,10) We now interpolates at a finer wavelength grid: >>> new_wl = np.linspace(0.1, 1, 20) * u.um >>> signal.spectral_rebin(new_wl) >>> print(signal.data.shape) (10,1,20) We now bin down the to a new wavelength grid: >>> signal = Signal(spectral=wavelength, data=data, time=time_grid) >>> new_wl = np.linspace(0.1, 1, 5) * u.um >>> signal.spectral_rebin(new_wl) >>> print(signal.data.shape) (10,1,5) """ new_wavelength_ = checks.check_units( new_wavelength, "um", self, force=True ).value self.data = binning.rebin( new_wavelength_, self.spectral, self.data, axis=2, fill_value=fill_value, **kwargs, ) idx = np.where(np.isnan(self.data)) self.data[idx] = 0.0 self.spectral = new_wavelength_
[docs] def temporal_rebin( self, new_time: ArrayType, fill_value="extrapolate", **kwargs ) -> None: """ It bins the class data over the temporal direction and changes the time attributes. This method is based on :func:`~exosim.utils.binning.rebin`. Parameters ---------- new_time: :class:`~numpy.ndarray` or :class:`~astropy.units.Quantity` new time grid. If no units are attached is considered expressed in 'hr' Examples ---------- >>> from exosim.models.signal import Signal >>> import numpy as np >>> import astropy.units as u We first define the initial values: >>> wavelength = np.linspace(0.1, 1, 10) * u.um >>> data = np.ones((10, 1, 10)) >>> time_grid = np.linspace(1, 5, 10) * u.hr >>> signal = Signal(spectral=wavelength, data=data, time=time_grid) >>> print(signal.data.shape) (10,1,10) We now interpolates at a finer time grid: >>> new_time = np.linspace(1, 5, 20) * u.hr >>> signal.temporal_rebin(new_time) >>> print(signal.data.shape) (20,1,10) We now bin down the to a new wavelength grid: >>> signal = Signal(spectral=wavelength, data=data, time=time_grid) >>> new_time = np.linspace(1, 5, 5) * u.hr >>> signal.temporal_rebin(new_time) >>> print(signal.data.shape) (5,1,10) """ new_time_ = checks.check_units(new_time, "hr", self, force=True).value if self.time.size == 1: # if a rebin is called on a signal with no time dependence, # the data array will be repeated to match the desired size a = copy.deepcopy(self.data[0, :, :]) self.data = np.repeat(a[np.newaxis, :, :], new_time.size, axis=0) else: # if the rebin is called on a signal with time dependence, # the data grid will be rebinned self.data = binning.rebin( new_time_, self.time, self.data, axis=0, fill_value=fill_value, **kwargs, ) idx = np.where(np.isnan(self.data)) self.data[idx] = 0.0 self.time = new_time_
# TODO if cached just rename or move. Do not write it again!
[docs] def write(self, output: Output = None, name: str = None) -> None: """ It writes the Signal class into an :class:`~exosim.output.output.Output`. The signal class is stored as a dictionary. Parameters ---------- output: :class:`~exosim.output.output.Output` container to use to write the class name: str name to use to store Examples -------- >>> from exosim.models.signal import Signal >>> import numpy as np >>> import astropy.units as u We first define the initial values: >>> wavelength = np.linspace(0.1, 1, 10) * u.um >>> data = np.ones((10, 1, 10)) >>> time_grid = np.linspace(1, 5, 10) * u.hr >>> signal = Signal(spectral=wavelength, data=data, time=time_grid) Then we store it in a test output HDF5 file >>> from exosim.output.hdf5.hdf5 import HDF5Output >>> output = os.path.join(test_dir, 'output_test.h5') >>> with HDF5Output(output) as o: >>> signal.write(o, 'test_signal') Inside the file is now stored a dictionary like the one we can obtain by >>> dict(signal) """ if not output: output = self.output if not name: name = self.dataset_name if output is None: self.warning("No output indicated") return to_store = dict(self) if self.cached: to_store.pop("data", None) to_store["datatype"] = str(self.__class__) output.store_dictionary(to_store, group_name=name) self.debug("{} saved".format(name))
def _find_slice( self, start_time: ValueType, end_time: ValueType ) -> tuple[int, int]: """This function finds a data slice given a time interval.""" if isinstance(start_time, u.Quantity): start_time = start_time.to(u.hr) else: self.debug("start time assumed to be expressed in hr") if isinstance(end_time, u.Quantity): end_time = end_time.to(u.hr) else: self.debug("start time assumed to be expressed in hr") start = np.argmin(np.abs(self.time - start_time.value)) stop = np.argmin(np.abs(self.time - end_time.value)) return start, stop
[docs] def get_slice( self, start_time: ValueType, end_time: ValueType ) -> np.ndarray: """ It returnes the data relative to a time slice: Parameters ---------- start_time: float or :class:`~astropy.units.Quantity` if a float is given, it's assume to be expressed in hours. end_time: float or :class:`~astropy.units.Quantity` if a float is given, it's assume to be expressed in hours. Returns -------- :class:`~numpy.ndarray` """ start, stop = self._find_slice(start_time, end_time) return self.data[start:stop, :, :]
[docs] def set_slice( self, start_time: ValueType, end_time: ValueType, data: ArrayType ) -> None: """ It replaces the class data values in a specific a time slice: Parameters ---------- start_time: float or :class:`~astropy.units.Quantity` if a float is given, it's assume to be expressed in hours. end_time: float or :class:`~astropy.units.Quantity` if a float is given, it's assume to be expressed in hours. data: :class:`~numpy.ndarray` data to use as replacement for the existing ones. """ start, end = self._find_slice(start_time, end_time) # if self.cached: # self.cached_data.dataset[start:end, :, :] = data # else: self.data[start:end, :, :] = data
# Here we define the class iterators. These are usefull to write data def __iter__(self): for a in ["data", "time", "spectral", "spatial", "metadata", "cached"]: yield a, self.__getattribute__(a) for a in [ "data_units", "time_units", "spectral_units", "spatial_units", ]: yield a, str(self.__getattribute__(a)) # Here we define the class maths. __numpy_ufunc__ = None # Numpy up to 13.0 __array_ufunc__ = None # Numpy 13.0 and above def _check_other_val_sum(self, other: ArrayType) -> ArrayType: """extracts value and units from the other element and performs the required conversions""" if hasattr(other, "unit"): val = other.to(self.data_units).value elif hasattr(other, "data_units"): try: val = other.data * other.data_units.to(self.data_units) except TypeError: val = other._data * other.data_units.to(self.data_units) elif hasattr(other, "cached_data"): val = other.cached_data # elif hasattr(other, 'data'): # val = other.data else: val = other return val def _check_other_val_product(self, other: ArrayType) -> ArrayType: """extracts value and units from the other element""" if hasattr(other, "unit"): return other.value, other.unit elif hasattr(other, "data_units"): return other.data, other.data_units else: units = u.Unit("") val = other return val, units def _create_new_instance( self, unit: UnitType = None, cached: bool = None, shape: tuple[int, int, int] = None, output: str | Output = None, output_path: str = None, metadata: dict = None, dataset_name: str = None, ) -> Signal: # check if overwrite data if not cached: cached = self.cached if not shape: shape = self.shape if not output: output = self.output if not output_path: output_path = self.output_path if not metadata: metadata = self.metadata # check on cached data # if self.cached: # data = self._data.dataset # else: # data = self.data # if no unit provided use the first class unit if not unit: unit = self.data_units # given the unit, check the right class to instantiate if unit == u.W / u.m**2 / u.um: klass = Sed elif unit == u.W / u.m**2 / u.um / u.sr: klass = Radiance elif unit == u.ct / u.s: klass = CountsPerSecond elif unit == u.ct: klass = Counts elif unit == u.adu: klass = Adu elif unit == u.Unit(""): klass = Dimensionless else: klass = Signal return klass( spectral=self.spectral * self.spectral_units, data=data * unit, time=self.time * self.time_units, spatial=self.spatial * self.spatial_units, cached=cached, shape=shape, dataset_name=dataset_name, output=output, output_path=output_path, metadata=metadata, )
[docs] def copy(self, **kwargs) -> Signal: """ It returns a copy of the class. Parameters ---------- kwargs: dict Dictionary of parameters to overwrite. The paramaters that can be included in the list are `cached`, `metadata`, `dataset_name`, 'output`, `output_path`. Returns ------- :class:`~exosim.models.signal.Signal` Examples ---------- >>> import numpy as np >>> import astropy.units as u >>> from exosim.models.signal import Signal >>> wl = np.linspace(0.1, 1, 10) * u.um >>> data = np.random.random_sample((10, 1, 10)) >>> time_grid = np.linspace(1, 5, 10) * u.hr >>> signal = Signal(spectral=wl, data=data, time=time_grid) >>> signal_new = signal.copy() """ return self._create_new_instance(**kwargs)
def __add__(self, other: ArrayType | Signal | ValueType) -> Signal: val = self._check_other_val_sum(other) out_class = self._create_new_instance() out_class.data = self.data + val if isinstance(out_class.data, CachedData): # type: ignore # noqa: F821 out_class.cached = True return out_class def __radd__(self, other: ArrayType | Signal | ValueType) -> Signal: return self + other def __sub__(self, other): val = self._check_other_val_sum(other) out_class = self._create_new_instance() out_class.data = self.data - val if isinstance(out_class.data, CachedData): # type: ignore # noqa: F821 out_class.cached = True return out_class def __rsub__(self, other: ArrayType | Signal | ValueType) -> Signal: return (-1) * (self - other) def __mul__(self, other: ArrayType | Signal | ValueType) -> Signal: val, units = self._check_other_val_product(other) out_class = self._create_new_instance(unit=self.data_units * units) out_class.data = self.data * val if hasattr(out_class.data, "unit"): out_class.data = out_class.data.value return out_class def __rmul__(self, other: ArrayType | Signal | ValueType) -> Signal: return self * other def __truediv__(self, other: ArrayType | Signal | ValueType) -> Signal: val, units = self._check_other_val_product(other) out_class = self._create_new_instance(unit=self.data_units / units) out_class.data = self.data / val if hasattr(out_class.data, "unit"): out_class.data = out_class.data.value return out_class def __rtruediv__(self, other: ArrayType | Signal | ValueType) -> Signal: val, units = self._check_other_val_product(other) out_class = self._create_new_instance(unit=units / self.data_units) out_class.data = val / self.data if hasattr(out_class.data, "unit"): out_class.data = out_class.data.value return out_class def __floordiv__(self, other: ArrayType | Signal | ValueType) -> Signal: val, units = self._check_other_val_product(other) out_class = self._create_new_instance(unit=self.data_units / units) out_class.data = self.data // val if hasattr(out_class.data, "unit"): out_class.data = out_class.data.value return out_class def __rfloordiv__(self, other: ArrayType | Signal | ValueType) -> Signal: val, units = self._check_other_val_product(other) out_class = self._create_new_instance(unit=units / self.data_units) out_class.data = val // self.data if hasattr(out_class.data, "unit"): out_class.data = out_class.data.value return out_class
[docs]class Sed(Signal): r""" It's a Signal class with data having units of :math:`[W m^{-2} \mu m^{-1}]` """ def __init__( self, spectral: ArrayType = [0] * u.um, data: ArrayType = None, time: ArrayType = [0] * u.hr, spatial: ArrayType = [0] * u.um, *args, **kwargs, ) -> None: self.set_log_name() if hasattr(data, "unit") and self._normalize_units( data.unit ) != self._normalize_units(u.W / u.m**2 / u.um): try: data = data.to(u.W / u.m**2 / u.um) except u.UnitConversionError: self.error(_invalid_units) raise u.UnitsError(_invalid_units) kwargs.pop("data_units", None) super().__init__( spectral, data, time, data_units=u.W / u.m**2 / u.um, spatial=spatial, *args, **kwargs, )
[docs]class Radiance(Signal): r""" It's a Signal class with data having units of :math:`[W m^{-2} \mu m^{-1} sr^{-1}]` """ def __init__( self, spectral: ArrayType = [0] * u.um, data: ArrayType = None, time: ArrayType = [0] * u.hr, spatial: ArrayType = [0] * u.um, *args, **kwargs, ) -> None: self.set_log_name() if hasattr(data, "unit") and self._normalize_units( data.unit ) != self._normalize_units(u.W / u.m**2 / u.um / u.sr): try: data = data.to(u.W / u.m**2 / u.um / u.sr) except u.UnitConversionError: self.error(_invalid_units) raise u.UnitsError(_invalid_units) kwargs.pop("data_units", None) super().__init__( spectral, data, time, data_units=u.W / u.m**2 / u.um / u.sr, spatial=spatial, *args, **kwargs, )
[docs]class CountsPerSecond(Signal): r""" It's a Signal class with data having units of :math:`[ct/s]` """ def __init__( self, spectral: ArrayType = [0] * u.um, data: ArrayType = None, time: ArrayType = [0] * u.hr, spatial: ArrayType = [0] * u.um, *args, **kwargs, ) -> None: self.set_log_name() if hasattr(data, "unit") and self._normalize_units( data.unit ) != self._normalize_units(u.ct / u.s): try: data = data.to(u.ct / u.s) except u.UnitConversionError: self.error(_invalid_units) raise u.UnitsError(_invalid_units) kwargs.pop("data_units", None) super().__init__( spectral, data, time, data_units=u.ct / u.s, spatial=spatial, *args, **kwargs, )
[docs]class Counts(Signal): r""" It's a Signal class with data having units of :math:`[ct]` """ def __init__( self, spectral: ArrayType = [0] * u.um, data: ArrayType = None, time: ArrayType = [0] * u.hr, spatial: ArrayType = [0] * u.um, *args, **kwargs, ) -> None: self.set_log_name() if hasattr(data, "unit") and self._normalize_units( data.unit ) != self._normalize_units(u.ct): try: data = data.to(u.ct) except u.UnitConversionError: self.error(_invalid_units) raise u.UnitsError(_invalid_units) kwargs.pop("data_units", None) super().__init__( spectral, data, time, data_units=u.ct, spatial=spatial, *args, **kwargs, )
[docs]class Adu(Signal): r""" It's a Signal class with data having units of :math:`[adu/s]` """ def __init__( self, spectral: ArrayType = [0] * u.um, data: ArrayType = None, time: ArrayType = [0] * u.hr, spatial: ArrayType = [0] * u.um, *args, **kwargs, ) -> None: self.set_log_name() if hasattr(data, "unit") and self._normalize_units( data.unit ) != self._normalize_units(u.adu): try: data = data.to(u.adu) except u.UnitConversionError: self.error(_invalid_units) raise u.UnitsError(_invalid_units) kwargs.pop("data_units", None) super().__init__( spectral, data, time, data_units=u.adu, spatial=spatial, *args, **kwargs, )
[docs]class Dimensionless(Signal): """ It's a Signal class with data having no units """ def __init__( self, spectral: ArrayType = [0] * u.um, data: ArrayType = None, time: ArrayType = [0] * u.hr, spatial: ArrayType = [0] * u.um, *args, **kwargs, ) -> None: self.set_log_name() if hasattr(data, "unit") and self._normalize_units( data.unit ) != self._normalize_units(u.Unit("")): data.to(u.Unit("")) kwargs.pop("data_units", None) super().__init__( spectral, data, time, u.Unit(""), spatial=spatial, *args, **kwargs )