Source code for exosim.output.hdf5.hdf5

# This code is inspired by the code developed for TauREx 3.1.
# Therefore, we attach here TauREx3.1 license:
#
# BSD 3-Clause License
#
# Copyright (c) 2019, Ahmed F. Al-Refaie, Quentin Changeat, Ingo Waldmann, Giovanna Tinetti
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# 3. Neither the names of the copyright holders nor the names of its
#    contributors may be used to endorse or promote products derived from
#    this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import datetime
import os
from typing import List, Union

import astropy.units as u
import h5py
import numpy as np
from astropy.table import QTable, Table, meta

import exosim.output.output as output
from exosim import (
    __author__,
    __branch__,
    __citation__,
    __commit__,
    __copyright__,
    __license__,
    __pkg_name__,
    __title__,
    __url__,
    __version__,
)
from exosim.utils.runConfig import RunConfig

[docs]META_KEY = "__table_column_meta__"
[docs]class HDF5OutputGroup(output.OutputGroup): def __init__( self, entry: h5py.Group, fd: h5py.File, cache: bool = False, filename: str = None, ) -> None: self.set_log_name() self.fd = fd self._entry = entry self._cache = cache self.filename = filename
[docs] def write_array( self, array_name: str, array: np.ndarray, metadata: dict = None ) -> None: if isinstance(array, list): for idx, a in enumerate(array): self.write_array("{}{}".format(array_name, idx), a, metadata) return ds = self._entry.create_dataset( str(array_name), data=array, shape=array.shape, dtype=array.dtype ) ds.attrs["datatype"] = str(array.__class__) if metadata: for k, v in metadata.items(): ds.attrs[k] = v
[docs] def write_table( self, table_name: str, table: Union[Table, QTable], metadata: dict = None, replace: bool = False, ) -> None: if replace: if str(table_name) in self._entry.keys(): del self._entry[str(table_name)] try: del self._entry[str(table_name) + "." + META_KEY] except KeyError: pass try: del self._entry["{}_to_group".format(str(table_name))] except KeyError: pass table = _encode_mixins(table) if any(col.info.dtype.kind == "U" for col in table.itercols()): table = table.copy(copy_data=False) table.convert_unicode_to_bytestring() self._entry.create_dataset(str(table_name), data=table.as_array()) header_yaml = meta.get_yaml_from_table(table) header_encoded = [h.encode("utf-8") for h in header_yaml] self._entry.create_dataset( str(table_name) + "." + META_KEY, data=header_encoded ) tg = self._entry.create_group("{}_to_group".format(str(table_name))) for col in table.keys(): tg_c = tg.create_group(str(col)) tg_c.create_dataset("value", data=table[col]) if table[col].unit is not None: tg_c.create_dataset("unit", data=str(table[col].unit)) tg.attrs["datatype"] = str(table.__class__) if metadata: for k, v in metadata.items(): tg.attrs[k] = v for k, v in table.meta.items(): tg.attrs[k] = v
[docs] def write_scalar( self, scalar_name: str, scalar: float, metadata: dict = None ) -> None: ds = self._entry.create_dataset(str(scalar_name), data=scalar) ds.attrs["datatype"] = str(scalar.__class__) if metadata: for k, v in metadata.items(): ds.attrs[k] = v
[docs] def write_string( self, string_name: str, string: str, metadata: dict = None ) -> None: # h5py new string hanlder support bites. So we convert strings into bites string = string.encode("utf-8") ds = self._entry.create_dataset(str(string_name), data=string) ds.attrs["datatype"] = str(string.__class__) if metadata: for k, v in metadata.items(): ds.attrs[k] = v
[docs] def create_group(self, group_name: str) -> "HDF5OutputGroup": """ it creates an :class:`~exosim.output.hdf5.hdf5.HDF5OutputGroup`. Parameters ---------- group_name: str group name Returns ------- OutputGroup """ entry = None if self._entry: try: entry = self._entry.create_group(str(group_name)) except ValueError: entry = self._entry[str(group_name)] return HDF5OutputGroup( entry, fd=self.fd, cache=self._cache, filename=self.filename )
[docs] def delete_data(self, key): try: del self._entry[key] except KeyError: self.error("key {} not found".format(key)) raise KeyError
[docs] def flush(self) -> None: self.fd.flush()
[docs] def write_string_array( self, string_name: str, string_array: List[str], metadata: dict = None ) -> None: string_array = string_array.astype("S64") ds = self._entry.create_dataset(str(string_name), data=string_array) ds.attrs["datatype"] = str(string_array.__class__) if metadata: for k, v in metadata.items(): ds.attrs[k] = v
[docs] def write_quantity( self, quantity_name: str, quantity: u.Quantity, metadata: dict = None ) -> None: if quantity_name == "value": qg_c = self._entry.create_dataset( "value", data=quantity.value, ) else: qg_c = self._entry.create_group(str(quantity_name)) qg_c.create_dataset( "value", data=quantity.value, ) qg_c.create_dataset("unit", data=str(quantity.unit)) pass qg_c.attrs["datatype"] = str(quantity.__class__) if metadata: for k, v in metadata.items(): qg_c.attrs[k] = v
def _encode_mixins(tbl): from astropy.table import serialize from astropy.utils.data_info import serialize_context_as with serialize_context_as("hdf5"): encode_tbl = serialize.represent_mixins_as_columns(tbl) return encode_tbl
[docs]class HDF5Output(output.Output): def __init__(self, filename, append=False, cache=False): super().__init__() self.filename = filename self._append = append self._cache = cache self.fd = None
[docs] def open(self) -> None: self.fd = self._openFile(self.filename) self.debug("opened {}".format(self.filename))
def _openFile(self, fname: str) -> h5py.File: mode = "w" if self._append: mode = "a" rdcc_w0 = None if self._cache: rdcc_w0 = 1 attrs = { "file_name": fname, "file_time": datetime.datetime.now().isoformat(), "creator": self.__class__.__name__, "HDF5_Version": h5py.version.hdf5_version, "h5py_version": h5py.version.version, "program_name": str(__title__), "package name": str(__pkg_name__), "program_version": str(__version__), "author": str(__author__), "copyright": str(__copyright__), "license": str(__license__), "url": str(__url__), "citation": str(__citation__), "git commit": str(__commit__), "git branch": str(__branch__), } fd = h5py.File(fname, mode=mode, rdcc_w0=rdcc_w0) for key in attrs: fd.attrs[key] = attrs[key] if mode == "w" or "info" not in fd.keys(): try: gd_ = fd["info"] except KeyError: gd_ = fd.create_group("info") gd = HDF5OutputGroup(gd_, fd) gd.store_dictionary(attrs, "exosim") gd.store_dictionary(RunConfig.__dict__(), "RunConfig") fd.flush() return fd
[docs] def add_info(self, attrs: dict, name: str = None) -> None: gd = self.create_group("info") gd.store_dictionary(attrs, name)
[docs] def flush(self) -> None: self.fd.flush()
[docs] def create_group(self, group_name: str) -> "HDF5OutputGroup": """ it creates an :class:`~exosim.output.hdf5.hdf5.HDF5OutputGroup`. Parameters ---------- group_name: str group name Returns ------- OutputGroup """ entry = None if self.fd: try: entry = self.fd.create_group(str(group_name)) except ValueError: entry = self.fd[str(group_name)] return HDF5OutputGroup( entry, cache=self._cache, filename=self.filename, fd=self.fd )
[docs] def store_dictionary( self, dictionary: dict, group_name: str = None ) -> None: """ it stores a full dictionary inside the :class:`HDF5Output`, forcing the flush. Parameters ---------- dictionary: dict data to store group_name: str (optional) group name for the data """ from exosim.output.utils import ( recursively_save_dict_contents_to_output, ) out = self if group_name is not None: out = self.create_group(group_name) recursively_save_dict_contents_to_output(out, dictionary) self.fd.flush()
[docs] def close(self) -> None: if self.fd: self.fd.flush() self.fd.close() self.debug("closed {}".format(self.filename))
[docs] def getsize(self) -> u.Quantity: """It returns the output file size""" size = os.path.getsize(self.filename) * u.b if size.to(u.Tb).value > 1.0: size = size.to(u.Tb) elif size.to(u.Gb).value > 1.0: size = size.to(u.Gb) elif size.to(u.Mb).value > 1.0: size = size.to(u.Mb) return size
[docs] def delete_data(self, key): try: del self.fd[key] except KeyError: self.error("key {} not found".format(key)) raise KeyError