"""
Reads a config file, assigns values to variables and creates an output directory
"""
# TODO Add check function for valid inv_species / out_species combinations
import os
import shutil
import tomllib
import logging
from copy import deepcopy
from pathlib import Path
from collections.abc import Iterable
import numpy as np
import pandas as pd
from openairclim.calc_cont import calc_sac_slope
# Unique marker object: lets a dictionary lookup distinguish "key is absent"
# from any real stored value (including None).
_SENTINEL = object()
# CONSTANTS
# Template of config dictionary with types of MANDATORY input settings.
# Nested dicts represent (sub)sections of the config file; leaf values are the
# types the corresponding config entries are expected to have.
CONFIG_TEMPLATE = {
    "species": {"inv": Iterable, "out": Iterable},
    "inventories": {"dir": str, "files": Iterable, "rel_to_base": bool},
    "output": {
        "run_oac": bool,
        "run_metrics": bool,
        "run_plots": bool,
        "dir": str,
        "name": str,
        "overwrite": bool,
        "concentrations": bool,
    },
    "time": {"range": Iterable},
    "background": {
        "dir": str,
        "CO2": {"file": str, "scenario": str},
        "CH4": {"file": str, "scenario": str},
        "N2O": {"file": str, "scenario": str},
    },
    "responses": {"dir": str},
    "temperature": {"method": str, "CO2": {"lambda": float}},
    "metrics": {"types": Iterable, "t_0": Iterable, "H": Iterable},
    "aircraft": {"types": Iterable},
}
# Default config settings to be added if not specified by user in config file.
# Values given by the user are never overwritten by these defaults.
DEFAULT_CONFIG = {
    "responses": {
        "CO2": {
            "response_grid": "0D",
            "conc": {"method": "Sausen&Schumann"},
            "rf": {"method": "Etminan_2016", "attr": "proportional"},
        },
        "H2O": {"response_grid": "2D"},
        "O3": {"response_grid": "2D"},
        "CH4": {
            "response_grid": "2D",
            "rf": {"method": "Etminan_2016", "attr": "proportional"},
        },
        "cont": {"response_grid": "cont", "method": "Megill_2025"},
    },
    "temperature": {"method": "Boucher&Reddy"},
}
# Species for which responses are calculated subsequently,
# i.e. dependent on computed response of other species.
# These species do not need a response_grid setting in the config.
SPECIES_SUB_ARR = ["PMO"]
# Alias map that maintains backwards compatibility when config parameters change.
# Keys are deprecated dotted config paths, values are the dotted paths that
# replace them.
ALIAS_MAP = {
    "output.full_run": "output.run_oac",
}
[docs]
def get_config(file_name):
    """Build the run configuration from a config file.

    Chains the three set-up steps: load_config, check_config (using the
    module-level template and defaults) and create_output_dir.

    Args:
        file_name (str): Name of config file

    Returns:
        dict: Configuration dictionary
    """
    checked_config = check_config(
        load_config(file_name),
        config_template=CONFIG_TEMPLATE,
        default_config=DEFAULT_CONFIG,
    )
    # the output directory is prepared only after the config passed all checks
    create_output_dir(checked_config)
    return checked_config
