Source code for exosim.tasks.load.loadOptions
import os
import xml.etree.ElementTree as ET
from collections import OrderedDict
from astropy import units as u
from astropy.table import Table
from exosim.tasks.task import Task
from exosim.utils.runConfig import RunConfig
[docs]class LoadOptions(Task):
"""
Reads the XML file with payload parameters and returns an object with attributes related to the input data.
Attributes
----------
config_path : str
Configuration path.
Returns
-------
dict
Parsed XML input file.
Raises
------
IOError
If the indicated file is not found or the format is not supported.
Examples
--------
>>> load_options = LoadOptions()
>>> options = load_options(filename='path/to/file.xml')
"""
def __init__(self):
super().__init__() # Explicitly call the parent class constructor
self.add_task_param("filename", "Input option file name")
self.add_task_param(
"config_path", "On-run setting for ConfigPath", None
)
[docs] def execute(self):
self._filename = self.get_task_param("filename")
self.config_path = (
self.get_task_param("config_path") or "__ConfigPath__"
)
RunConfig.config_file_list.append(os.path.abspath(self._filename))
self._check_format()
root = self._get_root()
options_dict = self._parse_xml(root)
self._finalise(options_dict)
options_dict = self._substitute_keywords(options_dict)
options_dict = _clean_dict(options_dict)
self.set_output(options_dict)
def _check_format(self):
if not self._filename.endswith(".xml"):
self.error("Wrong input format: Expected an XML file.")
raise OSError(
"Unsupported file format. Please provide an XML file."
)
def _get_root(self):
try:
self.debug(f"Input option file found: {self._filename}")
return ET.parse(self._filename).getroot()
except FileNotFoundError:
self.error(f"No input option file found: {self._filename}")
raise FileNotFoundError(f"Input file not found: {self._filename}")
def _parse_xml(self, root):
root_dict = {}
for child in root:
child_dict = self._parse_xml(child)
child_dict.update(child.attrib)
value = self._compact_string(child.text)
if value:
value = self._convert_value(value)
if "unit" in child_dict:
unit_name = child_dict.pop("unit")
if unit_name == "dimensionless":
unit_name = ""
value = value * u.Unit(unit_name)
if isinstance(value, str) and "__ConfigPath__" in value:
value = value.replace("__ConfigPath__", self.config_path)
child_dict["value"] = value
if child.tag == "ConfigPath":
self.config_path = value
if child.tag in root_dict:
existing_attr = root_dict[child.tag]
if isinstance(existing_attr, OrderedDict):
existing_attr[value] = child_dict
else:
dtmp = OrderedDict(
[
(existing_attr.get("value"), existing_attr),
(value, child_dict),
]
)
root_dict[child.tag] = dtmp
else:
root_dict[child.tag] = child_dict
if "datafile" in root_dict:
datafile = root_dict["datafile"]["value"].replace(
"__ConfigPath__", self.config_path
)
if not os.path.exists(datafile):
self.error(f"Datafile not found: {datafile}")
raise FileNotFoundError(f"Datafile not found: {datafile}")
try:
root_dict["data"] = self._read_data_table(datafile)
except OSError:
self.error(f"Cannot read input file: {datafile}")
raise OSError("Error reading the input data file.")
return root_dict
def _substitute_keywords(self, root_dict):
"""
Substitutes keywords in the root_dict with their corresponding values.
Parameters
----------
root_dict : dict
The dictionary parsed from the XML.
Returns
-------
dict
Updated dictionary with substituted keywords.
"""
# Extract keywords (elements starting with '__')
keywords = {
key.strip("_"): value["value"]
for key, value in root_dict.items()
if isinstance(value, dict)
and "value" in value
and key.startswith("__")
}
def substitute_value(value):
if isinstance(value, str):
for keyword, replacement in keywords.items():
value = value.replace(f"__{keyword}__", replacement)
return value
def recursive_substitute(d):
for key, value in d.items():
if isinstance(value, dict):
recursive_substitute(value)
elif isinstance(value, str):
d[key] = substitute_value(value)
recursive_substitute(root_dict)
return root_dict
def _finalise(self, dictionary):
xml_entry = dictionary.pop("config", None)
if xml_entry:
xmlfile = xml_entry["value"].replace(
"__ConfigPath__", self.config_path
)
xmlfile = os.path.expanduser(xmlfile)
if not os.path.exists(xmlfile):
self.error(f"Referenced config file not found: {xmlfile}")
raise FileNotFoundError(
f"Referenced config file not found: {xmlfile}"
)
sub_system_dict = LoadOptions()(
filename=xmlfile, config_path=self.config_path
)
dictionary.update(sub_system_dict)
for key, item in dictionary.items():
if isinstance(item, dict):
self._finalise(item)
def _compact_string(self, string):
return string.replace("\n", "").strip() if string else ""
def _convert_value(self, value):
try:
return int(value)
except ValueError:
try:
return float(value)
except ValueError:
return self._convert_boolean(value)
def _convert_boolean(self, value):
if value == "True":
return True
elif value == "False":
return False
return value
def _read_data_table(self, datafile):
data_type = os.path.splitext(datafile)[1]
try:
data = Table.read(
os.path.expanduser(datafile),
fill_values=[("#N/A", "0"), ("N/A", "0"), ("", "0")],
format="ascii" + data_type,
)
except Exception as exc:
raise Exception(f"{datafile} caused the exception") from exc
for col in data.columns:
if hasattr(data[col], "fill_value"):
data[col].fill_value = 0.0
return data
def _clean_dict(input_dict):
"""
Cleans an input dictionary by removing the "value" notation and comments.
It can be applied recursively.
Parameters
----------
input_dict : dict
The dictionary to clean.
Returns
-------
dict
Cleaned dictionary.
"""
input_dict.pop("comment", None)
for key in list(input_dict.keys()):
if isinstance(input_dict[key], dict):
keys_list = list(input_dict[key].keys())
if keys_list == ["unit", "value"]:
input_dict[key] = input_dict[key]["value"]
elif keys_list == ["value"]:
input_dict[key] = input_dict[key]["value"]
else:
_clean_dict(input_dict[key])
return input_dict