Source code for exosim.output.hdf5.utils

from typing import Union

import astropy.units as u
import h5py
from astropy.io.misc.hdf5 import read_table_hdf5
from h5py import Dataset, Datatype, Group

import exosim.models.signal as signal


[docs]def load_signal(input: h5py.File) -> signal.Signal: """ It loads the appropriate :class:`~exosim.models.signal.Signal` class from an opened hdf5 file. Parameters ---------- input: :class:`h5py.File` opened hdf5 file Returns ------- :class:`~exosim.models.signal.Signal` Raises ------- IOError if the loaded Dataset is a Cached :class:`~exosim.models.signal.Signal`. """ metadata = {} if "metadata" in input.keys(): metadata = recursively_read_dict_contents(input["metadata"]) unit = u.Unit(input["data_units"][()].decode("utf-8")) spectral = ( input["spectral"] * u.Unit(input["spectral_units"][()].decode("utf-8")) if "spectral" in input.keys() else None ) time = ( input["time"] * u.Unit(input["time_units"][()].decode("utf-8")) if "time" in input.keys() else None ) spatial = ( input["spatial"] * u.Unit(input["spatial_units"][()].decode("utf-8")) if "spatial" in input.keys() else None ) # if 'datatype' in input.keys(): # class_name = input['datatype'][()].decode("utf-8").replace("<class '", # "").replace( # "'>", "") # klass = locate(class_name) # given the unit, check the right class to instantiate if unit == u.W / u.m**2 / u.um: klass = signal.Sed elif unit == u.W / u.m**2 / u.um / u.sr: klass = signal.Radiance elif unit == u.ct / u.s: klass = signal.CountsPerSecond elif unit == u.ct: klass = signal.Counts elif unit == u.adu: klass = signal.Adu elif unit == u.Unit(""): klass = signal.Dimensionless else: klass = signal.Signal if not input["cached"][()]: return klass( spectral=spectral, data=input["data"][()], data_units=unit, time=time, spatial=spatial, metadata=metadata, ) else: raise OSError("impossible to load a cached dataset in a Signal class")
[docs]def recursively_read_dict_contents(input_dict: dict) -> dict: """ Will recursive read a dictionary, initializing quantities and table from a dictionary read from an hdf5 file. Parameters ---------- input_dict : dict dictionary read from hdf5 Returns -------- dict Dictionary we want to use """ new_keys = [k for k in input_dict.keys()] output_dict = {} if all(elem in new_keys for elem in ["value", "unit"]): output_dict = input_dict["value"][()] * u.Unit( input_dict["unit"][()].decode("utf-8") ) return output_dict elif any(".__table_column_meta__" in elem for elem in new_keys): table_keys = [ elem for elem in new_keys if ".__table_column_meta__" in elem ] table_keys = (elem.split(".")[0] for elem in table_keys) for k in table_keys: table = read_table_hdf5(input_dict, k) output_dict[k] = table for key in new_keys: if isinstance(input_dict[key], h5py.Group): output_dict[key] = recursively_read_dict_contents(input_dict[key]) elif isinstance(input_dict[key], h5py.Dataset): try: output_dict[key] = input_dict[key][()].decode("utf-8") except AttributeError: output_dict[key] = input_dict[key][()] return output_dict
[docs]def copy_file( in_object: Union[Group, Dataset], out_object: Union[Group, Dataset] ) -> None: """ Recursively copy an HDF5 tree structure from one file to another. This function traverses the hierarchy of the input HDF5 object (`in_object`) and replicates it in the output HDF5 object (`out_object`). It can copy both groups and datasets, and also replicates all attributes. Parameters ---------- in_object : Union[h5py.Group, h5py.Dataset] The input HDF5 object (either root, a subgroup, or a dataset). out_object : Union[h5py.Group, h5py.Dataset] The output HDF5 object (either root, a subgroup, or a dataset). Raises ------ ValueError If an invalid object type is encountered. """ # Copy attributes only once per group or dataset, to avoid overwriting for key, value in in_object.attrs.items(): out_object.attrs[key] = value # Check if the input object is a dataset and skip further copying (base case for recursion) if isinstance(in_object, Dataset): return # Iterate through each key-value pair in the input object for key, in_obj in in_object.items(): # Skip HDF5 Datatype objects if not isinstance(in_obj, Datatype): # Copy group if isinstance(in_obj, Group): out_obj = out_object.create_group(key) copy_file(in_obj, out_obj) # Recursive call # Copy dataset elif isinstance(in_obj, Dataset): out_object.create_dataset(key, data=in_obj) # Raise an exception for unknown object types else: raise ValueError(f"Invalid object type {type(in_obj)}")