[docs]
def load_config(file_name):
"""Loads config file in toml format.
Args:
file_name (str): Name of config file
Returns:
dict: Configuration dictionary
"""
try:
with open(file_name, "rb") as config_file:
config = tomllib.load(config_file)
return config
except FileNotFoundError as exc:
raise FileNotFoundError("No Config file found") from exc
except tomllib.TOMLDecodeError as exc:
raise tomllib.TOMLDecodeError(
"Config file is not a valid TOML document."
) from exc
[docs]
def load_ac_data(config):
    """Load aircraft identifier parameters from a separate csv file.

    Identifiers found in column "ac" are appended to
    ``config["aircraft"]["types"]`` (duplicates removed, order kept). If
    "cont" is one of the output species, the contrail variables G_250, PMrel
    and eff_fac of every identifier are added to ``config["aircraft"]`` as
    well; values that already exist in the config are kept.

    Args:
        config (dict): Configuration dictionary

    Raises:
        FileNotFoundError: File does not exist
        KeyError: If a required column or value does not exist
        ValueError: If a NaN or duplicate identifier, or an invalid
            'eff_fac' value is found

    Returns:
        dict: Configuration dictionary modified in-place
    """
    aircraft_cfg = config["aircraft"]

    # no aircraft file configured -> nothing to load
    ac_file = aircraft_cfg.get("file")
    if ac_file is None or (isinstance(ac_file, str) and not ac_file.strip()):
        return config

    # "dir" is optional: without it the "file" entry is used as the path
    # itself instead of failing with a bare KeyError('dir')
    file_path = Path(aircraft_cfg.get("dir", "")) / ac_file
    if not file_path.exists():
        logging.error("File %s does not exist.", file_path)
        raise FileNotFoundError(f"File {file_path} does not exist.")

    # helper function that checks whether a column is present
    def _check_column_present(df, col):
        if col not in df.columns:
            raise KeyError(
                f"Required column '{col}' not present in aircraft identifier "
                "input file."
            )

    # load file and make sure the identifier column "ac" is usable
    df = pd.read_csv(file_path)
    _check_column_present(df, "ac")
    if df["ac"].isna().any():
        raise ValueError("NaN value found for aircraft identifier.")
    if df["ac"].duplicated().any():
        raise ValueError(
            "Duplicate values found in column 'ac': "
            f"{df[df['ac'].duplicated()]['ac'].unique()}"
        )

    # update aircraft types; dict.fromkeys removes duplicates, keeps order
    aircraft_cfg["types"].extend(df["ac"].tolist())
    aircraft_cfg["types"] = list(dict.fromkeys(aircraft_cfg["types"]))

    # if contrails aren't calculated, we don't need to add the
    # contrail-specific variables
    if "cont" not in config["species"]["out"]:
        return config

    # check "eff_fac" column is present and all values are valid
    _check_column_present(df, "eff_fac")
    if df["eff_fac"].isna().any():
        raise ValueError("Invalid 'eff_fac' value found.")

    # G_250 and PMrel may be given directly; missing columns start as NaN
    # and are filled from other columns below
    for col in ("G_250", "PMrel"):
        if col not in df.columns:
            df[col] = np.nan

    # calculate missing G_250 values
    df_no_g = df[df["G_250"].isna()]
    if not df_no_g.empty:
        _check_column_present(df, "SAC_eq")
        sac_cols = ["SAC_eq", "Q_h", "eta", "eta_elec", "EIH2O", "R"]
        for idx, row in df_no_g.iterrows():
            # columns absent from the file are passed on as None
            args = [row.get(c) for c in sac_cols]
            try:
                # NOTE(review): 250e2 is presumably 250 hPa expressed in Pa;
                # confirm against calc_sac_slope
                df.at[idx, "G_250"] = calc_sac_slope(250e2, *args)
            # log which aircraft failed
            except ValueError:
                logging.error(
                    "Error in calculation of G_250 of aircraft identifer %s",
                    row["ac"],
                )
                raise

    # calculate missing PMrel values
    df_no_pmrel = df[df["PMrel"].isna()]
    if not df_no_pmrel.empty:
        _check_column_present(df, "PM")
        for idx, row in df_no_pmrel.iterrows():
            if pd.isna(row.get("PM")):
                msg = f"Missing 'PM' value for aircraft identifier {row['ac']}"
                logging.error(msg)
                raise KeyError(msg)
            # PMrel is PM relative to the fixed reference value 1.5e15
            df.at[idx, "PMrel"] = row.get("PM") / 1.5e15

    # add values to config (no overwriting)
    cols_to_add = ["G_250", "PMrel", "eff_fac"]
    for _, row in df.iterrows():
        ac = row["ac"]
        new_data = {col: round(row[col], 3) for col in cols_to_add}
        if ac not in aircraft_cfg:
            aircraft_cfg[ac] = new_data
            continue
        for k, v in new_data.items():
            if k in aircraft_cfg[ac]:
                logging.warning(
                    "Aircraft '%s': value for '%s' already exists in config "
                    "(existing=%r, new=%r) — keeping existing value.",
                    ac,
                    k,
                    aircraft_cfg[ac][k],
                    v,
                )
            # setdefault keeps a value that is already configured
            aircraft_cfg[ac].setdefault(k, v)
    return config
