Source code for openairclim.calc_cont

"""
Calculates the contrail response.
"""

__author__ = "Liam Megill"
__email__ = "liam.megill@dlr.de"
__license__ = "Apache License 2.0"


from typing import Iterable, Any, Sequence, Optional
from collections import defaultdict
import logging
import numpy as np
import numpy.typing as npt
import xarray as xr
from openairclim.read_netcdf import open_inventories, split_inventory_by_aircraft
from openairclim.interpolate_time import apply_evolution
from openairclim._premium import OAC_PREMIUM_AVAILABLE, LOW_SOOT_CASES, pm_factor_low

# CONSTANTS
R_EARTH = 6371.0  # [km] radius of Earth
KAPPA = 287.0 / 1003.5

# TYPE HINTS
ContGrid = tuple[
    npt.NDArray[np.floating],  # lon
    npt.NDArray[np.floating],  # lat
    npt.NDArray[np.floating],  # plev
]


[docs] def get_cont_grid(ds_cont: xr.Dataset) -> ContGrid: """Get contrail grid from `ds_cont`. Args: ds_cont (xr.Dataset): Dataset of precalculated contrail data. Returns: ContGrid: Tuple ``(lon, lat, plev)``; each is 1-D float array with shapes ``(n_lon,)``, ``(n_lat,)``, ``(n_plev,)``. Units: lon [deg], lat [deg], plev [hPa]. """ cc_lon_vals = ds_cont.lon.data cc_lat_vals = ds_cont.lat.data cc_plev_vals = ds_cont.plev.data return (cc_lon_vals, cc_lat_vals, cc_plev_vals)
[docs] def check_cont_input(ds_cont: xr.Dataset) -> None: """Checks the input data for the contrail module. Args: ds_cont (xr.Dataset): Dataset of precalculated contrail data. """ # required variables for Megill et al. (2025) formation method required_vars = [ "ppcf", "g_250", "l_1", "k_1", "x0_1", "d_1", "l_2", "k_2", "x0_2", ] required_coords = ["lat", "lon", "plev", "AC"] required_units = ["degrees_north", "degrees_east", "hPa", "None"] for var in required_vars: if var not in ds_cont: raise KeyError(f"Missing required variable '{var}' in `resp_cont.nc`.") for coord, unit in zip(required_coords, required_units): if coord not in ds_cont: raise KeyError(f"Missing required coordinate '{coord}' in `resp_cont.nc`.") got_unit = ds_cont[coord].attrs.get("units") if got_unit != unit: raise ValueError( f"Incorrect unit for coordinate '{coord}'. Got '{got_unit}', " "should be '{unit}'." ) # ensure that lat values descend and lon values ascend if not np.all(ds_cont.lat.values == np.sort(ds_cont.lat.values)[::-1]): raise ValueError("Latitude values must descend in `resp_cont.nc`.") if not np.all(ds_cont.lon.values == np.sort(ds_cont.lon.values)): raise ValueError("Longitude values must ascend in `resp_cont.nc`.") if not np.all(ds_cont.lon.values == ds_cont.lon.values % 360.0): raise ValueError( "Longitude values must be defined between 0 and 360 in `resp_cont.nc`" )
[docs] def calc_cont_grid_areas(lat: np.ndarray, lon: np.ndarray) -> np.ndarray: """Calculate the cell area of the contrail grid using a simplified method. Args: lat (np.ndarray): Latitudes of the grid cells [deg]. lon (np.ndarray): Longitudes of the grid cells [deg]. Returns: np.ndarray : Contrail grid cell areas as a function of latitude [km^2]. """ # pre-conditions if len(lat) == 0: raise ValueError("Latitudes array cannot be empty.") if len(lon) == 0: raise ValueError("Longitudes array cannot be empty.") if len(lat) != len(np.unique(lat)): raise ValueError( "Duplicate latitude values detected. Latitudes must be unique." ) if len(lon) != len(np.unique(lon)): raise ValueError( "Duplicate longitude values detected. Longitudes must be unique." ) if not np.all((lat > -90.0) & (lat < 90.0)): raise ValueError( "Latitude values must be strictly between -90 and +90 degrees." ) if not np.all((lon >= 0.0) & (lon <= 360.0)): raise ValueError("Longitude values must be between 0 and 360 degrees.") if np.all((0.0 in lon) & (360.0 in lon)): raise ValueError("Longitude grid must not include both 0 and 360 degrees.") if not np.all(lat == np.sort(lat)[::-1]): raise ValueError("Latitude values must be sorted in descending order.") if not np.all(lon == np.sort(lon)): raise ValueError("Longitude values must be sorted in ascending order.") # calculate dlon lon_padded = np.concatenate(([lon[-1] - 360.0], lon, [lon[0] + 360.0])) lon_midpoints = 0.5 * (lon_padded[1:] + lon_padded[:-1]) dlon_deg = np.diff(lon_midpoints) dlon = np.deg2rad(dlon_deg) * R_EARTH # calculate dlat lat_padded = np.concatenate(([90], lat, [-90])) # add +/-90 deg lat_midpoints = 0.5 * (lat_padded[1:] + lat_padded[:-1]) dlat = R_EARTH * np.abs( np.sin(np.deg2rad(lat_midpoints[:-1])) - np.sin(np.deg2rad(lat_midpoints[1:])) ) # calculate areas areas = np.outer(dlat, dlon) # post-conditions if not np.all(areas > 0.0): raise ValueError("Not all calculated areas are positive.") sphere_area = 4 * np.pi * R_EARTH**2 relative_error = abs(areas.sum() - sphere_area) / sphere_area if relative_error >= 1e-3: raise ValueError( "Total area check failed: computed area differs from Earth's " f"surface area by {relative_error:.4%}, which exceeds acceptable " "tolerance. Please check the contrail grid in `resp_cont.nc`." ) return areas
[docs] def load_base_inventories( config: dict[str, Any], inv_yrs: Sequence[int], cont_grid: ContGrid, ) -> dict[str, dict[int, xr.Dataset]]: """Load the base emission inventories. These must at least span the same time range as the input emission inventories, but can also be wider. The base emission inventories are linearly interpolated onto years that are defined by the input emission inventories if those years do not otherwise exist. Args: config (dict[str, Any]): Configuration dictionary from config file. inv_yrs (Sequence[int]): Years for which the input emission inventories are defined. cont_grid (tuple): Precalculated contrail grid. Shape ``(lon, lat, plev)``; each is 1-D float array with shapes ``(n_lon,)``, ``(n_lat,)``, ``(n_plev,)``. Units: lon [deg], lat [deg], plev [hPa]. Raises: ValueError: If the base emission inventories do not at least span the input emission inventories (given by `inv_yrs`). Returns: dict[str, dict[int, xr.Dataset]]: Full base emission inventory. First- level keys are "ac", second-level years. """ # load base inventories base_inv_dict = open_inventories(config, base=True) base_yrs = list(sorted(base_inv_dict.keys())) # check base inventories if min(base_yrs) > min(inv_yrs): raise ValueError( f"The inv_dict key {min(inv_yrs)} is less than the " f"earliest base_inv_dict key {min(base_yrs)}." ) if max(base_yrs) < max(inv_yrs): raise ValueError( f"The inv_dict key {max(inv_yrs)} is larger than the " f"largest base_inv_dict key {max(base_yrs)}." ) # split base inventories by aircraft identifiers full_base_inv_dict = split_inventory_by_aircraft(config, base_inv_dict, base=True) base_ac_lst = list(full_base_inv_dict.keys()) # if necessary, augment the base_inv_dict with years in inv_dict for base_ac in base_ac_lst: # add zero arrays if aircraft not defined full_base_inv_dict[base_ac] = pad_inv_dict( base_yrs, full_base_inv_dict[base_ac], ["distance"], cont_grid, base_ac ) # interpolate between inventories to missing years full_base_inv_dict[base_ac] = interp_base_inv_dict( inv_yrs, full_base_inv_dict[base_ac], ["distance"], cont_grid ) return full_base_inv_dict
[docs] def pad_inv_dict( inv_yrs: Sequence[int], inv_dict: dict[int, xr.Dataset], pad_vars: list[str], cont_grid: ContGrid, ac: str, ) -> dict: """This function checks whether all years given in `inv_yrs` are present in the input emission inventory `inv_dict`. If a year is missing, a zero dataset is added to `inv_dict` for each variable in `pad_vars` on the pre-calculated contrail grid `cont_grid`. The `ac` variable is also added since this is necessary for other functions in the contrail module. This functionality can be necessary if a specific aircraft identifier is not included in an emission inventory passed to OpenAirClim, for example because the aircraft newly enters service at a later time. Args: inv_yrs (Sequence[int]): Years for which the `inv_dict` emission inventory should be defined. inv_dict (dict[int, xr.Dataset]): Dictionary of emission inventory xarrays, keys are inventory years. pad_vars (list[str]): Variables to be included in the xarrays. cont_grid (tuple): Precalculated contrail grid. Shape ``(lon, lat, plev)``; each is 1-D float array with shapes ``(n_lon,)``, ``(n_lat,)``, ``(n_plev,)``. Units: lon [deg], lat [deg], plev [hPa]. ac (str): Aircraft identifier from config. Returns: dict: `inv_dict` modified in-place with zero arrays in missing years. """ # pre-conditions if "ac" in pad_vars: raise ValueError( "The 'ac' data variable is automatically added to the output " "xarrays and cannot be included in `pad_vars`." ) # determine which years to add zero arrays to inp_yrs = sorted(inv_dict.keys()) new_yrs = sorted(set(inv_yrs) - set(inp_yrs)) # if all years are present, return the sorted inv_dict with no changes if not new_yrs: return dict(sorted(inv_dict.items())) # otherwise, create the zero arrays cc_lon_vals, cc_lat_vals, cc_plev_vals = cont_grid grid_shape = (len(cc_lon_vals), len(cc_lat_vals), len(cc_plev_vals)) zero_arr = np.zeros(grid_shape) ac_arr = np.full(grid_shape, ac) # for loop over new years for new_yr in new_yrs: zero_inv = {} # add each variable and "ac" for var in pad_vars + ["ac"]: zero_inv[var] = xr.DataArray( data=ac_arr if var == "ac" else zero_arr, dims=["lon", "lat", "plev"], coords={ "lon": cc_lon_vals, "lat": cc_lat_vals, "plev": cc_plev_vals, }, ) # create dataset ds_i = xr.Dataset(zero_inv) ds_i_flat = ds_i.stack(index=["lon", "lat", "plev"]) ds_i_flat = ds_i_flat.reset_index("index") inv_dict[new_yr] = ds_i_flat # post-conditions missing = [yr for yr in new_yrs if yr not in inv_dict] if missing: raise RuntimeError( f"Missing years not included in output dictionary: {missing}." ) # add message to log logging.info( "Zero-value xarrays have been created for aircraft identifier %s " "for the years %s", ac, new_yrs ) return dict(sorted(inv_dict.items()))
[docs] def interp_base_inv_dict( inv_yrs: Sequence[int], base_inv_dict: dict[int, xr.Dataset], intrp_vars: Iterable[str], cont_grid: ContGrid, ) -> dict[int, xr.Dataset]: """Create base emission inventories for years in `inv_yrs` that do not exist in `base_inv_dict`. Args: inv_yrs (Sequence[int]): Dictionary of emission inventory xarrays, keys are inventory years. base_inv_dict (dict[int, xr.Dataset]): Dictionary of base emission inventory xarrays. Keys are inventory years. intrp_vars (Iterable[str]): List of strings of data variables in base_inv_dict that are to be included in the missing base inventories, e.g. ["distance"] (for contrail calculations). cont_grid (tuple): Precalculated contrail grid. Shape ``(lon, lat, plev)``; each is 1-D float array with shapes ``(n_lon,)``, ``(n_lat,)``, ``(n_plev,)``. Units: lon [deg], lat [deg], plev [hPa]. Returns: dict[int, xr.Dataset]: Dictionary of base emission inventory xarrays including any missing years compared to inv_dict. Keys are inventory years. Note: A custom nearest neighbour method is used for regridding and a linear interpolation method for calculating data in missing years. In future versions, the user will be able to select methods for both. """ # if base_inv_dict is empty, then return the empty dictionary. if not base_inv_dict: return {} # pre-conditions if inv_yrs is None or np.size(inv_yrs) == 0: raise ValueError("inv_yrs cannot be empty.") if not intrp_vars: raise ValueError("intrp_vars cannot be empty.") for intrp_var in intrp_vars: for year in base_inv_dict.keys(): if intrp_var not in base_inv_dict[year]: raise KeyError( f"Variable '{intrp_var}' is missing from base_inv_dict " f"for year {year}." ) # get years that need to be calculated base_yrs = sorted(base_inv_dict.keys()) inv_yrs = sorted(set(inv_yrs)) intrp_yrs = sorted(set(inv_yrs) - set(base_yrs)) # initialise output intrp_base_inv_dict = base_inv_dict.copy() # get contrail grid cc_lon_vals, cc_lat_vals, cc_plev_vals = cont_grid # if there are years in inv_dict that do not exist in base_inv_dict if intrp_yrs: # find upper and lower neighbouring base_inv_dict years intrp_yr_idx = np.searchsorted(base_yrs, intrp_yrs) yrs_lb = [base_yrs[idx - 1] for idx in intrp_yr_idx] yrs_ub = [base_yrs[idx] for idx in intrp_yr_idx] yrs_regrid = np.unique(yrs_lb + yrs_ub) # regrid base inventories to contrail grid regrid_base_inv_dict = {} for yr in yrs_regrid: base_inv = base_inv_dict[yr] # find nearest neighbour indices lon_idxs = np.abs( cc_lon_vals[:, np.newaxis] - base_inv.lon.data ).argmin(axis=0) lat_idxs = np.abs( cc_lat_vals[:, np.newaxis] - base_inv.lat.data ).argmin(axis=0) plev_idxs = np.abs( cc_plev_vals[:, np.newaxis] - base_inv.plev.data ).argmin(axis=0) # create DataArray for yr regrid_base_inv = {} for intrp_var in intrp_vars: intrp_arr = np.zeros( (len(cc_lon_vals), len(cc_lat_vals), len(cc_plev_vals)) ) np.add.at( intrp_arr, (lon_idxs, lat_idxs, plev_idxs), base_inv[intrp_var].data.flatten(), ) regrid_base_inv[intrp_var] = xr.DataArray( data=intrp_arr, dims=["lon", "lat", "plev"], coords={ "lon": cc_lon_vals, "lat": cc_lat_vals, "plev": cc_plev_vals, }, ) # create dataset regrid_base_inv_dict[yr] = xr.Dataset(regrid_base_inv) # linearly interpolate base_inv for i, yr in enumerate(intrp_yrs): # linear weighting w = (yr - yrs_lb[i]) / (yrs_ub[i] - yrs_lb[i]) ds_i = ( regrid_base_inv_dict[yrs_lb[i]] * (1 - w) + regrid_base_inv_dict[yrs_ub[i]] * w ) # reset index to match input inventories ds_i_flat = ds_i.stack(index=["lon", "lat", "plev"]) ds_i_flat = ds_i_flat.reset_index("index") intrp_base_inv_dict[yr] = ds_i_flat # sort intrp_base_inv_dict intrp_base_inv_dict = dict(sorted(intrp_base_inv_dict.items())) # post-conditions missing = [yr for yr in intrp_yrs if yr not in intrp_base_inv_dict] if missing: raise RuntimeError( f"Missing years not included in output dictionary: {missing}." ) # only return values for years defined in inv_dict return {yr: intrp_base_inv_dict[yr] for yr in inv_yrs}
[docs] def calc_sac_slope( p: float, sac_eq: str, q_h: float, eta: Optional[float] = None, eta_elec: Optional[float] = None, ei_h2o: Optional[float] = None, r: Optional[float] = None, ) -> float: """Calculates the slope of the SAC mixing line. Args: p (float): Ambient pressure [Pa] sac_eq (str): SAC equation to be used, one of "CON", "HYB", "H2C", "H2FC" q_h (float): Lower heating value of the fuel [J/kg] for "CON", "HYB" and "H2C"; formation enthalpy of water vapour [J/mol] for "H2FC" eta (float, optional): Overall propulsion efficiency of the liquid fuel system [-] eta_elec (float, optional): Overall propulsion efficiency of the electric/fuel cell system [-] ei_h2o (float, optional): Emission index of water vapour [kg/kg] r (float, optional): Degree of hybridisation. r=1 is pure liquid fuel operation; r=0 pure electric operation Raises: ValueError: If p is likely given in hPa rather than Pa. ValueError: If unknown sac_eq. Returns: float: Slope of the SAC mixing line [Pa/K]. """ c_p = 1004.0 # isobaric heat capactiy of air [J/kg/K] c_p_bar = 30.6 # mole-based heat capacity of exhaust gas [J/mol/K] eps = 0.622 # molar mass ratio of water vapour and dry air # check p - if in hPa range, give error if np.any(p < 1.1e3): raise ValueError("Ambient pressure must have unit [Pa].") # conventional SAC function (eq. (1) & (3) in Megill & Grewe, 2025) if sac_eq in ("CON", "H2C"): if ei_h2o is None or eta is None: raise ValueError("Missing required values: ei_h2o, eta") return c_p * p / eps * ei_h2o / (1. - eta) / abs(q_h) # hybrid aircraft (Yin et al., 2020; eq. (2) in Megill & Grewe, 2025) if sac_eq == "HYB": if r is None or ei_h2o is None or eta is None or eta_elec is None: raise ValueError("Missing required values: r, ei_h2o, eta, eta_elec") num = c_p * p / eps * r * ei_h2o den= q_h * (r * (1. - eta) + (1. - r) * (1. - eta_elec) * eta / eta_elec) return num / den # hydrogen fuel cell (Gierens(2021); eq. (4) in Megill & Grewe, 2025) if sac_eq == "H2FC": if eta_elec is None: raise ValueError("Missing required values: eta_elec") return c_p_bar * p / (1. - eta_elec) / abs(q_h) raise ValueError(f"Invalid SAC equation {sac_eq}")
[docs] def calc_ppcf( config: dict[str, Any], ds_cont: xr.Dataset, ac: str, ) -> xr.DataArray: """Calculate Potential Persistent Contrail Formation (p_PCF) using the precalculated contrail data from the Limiting Factors study (Megill & Grewe, 2025; default). Args: config (dict[str, Any]): Configuration dictionary from config file. ds_cont (xr.Dataset): Dataset of precalculated contrail data. ac (str): Aircraft identifier from config. Raises: KeyError: If key "formation_method" is not defined. ValueError: If "formation_method" key is unknown. Returns: xr.DataArray: Interpolated p_PCF on precalculated contrail data grid """ # pre-conditions if "formation_method" not in config["responses"]["cont"]: raise KeyError( "Missing 'formation_method' key in config['responses']['cont']." ) form_method = config["responses"]["cont"]["formation_method"] if form_method not in ["Megill_2025"]: raise ValueError( "Unknown formation_method in config['responses']['cont']. " "Options are currently only 'Megill_2025' (default)." ) # Megill_2025 method (default) return calc_ppcf_megill(config, ds_cont, ac)
[docs] def calc_ppcf_megill( config: dict[str, Any], ds_cont: xr.Dataset, ac: str, ) -> xr.DataArray: """Calculate the Potential Persistent Contrail Formation (p_PCF) using the Megill & Grewe (2025) method and precalculated data from ERA5. Args: config (dict[str, Any]): Configuration dictionary from config file. ds_cont (xr.Dataset): Dataset of precalculated contrail data. ac (str): Aircraft identifier from config. Returns: xr.DataArray: Interpolated p_PCF on precalculated contrail data grid. """ # get G value at 250 hPa if "G_250" not in config["aircraft"][ac]: raise KeyError(f"Missing 'G_250' key in config['aircraft']['{ac}'].") g_in = config["aircraft"][ac]["G_250"] precal_g_vals = ds_cont.g_250.data # ensure that G is not lower than lowest pre-calculated G if g_in < min(precal_g_vals): raise ValueError( "Selected G_250 value is below pre-calculated values. OpenAirClim " "cannot guarantee the accuracy of the fits here. If this G_250 " "value is required, please contact the dev team." ) # find left and right neighbours right_idx = np.searchsorted(precal_g_vals, g_in) left_idx = max(right_idx - 1, 0) right_idx = min(right_idx, len(precal_g_vals) - 1) g_nbrs = precal_g_vals[[left_idx, right_idx]] # find p_pcf values for input and neighbours at 250 hPa params_1 = np.array( [ds_cont.sel(plev=250)[var] for var in ["l_1", "k_1", "x0_1", "d_1"]] ) params_2 = np.array([ds_cont.sel(plev=250)[var] for var in ["l_2", "k_2", "x0_2"]]) y_in = logistic_gen(g_in, *params_1) + logistic(g_in, *params_2) y_nbrs = logistic_gen(g_nbrs, *params_1) + logistic(g_nbrs, *params_2) # if G > largest pre-calculated G if left_idx == len(precal_g_vals) - 1: logging.warning( "Selected G is above pre-calculated values. Use results with caution." ) p_pcf = ds_cont.isel(AC=-1).ppcf # if G is between two pre-calculated values else: x = (y_in - y_nbrs[0]) / (y_nbrs[1] - y_nbrs[0]) p_pcf = (1 - x) * ds_cont.isel(AC=left_idx).ppcf + x * ds_cont.isel( AC=right_idx ).ppcf return p_pcf
[docs] def logistic(x: npt.ArrayLike, l: float, k: float, x0: float) -> np.ndarray: """Computes the logistic function, a sigmoid curve, commonly used to model growth or decay. Function from Megill & Grewe (2025): https://github.com/liammegill/contrail-limiting-factors Args: x (npt.ArrayLike): The input values for which the logistic function will be computed. l (float): The maximum value or carrying capacity of the function. k (float): The steepness of the curve. x0 (float): The midpoint value of `x` where the function reaches half of `l`. Returns: np.ndarray: The logistic function values for the given input `x`. """ np.seterr(all="raise") x = np.asarray(x) try: return l / (1 + np.exp(-k * (x - x0))) except FloatingPointError: # protect the exponential return np.full_like(x, np.nan, dtype=float)
[docs] def logistic_gen( x: npt.ArrayLike, l: float, k: float, x0: float, d: float ) -> np.ndarray: """Computes a generalized logistic function with an additional vertical shift. Function from Megill & Grewe (2025): https://github.com/liammegill/contrail-limiting-factors Args: x (npt.ArrayLike): The input values for which the logistic function will be computed. l (float): The maximum value or carrying capacity of the function. k (float): The steepness of the curve. x0 (float): The midpoint value of `x` where the function reaches half of `l`. d (float): The vertical shift applied to the function. Returns: np.ndarray: The values of the shifted logistic function for the input `x`. """ np.seterr(all="raise") x = np.asarray(x) try: return l / (1 + np.exp(-k * (x - x0))) + d except FloatingPointError: # protect the exponential return np.full_like(x, np.nan, dtype=float)
[docs] def interp_ppcf( inv: xr.Dataset, p_pcf: xr.DataArray, cont_grid: ContGrid ): """Interpolate p_PCF onto contrail grid. Args: inv (xr.Dataset): Emission inventory for a given year. p_pcf (xr.DataArray): p_PCF on precalculated contrail data grid. cont_grid (tuple): Precalculated contrail grid. Shape ``(lon, lat, plev)``; each is 1-D float array with shapes ``(n_lon,)``, ``(n_lat,)``, ``(n_plev,)``. Units: lon [deg], lat [deg], plev [hPa]. Returns: (np.ndarray, tuple): Interpolated p_PCF; tuple of indices. """ # get cont_grid cc_lon_vals, cc_lat_vals, cc_plev_vals = cont_grid # find indices lat_idxs = np.abs(cc_lat_vals[:, np.newaxis] - inv.lat.data).argmin(axis=0) lon_idxs = np.abs(cc_lon_vals[:, np.newaxis] - inv.lon.data).argmin(axis=0) plev_idxs = len(cc_plev_vals) - np.searchsorted( cc_plev_vals[::-1], inv.plev.data, side="right" ) # interpolate over plev using power law between upper and lower bounds plev_ub = cc_plev_vals[plev_idxs] plev_lb = cc_plev_vals[plev_idxs - 1] sigma_plev = 1 - ( (inv.plev.data**KAPPA - plev_lb**KAPPA) / (plev_ub**KAPPA - plev_lb**KAPPA) ) # calculate p_pcf p_pcf_ub = p_pcf.values[lat_idxs, lon_idxs, plev_idxs] p_pcf_lb = p_pcf.values[lat_idxs, lon_idxs, plev_idxs - 1] p_pcf_intrp = sigma_plev * p_pcf_lb + (1 - sigma_plev) * p_pcf_ub return p_pcf_intrp, (lat_idxs, lon_idxs, plev_idxs)
[docs] def calc_cfdd( config: dict[str, Any], inv_dict: dict[int, xr.Dataset], ds_cont: xr.Dataset, cont_grid: ContGrid, ac: str, ) -> dict[int, np.ndarray]: """Calculate the Contrail Flight Distance Density (CFDD) for each year in inv_dict. This function uses the p_pcf data calculated using ERA5 (Megill & Grewe, 2025). Args: config (dict[str, Any]): Configuration dictionary from config file. inv_dict (dict[int, xr.Dataset]): Dictionary of emission inventory xarray datasets. Keys are inventory years. ds_cont (xr.Dataset): Dataset of precalculated contrail data. cont_grid (tuple): Precalculated contrail grid. Shape ``(lon, lat, plev)``; each is 1-D float array with shapes ``(n_lon,)``, ``(n_lat,)``, ``(n_plev,)``. Units: lon [deg], lat [deg], plev [hPa]. ac (str): Aircraft identifier from config. Returns: dict[int, np.ndarray]: Dictionary with CFDD values [km/km2], keys are inventory years """ # calculate ppcf and ensure that it is of shape (lat, lon, plev) p_pcf = calc_ppcf(config, ds_cont, ac) p_pcf = p_pcf.T.transpose("lat", "lon", "plev") # calculate contrail grid areas cc_lon_vals, cc_lat_vals, cc_plev_vals = cont_grid areas = calc_cont_grid_areas(cc_lat_vals, cc_lon_vals) # cut inv_dict at pre-calculated contrail grid extremes inv_dict = check_plev_range(inv_dict.copy(), cont_grid) # calculate CFDD cfdd_dict = {} for year, inv in inv_dict.items(): # p_pcf is interpolated using a power law over pressure level and using # a nearest neighbour for latitude and longitude. p_pcf_intrp, (lat_idxs, lon_idxs, plev_idxs) = interp_ppcf( inv, p_pcf, cont_grid ) # calculate CFDD # 1800s since the CFDD method was developed using 30min intervals # 3153600s in one year sum_contrib = inv.distance.data * p_pcf_intrp * 1800.0 / 31536000.0 sum_km = np.zeros((len(cc_plev_vals), len(cc_lat_vals), len(cc_lon_vals))) np.add.at(sum_km, (plev_idxs, lat_idxs, lon_idxs), sum_contrib) cfdd = sum_km / areas cfdd_dict[year] = cfdd # post-conditions for year, cfdd in cfdd_dict.items(): expected_shape = (len(cc_plev_vals), len(cc_lat_vals), len(cc_lon_vals)) assert cfdd.shape == expected_shape, f"Shape of CFDD for year {year} is not correct." return cfdd_dict
[docs] def check_plev_range( inv_dict: dict[int, xr.Dataset], cont_grid: ContGrid, clamp: bool = True ) -> dict[int, xr.Dataset]: """Checks whether all pressure level values in `inv_dict` are within the bounds of the pre-calculated contrail grid. Logs a warning if any values are found and automatically clamps values into the allowed range. Args: inv_dict (dict[int, xr.Dataset]): Dictionary of emission inventory xarray datasets. Keys are inventory years. cont_grid (tuple): Precalculated contrail grid. Shape ``(lon, lat, plev)``; each is 1-D float array with shapes ``(n_lon,)``, ``(n_lat,)``, ``(n_plev,)``. Units: lon [deg], lat [deg], plev [hPa]. clamp (bool, optional): Whether the values should be clamped to within the allowed range. Defaults to True. Returns: dict[int, xr.Dataset]: Dictionary of emission inventory xarray datasets clamped to within the allowed plev range. """ # get pre-calculated contrail plev values cc_plev_vals = np.asarray(cont_grid[2]) pmin = float(cc_plev_vals.min()) pmax = float(cc_plev_vals.max()) # loop over inventories n_bad = 0 for year, inv in inv_dict.items(): # calculate number of values outside of range plev_inv = inv["plev"].values finite = np.isfinite(plev_inv) bad_low = finite & (plev_inv < pmin) bad_high = finite & (plev_inv > pmax) bad_mask = bad_low | bad_high n_bad += int(bad_mask.sum()) # get maximum/minimum values for logger warning min_val = plev_inv.min() max_val = plev_inv.max() if clamp: inv_dict[year]["plev"] = np.clip(inv["plev"], pmin, pmax) if n_bad > 0: logging.warning( "Found %d 'plev' values outside the allowed range [%g, %g]. " "Observed plev values min=%g, max=%g. Values were automatically " "clamped into the allowed range. Use results with caution.", n_bad, pmin, pmax, min_val, max_val ) return inv_dict
[docs] def cfdd_to_1d( cfdd_dict: dict[str, dict[int, np.ndarray]], cont_grid: ContGrid ) -> dict[str, dict[int, np.ndarray]]: """Convert 3D CFDD to 1D (lon axis) CFDD to match contrail cirrus coverage using a vertical sum and area-weighting to remove latitude-dependence. Args: cfdd_dict (dict[str, dict[int, np.ndarray]]): Dictionary with CFDD values [km/km2] in 3D (plev, lat, lon). Keys are inventory years. cont_grid (tuple): Precalculated contrail grid. Shape ``(lon, lat, plev)``; each is 1-D float array with shapes ``(n_lon,)``, ``(n_lat,)``, ``(n_plev,)``. Units: lon [deg], lat [deg], plev [hPa]. Returns: dict[str, dict[int, np.ndarray]]: Dictionary with CFDD values in 1D (lon). """ # get contrail grid areas cc_lon_vals, cc_lat_vals, _ = cont_grid areas = calc_cont_grid_areas(cc_lat_vals, cc_lon_vals) cfdd_1d: dict[str, dict[int, np.ndarray]] = {ac: {} for ac in cfdd_dict.keys()} for ac in cfdd_dict.keys(): for yr, val in cfdd_dict[ac].items(): cfdd_2d = val.sum(axis=0) cfdd_1d[ac][yr] = (cfdd_2d * areas).sum(axis=0) / areas.sum(axis=0) return cfdd_1d
[docs] def pm_factor_high(x: float, params: tuple) -> float: """Calculate nvPM factor in the high-soot regime. Args: x (float): relative nvPM emissions with respect to 1.5e15 kg^-1 params (tuple): fit parameters Returns: float: nvPM factor """ a, b, c = params return a * np.arctan(b * x**c)
[docs] def pm_factor_high_prime(x: float, params: tuple) -> float: """Calculate the first-order derivative of the nvPM factor in the high-soot regime. Args: x (float): relative nvPM emissions with respect to 1.5e15 kg^-1 params (tuple): fit parameters Returns: float: derivative of the nvPM factor """ a, b, c = params return a * (1.0 / (1.0 + (b * x**c) ** 2)) * b * c * x ** (c - 1.0)
[docs] def pm_factor(x: float, ls_case: str = "case_mid") -> float: """Calculate the nvPM factor depending on the relative nvPM emissions. The low-soot regime (x < 0.1) is currently only available with the OpenAirClim Premium license. Please contact the OpenAirClim team if you would like access to this. Note that the factor is not validated in the low-soot regime. Reference: Megill (2026) Args: x (float): relative nvPM emissions with respect to 1.5e15 kg^-1 ls_case (str, optional): Descriptor of pre-calculated case. One of: "case_low", "case_mid" or "case_high". Defaults to "case_mid". Raises: ValueError: For invalid nvPM emissions. ImportError: For low-soot regime calls if the `openairclim_premium` package cannot be found. Returns: float: nvPM factor """ # pre-conditions if x < 0.0: raise ValueError("nvPM emissions must be positive.") if ls_case not in ["case_low", "case_mid", "case_high"]: raise ValueError(f"Unknown low_soot_case {ls_case}.") # predefined fit parameters high_soot_params = (0.91, 1.96, 0.58) if 0.0 <= x < 0.1: logging.warning( "Selected nvPM emissions are in the low-soot regime, which is not" "validated. Use contrail results with caution." ) if not OAC_PREMIUM_AVAILABLE: raise ImportError( "Modelling of the low-soot regime (PMrel < 0.1) is currently" "premium functionality. Please install openairclim_premium on" "the path or contact the development team for licensing." ) assert LOW_SOOT_CASES is not None assert pm_factor_low is not None x0 = LOW_SOOT_CASES[ls_case][0] y0 = pm_factor_high(x0, high_soot_params) m0 = pm_factor_high_prime(x0, high_soot_params) return pm_factor_low(x, y0, m0, LOW_SOOT_CASES[ls_case]) return pm_factor_high(x, high_soot_params)
[docs] def calc_cccov_alltau( cfdd_dict: dict[int, np.ndarray], cont_grid: ContGrid, ) -> dict[int, np.ndarray]: """Calculate contrail cirrus coverage (all tau) using the Megill et al. (2025) method. Args: cfdd_dict (dict[int, np.ndarray]): Dictionary with 1D (lon) CFDD values [km/km2]. Keys are inventory years. cont_grid (tuple): Precalculated contrail grid. Shape ``(lon, lat, plev)``; each is 1-D float array with shapes ``(n_lon,)``, ``(n_lat,)``, ``(n_plev,)``. Units: lon [deg], lat [deg], plev [hPa]. Returns: dict[int, np.ndarray]: Dictionary with 1D (lon) cccov (all tau) values. Keys are inventory years """ # pre-conditions cc_lon_vals, cc_lat_vals, cc_plev_vals = cont_grid for year, cfdd in cfdd_dict.items(): assert cfdd.shape == ( len(cc_plev_vals), len(cc_lat_vals), len(cc_lon_vals), ), f"Shape of CFDD array for year {year} is not correct." # calculate areas areas = calc_cont_grid_areas(cc_lat_vals, cc_lon_vals) # calculate cccov cccov_dict = {} for year, cfdd in cfdd_dict.items(): cov_3d = 7.722e-2 * np.tanh(1.323e2 * cfdd) cov_2d = 1.0 - np.prod(1.0 - cov_3d, axis=0) cov_1d = (cov_2d * areas).sum(axis=0) / areas.sum(axis=0) cccov_dict[year] = cov_1d # post-conditions for year, cccov in cccov_dict.items(): assert cccov.shape == (len(cc_lon_vals),), ( f"Shape of cccov array for year {year} is not correct." ) return cccov_dict
[docs] def calc_cccov_taup05( config: dict[str, Any], cccov_dict: dict[int, np.ndarray], ac: str ) -> dict[int, np.ndarray]: """Convert contrail cirrus coverage (all tau) to optically thick contrail cirrus coverage (tau > 0.05). Args: config (dict[str, Any]): Configuration dictionary from config file. cccov_dict (dict[int, np.ndarray]): Dictionary with 1D (lon) contrail cirrus coverage (all optical thicknesses). Keys are inventory years. ac (str): Aircraft identifier from config. Raises: KeyError: If "PMrel" not defined in config. KeyError: If "low_soot_case" not defined in config (OpenAirClim will default to "case_mid"). Returns: dict[int, np.ndarray]: Dictionary with 1D (lon) cccov (tau > 0.05) values. Keys are inventory years. """ # pre-conditions if "PMrel" not in config["aircraft"][ac]: raise KeyError(f"Missing 'PMrel' key in config['aircraft']['{ac}'].") if "low_soot_case" not in config["responses"]["cont"]: raise KeyError("Missing 'low_soot_case' key in config['responses']['cont'].") pm_rel = config["aircraft"][ac]["PMrel"] ls_case = config["responses"]["cont"]["low_soot_case"] # convert all tau -> tau > 0.05 cccov_dict_taup05 = {} for year, cccov in cccov_dict.items(): cov_p05 = cccov * pm_factor(pm_rel, ls_case) cccov_dict_taup05[year] = cov_p05 # post-conditions for year, cov_p05 in cccov_dict_taup05.items(): assert ( cov_p05.shape == cccov_dict[year].shape ), f"Shape of cccov array for year {year} is not correct." return cccov_dict_taup05
[docs] def contrail_attribution( input_dict: dict[int, np.ndarray], ac_dict: dict[int, np.ndarray], total_dict: dict[int, np.ndarray], ) -> dict[int, np.ndarray]: """ Use proportional attribution to split the input into the contribution from ac_dict (single aircraft identifier). The keys of all inputs must match. Args: input_dict (dict[int, np.ndarray]): Dictionary to be split, keys are inventory years. ac_dict (dict[int, np.ndarray]): Dictionary with values for a single aircraft identifier, could be CFDD or coverage values for example. Keys are inventory years. total_dict (dict[int, np.ndarray]): Dictionary with values for all aircraft identifiers, could be CFDD or coverage values for example (must match ac_dict). Keys are inventory years Returns: dict[int, np.ndarray]: Dictionary with proportionally attributed values. Keys are years. """ # pre-conditions assert set(input_dict.keys()) == set(total_dict.keys()), ( "Keys of input_dict and total_dict do not match." ) assert set(input_dict.keys()) == set(ac_dict.keys()), ( "Keys of input_dict and ac_dict do not match." ) att_dict = {} for year in input_dict.keys(): result = np.zeros_like(total_dict[year], dtype=float) np.divide( input_dict[year] * ac_dict[year], total_dict[year], where=total_dict[year] != 0, out=result, ) att_dict[year] = result # post conditions for year in total_dict.keys(): assert np.all(att_dict[year] >= 0.0), "Negative attributed values detected." return att_dict
[docs] def calc_cont_rf( cccov_dict: dict[int, np.ndarray], cont_grid: ContGrid ) -> dict[int, np.ndarray]: """Calculate contrail Radiative Forcing (RF) using the Megill et al. (2025) method. Args: cccov_dict (dict[int, np.ndarray]): Dictionary with 1D (lon) contrail cirrus coverage (tau > 0.05; shape: lon), keys are inventory years cont_grid (tuple): Precalculated contrail grid. Shape ``(lon, lat, plev)``; each is 1-D float array with shapes ``(n_lon,)``, ``(n_lat,)``, ``(n_plev,)``. Units: lon [deg], lat [deg], plev [hPa]. Returns: dict[int, np.ndarray]: Dictionary with contrail RF values for all inventory years. """ # pre-conditions: check config if not cccov_dict: raise ValueError("cccov_dict cannot be empty.") for year, cccov in cccov_dict.items(): assert cccov.shape == ( len(cont_grid[0]), ), f"Shape of cccov_tot array for year {year} is not correct." # set up longitude-dependent regions rf_fit_arr = [[9.64, 1.02], [10.81, 1.0], [1.93, 0.81], [20.33, 1.14]] lon_bins = [-30, 60, 160, 235, 330] shift = lon_bins[0] lon_shifted = (cont_grid[0] - shift) % 360.0 lon_bins_shifted = np.array(lon_bins) - shift lon_idxs = np.digitize(lon_shifted, lon_bins_shifted) - 1 # calculate RF rf_dict = {} for year, cccov in cccov_dict.items(): rf_arr = np.empty((cont_grid[0].shape)) for i in range(len(lon_bins) - 1): mask = lon_idxs == i rf_arr[mask] = rf_fit_arr[i][0] * cccov[mask] ** rf_fit_arr[i][1] rf_dict[year] = rf_arr return rf_dict
[docs] def apply_wingspan_correction( config: dict[str, Any], rf_arr: Iterable[float], ac: str ) -> np.ndarray: """Apply wingspan correction to an array of RF values. The function is applied against a reference wingspan of 35 m. References: Bruder et al. (2025) and Megill et al. (2025). Args: config (dict[str, Any]): Configuration dictionary from config file. rf_arr (Iterable[float]): RF values ac (str): Aircraft identifier from config. Raises: KeyError: If wingspan "b" not defined for aircraft "ac" in config. ValueError: If wingspan "b" is outside of valid range [20 m, 80 m]. Returns: np.ndarray: RF values with wingspan correction """ # pre-conditions if "b" not in config["aircraft"][ac]: raise KeyError(f"Missing 'b' key in config['aircraft']['{ac}'].") if not 20.0 <= config["aircraft"][ac]["b"] <= 80.0: raise ValueError( f"Invalid wingspan {config['aircraft'][ac]['b']}. Must be " "within [20 m, 80 m]." ) v = 0.396 w = 0.0287 b_ref = 35.0 b = config["aircraft"][ac]["b"] corr = (v + w * b) / (v + w * b_ref) return rf_arr * corr
[docs] def calc_total_over_ac( data: dict[str, dict[int, Any]], ac_lst: Iterable[str], ) -> dict[str, dict[int, Any]]: """Add a "TOTAL" entry to `data` by summing the per-year values across all aircraft identifiers. Args: data (dict[str, dict[int, Any]]): Nested dictionary with keys ac, then yr ac_lst (Iterable[str]): List of aircraft identifiers to be used Returns: dict[str, dict[int, Any]]: Shallow copy of data with TOTAL added or overwritten """ out: dict[str, dict[int, Any]] = {k: dict(v) for k, v in data.items()} total: dict[int, float] = defaultdict(float) for ac in ac_lst: for yr, val in data[ac].items(): total[yr] += val out["TOTAL"] = dict(total) return out
[docs] def calc_contrails( ac_lst: Sequence[str], config: dict[str, Any], inv_dict: dict[int, xr.Dataset], full_inv_dict: dict[str, dict[int, xr.Dataset]], ds_cont: xr.Dataset, ) -> dict[str, np.ndarray]: """Contrail calculation loop for a main OpenAirClim run. Args: ac_lst (Sequence[str]): List of aircraft identifiers to be used. config (dict[str, Any]): Configuration dictionary from config file. inv_dict (dict[int, xr.Dataset]): Dictionary of emission inventory xarray datasets. Keys are inventory years. full_inv_dict (dict[str, dict[int, xr.Dataset]]): Nested dictionary of emission inventories. Keys are aircraft identifier, followed by year. ds_cont (xr.Dataset): Dataset of precalculated contrail data. Returns: dict[str, np.ndarray]: Dictionary of RF values over time for each aircraft identifier. """ # define ac_lst without "TOTAL" ac_no_tot = [ac for ac in ac_lst if ac != "TOTAL"] # get contrail grid cont_grid = get_cont_grid(ds_cont) # if necessary, add zero arrays if aircraft not defined in certain # inventory years inv_yrs: list[int] = list(inv_dict) for ac in ac_lst: full_inv_dict[ac] = pad_inv_dict( inv_yrs, full_inv_dict[ac], ["distance"], cont_grid, ac ) # load base inventories if rel_to_base is TRUE if config["inventories"]["rel_to_base"]: full_base_inv_dict = load_base_inventories( config, inv_yrs, cont_grid ) base_ac_lst = list(full_base_inv_dict.keys()) base_ac_lst = [bac for bac in base_ac_lst if bac != "BASE_TOTAL"] # copy ac config settings to base_ac for base_ac in base_ac_lst: ac = base_ac.removeprefix("BASE_") config["aircraft"][base_ac] = config["aircraft"][ac] else: full_base_inv_dict = {} base_ac_lst = [] # initialise storage dictionaries cfdd_dict: dict[str, dict[int, np.ndarray]] = defaultdict(dict) cccov_taup05: dict[str, dict[int, np.ndarray]] = defaultdict(dict) rf_cont_dict: dict[str, np.ndarray] = {} # loop over ac for CFDD calculation for ac in ac_no_tot + base_ac_lst: if ac in ac_no_tot: ac_inv_dict = full_inv_dict[ac] else: ac_inv_dict = full_base_inv_dict[ac] # calculate CFDD cfdd_dict[ac] = calc_cfdd( config, ac_inv_dict, ds_cont, cont_grid, ac ) # calculate total CFDD cfdd_dict = calc_total_over_ac(cfdd_dict, ac_no_tot + base_ac_lst) cfdd_dict_1d = cfdd_to_1d(cfdd_dict, cont_grid) # calculate contrail cirrus coverage (all optical depths) cccov_alltau_tot = calc_cccov_alltau( cfdd_dict["TOTAL"], cont_grid ) # loop over ac for cccov (tau > 0.05) calculation for ac in ac_no_tot + base_ac_lst: # attribute cccov (all tau) to ac att_cccov = contrail_attribution( cccov_alltau_tot, cfdd_dict_1d[ac], cfdd_dict_1d["TOTAL"] ) # calculate cccov (tau > 0.05) cccov_taup05[ac] = calc_cccov_taup05(config, att_cccov, ac) # calculate total cccov (tau > 0.05) cccov_taup05 = calc_total_over_ac( cccov_taup05, ac_no_tot + base_ac_lst ) # calculate total RF rf_cont_tot = calc_cont_rf(cccov_taup05["TOTAL"], cont_grid) # loop over ac for RF calculation for ac in ac_no_tot + base_ac_lst: # attribute RF to ac att_rf = contrail_attribution( rf_cont_tot, cccov_taup05[ac], cccov_taup05["TOTAL"] ) # calculate RF at each inventory year ac_rf_arr = np.array([np.mean(arr) for arr in att_rf.values()]) # apply wingspan correction ac_rf_arr = apply_wingspan_correction(config, ac_rf_arr, ac) # apply time evolution _, ac_rf_cont_dict = apply_evolution( config, {"cont": ac_rf_arr}, inv_dict, inventories_adjusted=True ) # add to rf_cont_dict rf_cont_dict[ac] = ac_rf_cont_dict["cont"] # calculate total rf_cont_dict["TOTAL"] = np.sum(np.stack(list(rf_cont_dict.values())), axis=0) return rf_cont_dict