Source code for exosim.models.utils.cachedData

import os
import tempfile
import uuid
from pathlib import Path
from typing import Union

import h5py
import numpy as np

import exosim.log as log
from exosim.utils import RunConfig
from exosim.utils.types import HDF5OutputType


[docs]class CachedData(log.Logger): """ This class caches data cube into an h5 file. The cube data are chunked toward the first axis. In this class are also defined a set of operation to operate on the dataset using the chinks system. Attributes ---------- axis0: int first axis size axis1: int second axis size axis2: int third axis size output: str or :class:`~exosim.output.HDF5Output` name of the file used for caching. dataset_name: str name used to store the dataset into the h5 file. output: :class:`h5py.File` or :class:`~exosim.output.hdf5.hdf5.HDF5Output` or :class:`~exosim.output.hdf5.hdf5.HDF5OutputGroup` h5py open file used for caching dataset_path: str path where is stored the dataset inside the output file. chunked_dataset: :class:`h5py.Dataset` h5py dataset used to store the data Notes ----- The cached data may be stored in a temporary file. To delete temporary files we included a garbage collector. Please, remember to delete the class when done as in the following example >>> myClass = CachedData(1,1,1) >>> del myClass """ def __init__( self, axis0: int, axis1: int, axis2: int, output: Union[str, "HDF5Output", "HDF5OutputGroup"] = None, # type: ignore # noqa: F821 output_path: str = None, dataset_name: str = None, dtype: np.dtype = np.float64, ) -> None: """ Parameters ---------- axis0: int first axis size axis1: int second axis size axis2: int third axis size output: str or :class:`~exosim.output.hdf5.hdf5.HDF5Output` or :class:`~exosim.output.hdf5.hdf5.HDF5OutputGroup` file name to use for caching. If `None` a temporary file will be generated. Default is `None`. output_path: str (optional) path where to store the dataset inside the output file. Default is `None`. dataset_name: str name to use to store the dataset into the h5 file. If `None` a random name will be generated. Default is `None`. """ self.set_log_name() self.axis0 = axis0 self.axis1 = axis1 self.axis2 = axis2 self.output = None # we want to get a name for the dataset to store it in the h5 file if dataset_name is None: dataset_name = str(uuid.uuid4())[:8] self.dataset_name = dataset_name self.debug("data will be stored as: {}".format(self.dataset_name)) # if a file name is given, we use that as output, otherwise we use a temporary file self.tmp = False if output: # if is a string or an already existing temporary file, we open it as an h5py if isinstance(output, str): self.output = h5py.File(output, "a", rdcc_w0=1) self.fname = output elif isinstance(output, tempfile._TemporaryFileWrapper): self.output = h5py.File(output, "a", rdcc_w0=1) self.fname = output self.tmp = True # if it is an Output class, we use it else: from exosim.output import HDF5Output, HDF5OutputGroup if isinstance(output, HDF5Output): if not output._cache: self.error("output file not set for caching") raise OSError("output file not set for caching") self.fname = output.filename self.output = output.fd elif isinstance(output, HDF5OutputGroup): if not output._cache: self.error("output file not set for caching") raise OSError("output file not set for caching") self.fname = output.filename self.output = output._entry else: self.error("unsupported output format") raise OSError("unsupported output format") self.debug("data stored in: {}".format(self.fname)) else: path = Path(os.path.join(os.getcwd(), "tmp")) tempfile.tempdir = path tempfile.tempdir.mkdir(parents=True, exist_ok=True) self.fname = tempfile.NamedTemporaryFile( suffix=".h5", delete=False ) self.output = h5py.File(self.fname, "a", overrite=True, rdcc_w0=1) self.tmp = True self.debug("temporary file created: {}".format(self.fname.name)) # we define the address of the dataset inside the file self.output_path = output_path if self.output_path: self.dataset_path = os.path.join(output_path, self.dataset_name) else: self.dataset_path = self.dataset_name + "/data" # chunk size fixed to 2Mb mem_size = RunConfig.chunk_size * 1e6 # convert to Mbs image_size = axis1 * axis2 * np.dtype(dtype).itemsize axis0_chunk = ( int(mem_size // image_size) if int(mem_size // image_size) < axis0 else axis0 ) # we create an empty dataset (full of zeros) of a given shape, chunked of the first axis self.chunked_dataset = self.output.create_dataset( self.dataset_path, shape=(axis0, axis1, axis2), chunks=(axis0_chunk, axis1, axis2), dtype=dtype, compression=None, )
[docs] def rename_dataset(self, new_name: str) -> None: """It renames the dataset in the HDF5 file. Parameters ----------- new_name: str new name for the dataset """ self.output[new_name] = self.output[self.dataset_name] del self.output[self.dataset_name] self.dataset_name = new_name
# to perform any operation, we iterate over the chunks built on the first axis # def _get_val_in_slice(self, other, i): # """Returs the values contained in chunk built on the first axis.""" # if hasattr(other, 'dataset'): # return other.dataset[i, :, :] # elif hasattr(other, 'shape'): # return other[i, :, :] # else: # return np.ones(self.dataset[i, :, :].shape) * other # # def _create_new_instance(self): # return CachedData(*self.dataset.shape, output=self.fname, # output_path=self.output_path) # # @property # def dataset(self): # """h5py dataset used to store the data""" # return self._dataset # # @dataset.setter # def dataset(self, other): # """ This setter implements the slicing/caching system""" # self._dataset = other # self.output.flush() # # @dataset.deleter # def dataset(self): # """It empties the dataset""" # del self._dataset def __del__(self): """Garbage collector""" # close the file # try to flush and close the output file if self.output and isinstance(self.output, h5py.File): if hasattr(self.output, "flush"): try: self.output.flush() except ValueError: pass if hasattr(self.output, "close"): self.output.close() if self.tmp: # remove temp file if exist try: os.remove(self.fname) self.debug("file deleted: {}".format(self.fname)) except TypeError: os.remove(self.fname.name) self.debug( "temporary file deleted: {}".format(self.fname.name) ) except FileNotFoundError: pass # check if the file has been correctly removed try: if Path(self.fname).is_file(): self.warning("file not deleted: {}".format(self.fname)) except TypeError: if Path(self.fname.name).is_file(): self.warning( "file not deleted: {}".format(self.fname.name) ) # if temp dir is empty, delete it try: if not any(tempfile.tempdir.iterdir()): os.rmdir(tempfile.tempdir) self.debug( "temporary dir deleted: {}".format(tempfile.tempdir) ) except FileNotFoundError: pass except AttributeError: pass