def _apply_aliases(config: dict) -> dict:
    """Map deprecated variables to their new counterparts to maintain backwards
    compatibility.

    For every entry of ALIAS_MAP the deprecated dotted path is looked up in
    the config. If it exists and the new path does not, the value is moved to
    the new path. If both exist, the new value wins and the deprecated entry
    is left untouched. A warning is logged in both cases.

    Args:
        config (dict): Configuration dictionary

    Returns:
        dict: Configuration dictionary, modified in place.
    """
    # loop over aliases
    for old, new in ALIAS_MAP.items():
        *old_sections, old_key = old.split(".")

        # descend to the section that would hold the deprecated key
        section = config
        for part in old_sections:
            if not isinstance(section, dict) or part not in section:
                section = None  # old path missing
                break
            section = section[part]

        # The final section has to be a dict as well. A scalar (e.g. a string)
        # would otherwise be searched with ``in`` and then crash on ``pop``;
        # malformed sections are left for the template validation to report.
        if not isinstance(section, dict) or old_key not in section:
            continue

        # old value is present: locate (or create) the new path
        *new_sections, new_key = new.split(".")
        target = config
        for part in new_sections:
            target = target.setdefault(part, {})  # create new path

        if new_key not in target:
            target[new_key] = section.pop(old_key)  # old -> new value
            logging.warning(
                "Config key '%s' is deprecated; migrated to '%s'. "
                "Please update your config file.",
                old,
                new,
            )
        else:
            logging.warning(
                "Both deprecated key '%s' and new key '%s' exist; "
                "keeping the new key. Please update your config file.",
                old,
                new,
            )
    return config
def _gather_response_files(config: dict) -> list[Path]:
    """Build the list of response files needed by the 2D species.

    For each 2D species the response settings "conc", "rf", "tau" and "resp"
    are inspected; every one that names a "file" contributes one path
    relative to the responses directory.

    Args:
        config (dict): Configuration dictionary

    Raises:
        KeyError: If a 2D species defines no response file at all.

    Returns:
        list[Path]: List of paths to all response files.
    """
    _, species_2d, _, _ = classify_species(config)
    responses_cfg = config["responses"]
    base_dir = Path(responses_cfg["dir"])
    collected: list[Path] = []
    for spec in species_2d:
        settings = responses_cfg.get(spec, {})
        n_before = len(collected)
        for resp_type in ("conc", "rf", "tau", "resp"):
            try:
                file_name = settings[resp_type]["file"]
            except (KeyError, TypeError):
                # response type absent or without a "file" entry
                continue
            collected.append(base_dir / file_name)
        # nothing was added for this species -> no response file defined
        if len(collected) == n_before:
            raise KeyError(f"No response file defined for {spec}")
    return collected
def _gather_inventory_files(config: dict) -> list[Path]:
    """Build the list of emission inventory files.

    The files of the "inventories" section always contribute; the files of
    its "base" subsection are appended when rel_to_base is truthy.

    Args:
        config (dict): Configuration dictionary

    Returns:
        list[Path]: List of paths to all inventory files.
    """
    inv_cfg = config["inventories"]

    # emission inventories
    main_dir = Path(inv_cfg.get("dir", ""))
    paths: list[Path] = [main_dir / name for name in inv_cfg["files"]]

    # base emission inventories, only relevant relative to a base scenario
    if inv_cfg.get("rel_to_base"):
        base_cfg = inv_cfg.get("base", {})
        base_dir = Path(base_cfg.get("dir", ""))
        paths.extend(base_dir / name for name in base_cfg.get("files", []))
    return paths
