"""
Writes output: results netCDF file and diagnostics files
"""
import os
import datetime
import getpass
import pandas as pd
import numpy as np
import xarray as xr
import joblib
import openairclim as oac
# CONSTANTS
RESULT_TYPE_DICT = {
"emis": {
"long_name": "Emission",
"units": {"CO2": "Tg", "H2O": "Tg", "NOx": "Tg", "distance": "km"},
},
"conc": {
"long_name": "Concentration",
"units": {"CO2": "ppmv", "H2O": "?", "O3": "?", "CH4": "ppbv"},
},
"RF": {
"long_name": "Radiative Forcing",
"units": {
"CO2": "W/m²",
"H2O": "W/m²",
"O3": "W/m²",
"CH4": "W/m²",
"PMO": "W/m²",
"cont": "W/m²",
},
},
"dT": {
"long_name": "Temperature change",
"units": {
"CO2": "K",
"H2O": "K",
"O3": "K",
"CH4": "K",
"PMO": "K",
"cont": "K",
},
},
"ATR": {"long_name": "Average Temperature Response", "units": "K"},
"AGWP": {
"long_name": "Absolute Global Warming Potential",
"units": "W m-2 year",
},
"AGTP": {
"long_name": "Absolute Global Temperature Change Potential",
"units": "K",
},
}
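# Example lookup (illustrative): a variable name such as "RF_CO2" splits
# into result type "RF" and species "CO2", so its metadata resolves to
# RESULT_TYPE_DICT["RF"]["long_name"] -> "Radiative Forcing" and
# RESULT_TYPE_DICT["RF"]["units"]["CO2"] -> "W/m²".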
# dtype of output data variables in xarray Datasets and netCDF
OUT_DTYPE = "float32"
# Cache settings
CHECKSUM_PATH = "../cache/weights/"
# CHECKSUM_FILENAME = "checksum_neighbours.csv"
CHECKSUM_FILENAME = "checksum_weights.csv"
def update_output_dict(output_dict, ac, result_type, val_arr_dict):
"""Update output_dict for a given aircraft with a new result type.
Args:
output_dict (dict): The main output dictionary to update.
Format: {ac: {var: np.ndarray}}
ac (str): Aircraft identifier from config file
result_type (str): Prefix for variable names, e.g. "RF"
        val_arr_dict (dict): Dictionary of {species: np.ndarray} results.
            Each array should be 1D and represent a time series.
Returns:
None: Modifies output_dict in-place.
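    Example (illustrative; values are placeholders)::

        output_dict = {}
        update_output_dict(output_dict, "AC1", "RF", {"CO2": np.zeros(3)})
        # output_dict == {"AC1": {"RF_CO2": array([0., 0., 0.])}}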
"""
if ac not in output_dict:
output_dict[ac] = {}
output_dict[ac].update({
f"{result_type}_{spec}": val_arr
for spec, val_arr in val_arr_dict.items()
})
def write_output_dict_to_netcdf(config, output_dict, mode="w"):
"""Convert nested output dictionary into xarray Dataset and write to
netCDF file.
Args:
config (dict): Configuration from config file
output_dict (dict): Nested output dictionary. Levels are
{ac: {var: np.ndarray}}, where `ac` is the aircraft identifier,
`var` is a variable, e.g. "RF_CO2" and np.ndarray is of length
time (as defined in config)
        mode (str, optional): "w" (write) or "a" (append). Defaults to "w".
Returns:
xr.Dataset: OpenAirClim results
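    Example (illustrative; minimal config with placeholder values)::

        config = {
            "output": {"dir": "results/", "name": "example"},
            "time": {"range": [2020, 2023, 1]},  # -> time = [2020, 2021, 2022]
        }
        output_dict = {"AC1": {"RF_CO2": np.zeros(3)}}
        ds = write_output_dict_to_netcdf(config, output_dict)
        # writes results/example.nc with RF_CO2 of shape (ac, time)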
"""
# define output directory and name
output_dir = config["output"]["dir"]
output_name = config["output"]["name"]
output_filename = f"{output_dir}{output_name}.nc"
# define coordinates
time_config = config["time"]["range"]
time_arr = np.arange(
time_config[0], time_config[1], time_config[2], dtype=int
)
n_time = len(time_arr)
ac_lst = list(output_dict.keys())
# get (sorted) variable strings and check consistency
sort_order = {"emis": 0, "conc": 1, "RF": 2, "dT": 3}
variables = sorted(
list(next(iter(output_dict.values())).keys()),
key=lambda v: (sort_order.get(v.split("_")[0], 99), v.split("_")[1].lower())
)
for ac in ac_lst:
assert set(output_dict[ac].keys()) == set(variables), (
f"Variable mismatch in aircraft '{ac}'."
)
for var, arr in output_dict[ac].items():
assert isinstance(arr, np.ndarray), (
f"{ac}:{var} is not a np.ndarray"
)
assert arr.ndim == 1, f"{ac}:{var} must be 1D"
assert len(arr) == n_time, (
f"{ac}:{var} length {len(arr)} != expected {n_time}"
)
# get data
data_vars = {}
for var in variables:
result_type, spec = var.split("_")
descr = RESULT_TYPE_DICT[result_type]
        # stack per-aircraft time series and cast to the common output dtype
        stacked = np.stack(
            [output_dict[ac][var] for ac in ac_lst], axis=0
        ).astype(OUT_DTYPE)
data_vars[var] = (
("ac", "time"),
stacked,
{
"long_name": f"{spec} {descr['long_name']}",
"units": descr["units"][spec],
}
)
# create dataset
coords = {
"time": ("time", time_arr, {"long_name": "time", "units": "years"}),
"ac": ("ac", ac_lst, {"long_name": "aircraft identifier"}),
}
ds = xr.Dataset(data_vars=data_vars, coords=coords)
# get username
try:
username = getpass.getuser()
except OSError:
username = "N/A"
ds.attrs = {
"title": output_name,
"created": f"{datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')}",
"user": username,
"oac version": oac.__version__,
}
ds.to_netcdf(output_filename, mode=mode)
return ds
def write_climate_metrics(
config: dict, metrics_dict: dict, mode: str = "w"
) -> xr.Dataset:
"""
Writes climate metrics to netCDF file.
Args:
config (dict): Configuration from config file
        metrics_dict (dict): Dictionary of climate metrics: one entry per
            combination of metric type, t_0 and horizon H (in config order),
            each mapping to an inner dict of {species: value}
mode (str, optional): Can be "w" for write or "a" for append. Defaults to "w".
Returns:
xr.Dataset: xarray Dataset of climate metrics
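    Example (illustrative; values are placeholders)::

        config = {
            "output": {"dir": "results/", "name": "example"},
            "metrics": {"types": ["ATR"], "t_0": [2020.0], "H": [100.0]},
        }
        metrics_dict = {"ATR": {"CO2": 1.2e-3, "H2O": 4.5e-4}}
        ds = write_climate_metrics(config, metrics_dict)
        # writes results/example_metrics.nc with variable "ATR_100_2020"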
"""
output_dir = config["output"]["dir"]
output_name = config["output"]["name"]
output_filename = output_dir + output_name + "_metrics.nc"
metrics_type_arr = config["metrics"]["types"]
t_zero_arr = config["metrics"]["t_0"]
horizon_arr = config["metrics"]["H"]
# Construct xarray Datasets
var_arr = []
# get species names from keys of first dictionary in metrics_dict
# --> xarray dimension
first_dict = next(iter(metrics_dict.values()))
species_arr = np.array(list(first_dict.keys()))
xr_species = xr.Dataset(
data_vars={
"species": (
["species"],
species_arr,
{"long_name": "species"},
)
}
)
var_arr.append(xr_species)
    # Iterate over all combinations of climate metrics; the entries of
    # metrics_dict follow the order of the nested loops below
    metrics_iter = iter(metrics_dict.values())
    for metrics_type in metrics_type_arr:
        for t_zero in t_zero_arr:
            t_zero_str = format(t_zero, ".0f")
            for horizon in horizon_arr:
                horizon_str = format(horizon, ".0f")
                # get climate metrics values --> xarray Variable
                # next element in outer dictionary --> inner dict = {'spec': val, ..}
                inner_dict = next(metrics_iter)
                val_arr = np.array(list(inner_dict.values()))
                descr = RESULT_TYPE_DICT[metrics_type]
                var = xr.Dataset(
                    data_vars={
                        f"{metrics_type}_{horizon_str}_{t_zero_str}": (
                            ["species"],
                            val_arr,
                            {
                                "long_name": descr["long_name"],
                                "units": descr["units"],
                                "t_0": t_zero_str,
                                "H": horizon_str,
                            },
                        )
                    },
                    coords={"species": species_arr},
                )
                var_arr.append(var)
output = xr.merge(var_arr)
output.attrs = {"Title": (output_name + " climate metrics")}
output = output.astype(OUT_DTYPE)
output.to_netcdf(output_filename, mode=mode)
return output
def query_checksum_table(spec, resp, inv):
"""Look up in checksum table, if for the particular spec/resp/inv combination
pre-calculated data exists
Args:
spec (str): Name of the species
resp (xarray): Response xarray Dataset
inv (xarray): Emission inventory xarray Dataset
Returns:
xarray/None, int: xarray Dataset with weight parameters,
Number of rows in checksum table
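    Example (illustrative; resp and inv are xr.Dataset objects,
    compute_weights is a hypothetical helper and the cache-file naming
    is a placeholder)::

        weights, n_rows = query_checksum_table("O3", resp, inv)
        if weights is None:
            # cache miss: compute weights, save, and register the new file
            weights = compute_weights(resp, inv)  # hypothetical
            cache_file = CHECKSUM_PATH + f"weights_{n_rows}.nc"  # placeholder
            weights.to_netcdf(cache_file)
            update_checksum_table("O3", resp, inv, cache_file)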
"""
checksum_path = CHECKSUM_PATH
checksum_file = checksum_path + CHECKSUM_FILENAME
    # Open checksum_file; if the file and/or parent folder(s) do not exist, create them
    try:
        checksum_df = pd.read_csv(checksum_file)
    except FileNotFoundError:
        msg = (
            "No checksum file "
            + checksum_file
            + " available, will create a new checksum file."
        )
        print(msg)
        checksum_df = pd.DataFrame(
            columns=["spec", "resp", "inv", "cache_file"]
        )
        os.makedirs(checksum_path, exist_ok=True)
        checksum_df.to_csv(checksum_file, index=False)
    # Query whether the argument checksum combination is in checksum_file
    resp_hash = joblib.hash(resp, hash_name="md5")
    inv_hash = joblib.hash(inv, hash_name="md5")
    match = checksum_df[
        (checksum_df["spec"] == spec)
        & (checksum_df["resp"] == resp_hash)
        & (checksum_df["inv"] == inv_hash)
    ]
    if not match.empty:
        # load cached weights from the first matching row
        weights = xr.load_dataset(match.iloc[0]["cache_file"])
    else:
        weights = None
    # Return existing xarray Dataset (or None) and the number of rows in checksum_file
    return weights, checksum_df.shape[0]
def update_checksum_table(spec, resp, inv, cache_file):
"""Add a row to the existing checksum table with hashes of resp and inv,
and path to cache_file
Args:
spec (str): Name of the species
        resp (xr.Dataset): Response Dataset
        inv (xr.Dataset): Emission inventory Dataset
cache_file (str): Path to cache file
Returns:
pd.DataFrame: Representation of the current checksum table
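    Example (illustrative; the cache file path is a placeholder)::

        df = update_checksum_table("O3", resp, inv, "../cache/weights/w0.nc")
        # appends one row [spec, md5(resp), md5(inv), cache_file] to the CSV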
"""
checksum_path = CHECKSUM_PATH
checksum_file = checksum_path + CHECKSUM_FILENAME
checksum_df = pd.read_csv(checksum_file)
resp_hash = joblib.hash(resp, hash_name="md5")
inv_hash = joblib.hash(inv, hash_name="md5")
checksum_df.loc[len(checksum_df)] = [spec, resp_hash, inv_hash, cache_file]
checksum_df.to_csv(checksum_file, index=False)
return checksum_df
def write_concentrations(config, resp_dict, conc_dict):
"""Output of concentration changes,
Convert dictionary of time series numpy arrays into dictionary of xarray Datasets,
write to netCDF files, one per species
Args:
config (dict): Configuration from config file
resp_dict (dict): Dictionary of response xarray Datasets, keys are species
conc_dict (dict): Dictionary of time series numpy arrays (time, lat, plev),
keys are species
Returns:
dict: Dictionary of concentration changes (time, lat, plev), keys are species
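    Example (illustrative; array shapes are placeholders and must match
    the time, lat and plev sizes of the corresponding response Dataset)::

        conc_dict = {"O3": np.zeros((n_time, n_lat, n_plev))}
        output_dict = write_concentrations(config, resp_dict, conc_dict)
        # writes one netCDF file per species, e.g. <output_dir>/conc_O3.nc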
"""
output_dir = config["output"]["dir"]
output_name = config["output"]["name"]
time_config = config["time"]["range"]
time_arr = np.arange(
time_config[0], time_config[1], time_config[2], dtype=int
)
output_dict = {}
for spec, resp in resp_dict.items():
output_path = output_dir + "conc_" + spec + ".nc"
conc_arr = conc_dict[spec]
output = xr.Dataset(
data_vars={
"time": (
["time"],
time_arr,
{"long_name": "time", "units": "years"},
)
},
attrs={
"Title": (output_name + " - concentration change"),
"Species": spec,
},
)
output["lat"] = resp.lat
output["plev"] = resp.plev
        var = xr.DataArray(
            # cast to the common output dtype (see OUT_DTYPE)
            data=conc_arr.astype(OUT_DTYPE),
            dims=["time", "lat", "plev"],
            attrs={
                "long_name": ("conc. change " + spec),
                "units": "mol/mol",
            },
        )
var_name = "conc_" + spec
output[var_name] = var
output_dict[spec] = output
output.to_netcdf(output_path, mode="w")
return output_dict