def _aircraft_identifier_validation(config: dict) -> None:
    """Check aircraft identifiers and required contrail variables.

    Args:
        config (dict): Configuration dictionary

    Raises:
        ValueError: If a reserved aircraft identifier is used.
        ValueError: If contrail variables are missing for an aircraft identifier.
    """
    ac_types = list(config["aircraft"]["types"])

    # ensure no reserved aircraft identifiers are present
    # (a tuple, not a bare string: iterating over a string would compare the
    # identifiers against its single characters)
    reserved_acs = ("TOTAL",)
    for reserved in reserved_acs:
        if reserved in ac_types:
            raise ValueError(
                f"Aircraft identifier {reserved} is reserved and cannot be "
                "defined in the config file."
            )

    # the remaining checks only concern the contrail module
    if "cont" not in config["species"]["out"]:
        return

    # every aircraft identifier needs its own section with these variables
    required = ("G_250", "eff_fac", "PMrel")
    for ac in ac_types:
        ac_cfg = config["aircraft"].get(ac)
        if not isinstance(ac_cfg, dict):
            msg = f"Contrail variables missing for aircraft {ac}."
            logging.error(msg)
            raise ValueError(msg)
        for key in required:
            if key not in ac_cfg:
                msg = f"Variable {key} missing for aircraft {ac}."
                logging.error(msg)
                raise ValueError(msg)
def _assert_files_exist(paths: list[Path]) -> None:
    """Verify that every given path exists on disk.

    All paths are checked before anything is reported, so the error lists
    every missing file at once.

    Args:
        paths (list[Path]): List of paths to check.

    Raises:
        FileNotFoundError: If files are missing.
    """
    absent = []
    for candidate in paths:
        if not Path(candidate).exists():
            absent.append(str(candidate))
    if not absent:
        return
    # one log entry per file, then a single exception naming all of them
    for entry in absent:
        logging.error("File %s does not exist.", entry)
    raise FileNotFoundError("Missing required files:\n" + "\n".join(absent))
def _validate_against_template(cfg: dict, tmpl: dict, path=""):
    """Recursively ensure every key in template (tmpl) exists in config (cfg)
    and has the right type.

    For dict-valued template entries, recurse into their children. For leaf
    template entries, the template value is a type (e.g. str, Iterable, bool)
    that the config value must be an instance of. Where the template asks for
    float, integers are accepted too (TOML parses ``1`` as int and ``1.0`` as
    float); booleans are not.

    Args:
        cfg (dict): Configuration dictionary
        tmpl (dict): Configuration template dictionary
        path (str, optional): Path within recursive dict. Defaults to "".

    Raises:
        KeyError: If a required section or setting is missing.
        TypeError: If a section is not a dict or a setting has the wrong type.
    """
    # check that config is a dictionary
    if not isinstance(cfg, dict):
        raise TypeError(f"{path or '<root>'} must be a dict.")
    # recursively loop through keys and values
    for k, v in tmpl.items():
        # dotted path of the current entry, used in error messages
        here = f"{path}.{k}" if path else k
        # if v is a dictionary, then it is a (sub)section of the config
        if isinstance(v, dict):
            if k not in cfg:
                raise KeyError(f"Missing required section: {here}")
            if not isinstance(cfg[k], dict):
                raise TypeError(f"{here} must be a dict.")
            # recurse into (sub)section
            _validate_against_template(cfg[k], v, here)
            continue
        # otherwise, v is a type, so check presence and type of the setting
        if k not in cfg:
            raise KeyError(f"Missing required setting: {here}")
        val = cfg[k]
        if v is float:
            # bool is a subclass of int and must not pass as a number
            type_ok = isinstance(val, (int, float)) and not isinstance(val, bool)
        else:
            type_ok = isinstance(val, v)
        if not type_ok:
            raise TypeError(
                f"{here} has incorrect type: {type(val).__name__} "
                f"(expected {v.__name__})"
            )
def _merge_defaults_inplace(cfg: dict, defaults: dict):
    """Fill gaps in cfg (config) with default values, recursively.

    A key missing from cfg receives a deep copy of its default. A key that
    is present keeps the user value (even if the type differs); only when
    both the user value and the default are dicts is the merge continued
    one level down.

    Args:
        cfg (dict): Configuration dictionary
        defaults (dict): Configuration dictionary with default values
    """
    for name, default_value in defaults.items():
        if name in cfg:
            current = cfg[name]
            # existing values are never overwritten, nested sections recurse
            if isinstance(current, dict) and isinstance(default_value, dict):
                _merge_defaults_inplace(current, default_value)
            continue
        # deep copy so later changes to cfg cannot leak into the defaults
        cfg[name] = deepcopy(default_value)
[docs]
def check_against_template(config, config_template, default_config):
    """Validate a config dictionary and complete it with default settings.

    Two steps are performed in this order:
        1. every entry of the template must exist in the config with the
           correct data type,
        2. default settings are added where the config has no value yet.

    Args:
        config (dict): Configuration dictionary
        config_template (dict): Configuration template dictionary
        default_config (dict): Default configuration dictionary

    Returns:
        dict: Configuration dictionary, possibly with added default settings
    """
    # step 1: required keys and their types
    _validate_against_template(config, config_template)
    # step 2: defaults, without overwriting anything the user has set
    _merge_defaults_inplace(config, default_config)
    return config
[docs]
def check_config(config, config_template, default_config):
    """Run all configuration checks and return the completed configuration.

    Args:
        config (dict): Configuration dictionary
        config_template (dict): Configuration template dictionary
        default_config (dict): Default configuration dictionary

    Raises:
        KeyError: if no response file defined

    Returns:
        dict: Configuration dictionary
    """
    # deprecated keys first, so that the template check sees current names
    cfg = _apply_aliases(config)
    # required settings, types and defaults (no overwriting)
    cfg = check_against_template(cfg, config_template, default_config)
    # aircraft identifiers and contrail variables
    cfg = load_ac_data(cfg)
    _aircraft_identifier_validation(cfg)
    # all response and inventory files must exist
    required_files = _gather_response_files(cfg) + _gather_inventory_files(cfg)
    _assert_files_exist(required_files)
    # metrics time settings are only relevant if metrics are computed
    if cfg["output"]["run_metrics"]:
        _check_metrics(cfg)
    logging.info("Configuration file checked.")
    return cfg
[docs]
def create_output_dir(config):
    """Prepare the output directory according to the output settings.

    With run_oac = false an existing results file is required and nothing is
    created. With run_oac = true the output directory is (re)created, which
    requires overwrite = true.

    Args:
        config (dict): Configuration dictionary

    Raises:
        OSError: if no output directory is created or
            results file not existing with run_oac = false
    """
    output_cfg = config["output"]
    dir_path = output_cfg["dir"]
    output_name = output_cfg["name"]
    overwrite = output_cfg["overwrite"]
    run_oac = output_cfg["run_oac"]
    # file names are built by plain concatenation of directory and name
    results_file = dir_path + output_name + ".nc"
    metrics_file = dir_path + output_name + "_metrics.nc"

    # metrics-only run: re-use the results of an earlier simulation
    if not run_oac:
        if not os.path.exists(results_file):
            raise OSError(
                "Results file "
                + results_file
                + " does not exist."
                + " Repeat simulation with run_oac = true"
            )
        logging.info("Compute climate metrics only, using results file " + results_file)
        if os.path.exists(metrics_file):
            logging.info("Overwrite existing metrics file " + metrics_file)
        return

    # full run: a directory is only (re)created if overwrite is allowed
    if not overwrite:
        raise OSError(
            "No output directory is created. Set output overwrite = true for "
            "overwriting existing directory or define a different directory path."
        )
    if os.path.isdir(dir_path):
        logging.info("Overwrite existing output directory " + dir_path)
        # remove the old directory including all of its content
        shutil.rmtree(dir_path)
    else:
        logging.info("Create new output directory " + dir_path)
    os.makedirs(dir_path)
[docs]
def classify_species(config):
    """Classifies species into applied response modelling methods

    Species listed in SPECIES_SUB_ARR are classified as "sub" and need no
    response settings. All other output species are classified by the
    "response_grid" value ("0D", "2D" or "cont") of their entry in the
    responses section.

    Args:
        config (dict): Configuration dictionary

    Raises:
        KeyError: if no valid response_grid in config
        KeyError: if no response defined for a spec

    Returns:
        tuple: tuple of lists of strings (species names), in the order
            (species_0d, species_2d, species_cont, species_sub)
    """
    species = config["species"]["out"]
    responses = config["responses"]
    species_0d = []
    species_2d = []
    species_cont = []
    species_sub = []
    # response_grid value -> list collecting the species of that class
    grid_targets = {"0D": species_0d, "2D": species_2d, "cont": species_cont}
    for spec in species:
        # Classify species_sub, no response_grid required
        if spec in SPECIES_SUB_ARR:
            species_sub.append(spec)
            continue
        # spec needs its own settings in the responses section
        if spec not in responses:
            raise KeyError(f"Responses not defined in config for {spec}")
        spec_cfg = responses[spec]
        grid = spec_cfg.get("response_grid") if isinstance(spec_cfg, dict) else None
        # a missing, non-string or unknown response_grid is reported uniformly
        target = grid_targets.get(grid) if isinstance(grid, str) else None
        if target is None:
            raise KeyError(f"No valid response_grid in config for {spec}")
        target.append(spec)
    return species_0d, species_2d, species_cont, species_sub
[docs]
def classify_response_types(config, species_arr):
    """
    Classifies species into categories based on their response types defined in the config

    A species with a "tau" entry in its response settings is classified as
    'tau' (only supported for CH4). Otherwise a species with an "rf" entry
    that names a "file" is classified as 'rf'.

    Args:
        config (dict): Configuration dictionary
        species_arr (list): A list of strings representing the species

    Returns:
        tuple: A tuple of lists. list (species_rf) contains species with response type 'rf',
            i.e. a response file must be given comprising the response surface
            from emissions to RF,
            list (species_tau) contains species with response type 'tau',
            i.e. a response file must be given comprising the response surface
            from emissions to inverse species lifetime.

    Raises:
        KeyError: If no valid response type is defined in the configuration for a species.
        KeyError: If response type 'tau' is defined for a species other than CH4.
    """
    responses = config["responses"]
    species_rf = []
    species_tau = []
    for spec in species_arr:
        spec_cfg = responses[spec]
        # "tau" takes precedence over "rf"
        if "tau" in spec_cfg:
            if spec != "CH4":
                raise KeyError(f'Response type "tau" not supported for {spec}')
            species_tau.append(spec)
        elif "rf" in spec_cfg and "file" in spec_cfg["rf"]:
            species_rf.append(spec)
        else:
            # single formatted message, like the "tau" error above
            raise KeyError(f"No valid response type defined in config for {spec}")
    return species_rf, species_tau
def _check_metrics(config: dict) -> None:
    """
    Checks if metrics are properly defined.

    The metrics settings "types", "H" and "t_0" must be non-empty iterables,
    and every metrics time period (t_0 ... t_0 + H) must lie within the
    simulation time range. Warnings are logged if the time step is not 1.0
    or if a metrics period ends before the last year of the time range.

    Args:
        config (dict): Configuration dictionary

    Raises:
        ValueError: If a metrics setting is not an Iterable or is empty.
        ValueError: If a metrics time period is outside the time range.
    """
    # metric types, H and t_0 must not be empty
    req_keys = ("types", "H", "t_0")
    arrs = {}
    for key in req_keys:
        val = config["metrics"].get(key)
        # the messages name the offending setting (key), not its value
        if not isinstance(val, Iterable):
            raise ValueError(f"config['metrics']['{key}'] must be an Iterable.")
        val_lst = list(val)
        if not val_lst:
            raise ValueError(f"config['metrics']['{key}'] must not be empty.")
        arrs[key] = val_lst
    # get time information; range is given as (start, stop, step)
    time_config = config["time"]["range"]
    time_range = np.arange(time_config[0], time_config[1], time_config[2], dtype=int)
    delta_t = time_config[2]
    if delta_t != 1.0:
        msg = (
            "Time step in time range is NOT 1.0 years which could "
            "produce wrong metrics values."
        )
        logging.warning(msg)
    # Iterate through all metrics time ranges (t_0 and H are paired by position)
    for t_zero, horizon in zip(arrs["t_0"], arrs["H"]):
        time_metrics = np.arange(t_zero, (t_zero + horizon), delta_t)
        # every year of the metrics period has to be part of the time range
        if any(year_metrics not in time_range for year_metrics in time_metrics):
            msg = (
                f"Metrics time settings with t_0 = {t_zero} and H = "
                f"{horizon} are outside of defined time range"
            )
            logging.error(msg)
            raise ValueError(msg)
        # Check if last year of time_metrics previous to last year in time range
        if time_metrics[-1] < time_range[-1]:
            logging.warning(
                "Last year in metrics time with t_0 = %s and H = %s is earlier "
                "than last year in time range.",
                t_zero,
                horizon,
            )