Source code for openairclim.read_netcdf

"""
Methods for reading netCDF input
"""

from pathlib import Path
import logging
import numpy as np
import xarray as xr

# CONSTANTS
INV_SPEC_UNITS = ["kg", "km"]


[docs] def open_netcdf(netcdf): """Converts netCDF file or list of netCDF files to dictionary of xarray Datasets Args: netcdf (str or list): (List of) netCDF file names Returns: dict: Dictionary of xarray Datasets, keys are basenames of input netCDF """ xr_dict = {} if isinstance(netcdf, list) and all(isinstance(ele, str) for ele in netcdf): netcdf_arr = netcdf elif not isinstance(netcdf, list) and isinstance(netcdf, str): netcdf_arr = [netcdf] else: raise TypeError("Argument is not of type str or list of str") for ele in netcdf_arr: netcdf_name = Path(ele).stem xr_dict[netcdf_name] = xr.load_dataset(ele) return xr_dict
[docs] def open_inventories(config, base=False): """Open inventories from config, check attribute sections and time constraints Args: config (dict): Configuration dictionary from config base (bool): If TRUE, loads base inventory, else input inventory Raises: IndexError: if no inv_year is within time_range, for evolution_type = "norm" or "scaling" or False IndexError: if time_range is not within evolution_time, for evolution_type = "norm" or "scaling" IndexError: if no inv_year is within evolution_time, for evolution_tpye = "norm" or "scaling" IndexError: if time_range first and last year are not in inv_years, for evolution_type = "scaling" Returns: dict: Dictionary of xarray Datasets, keys are years of input inventories """ # initialise array of inventories inv_arr = [] # if base is TRUE, base inventories are loaded if base: if "dir" in config["inventories"]["base"]: inv_dir = config["inventories"]["base"]["dir"] else: inv_dir = "" files_arr = config["inventories"]["base"]["files"] # otherwise, load input inventories else: if "dir" in config["inventories"]: inv_dir = config["inventories"]["dir"] else: inv_dir = "" files_arr = config["inventories"]["files"] # load files for inv_file in files_arr: inv_arr.append(inv_dir + inv_file) time_config = config["time"]["range"] time_range = np.arange(time_config[0], time_config[1], time_config[2], dtype=int) # Open inventories as dictionary of xarray Datasets inv_inp_dict = open_netcdf(inv_arr) # Check attribute sections for all emission species given in config check_spec_attributes(config, inv_inp_dict) # Get years of inventories, keys of new dictionary are years # Check time constraint: At least one inv_year must be within time_range # Inventories relevant for simulation: inv_year overlapping with time_range, # For all evolution_type: "norm" or "scaling" or False inv_dict = {} for inv_name, inv in inv_inp_dict.items(): # Update longitudes to be between 0 and 360 degrees if inv.lon.min() < 0.0: logging.warning( "Longitude values have been automatically updated to be between " "0 and 360 degrees to match pre-calculated data." ) inv = inv.assign(lon=inv.lon % 360.0) try: year = inv.attrs["Inventory_Year"] if time_range[0] <= year <= time_range[-1]: inv_dict[year] = inv else: pass except KeyError as exc: msg = "No Inventory_Year attribute found in inventory" + inv_name raise KeyError(msg) from exc # Check if new dictionary of inventories empty and sort if inv_dict: inv_dict = dict(sorted(inv_dict.items())) inv_years = list(inv_dict.keys()) else: raise IndexError("At least one inv_year must be within time_range!") # Get evolution_type evolution_type = get_evolution_type(config) if evolution_type in ("scaling", "norm"): time_dir = config["time"]["dir"] evolution_name = config["time"]["file"] evolution_file = time_dir + evolution_name evolution = xr.load_dataset(evolution_file) try: evolution_time = evolution.time.values except AttributeError as exc: raise AttributeError("No time coordinate found in evolution file") from exc # Check time constraint: time_range must be within evolution_time if ( time_range[0] >= evolution_time[0] and time_range[0] <= evolution_time[-1] and time_range[-1] >= evolution_time[0] and time_range[-1] <= evolution_time[-1] ): pass else: raise IndexError("time_range must be within evolution_time!") # Check time constraint: At least one inv_year must be within evolution_time overlap = False for year in inv_years: if evolution_time[0] <= year <= evolution_time[-1]: overlap = True else: pass if not overlap: raise IndexError("At least one inv_year must be within evolution_time!") # For evolution_type = False, check if part of time_range is outside # of inventories interval. If so, print warning elif evolution_type is False: if time_range[0] not in inv_years or time_range[-1] not in inv_years: logging.warning( "time_range is partly outside interval of inventories, " "emissions are assumed to be zero during that time period!" ) else: raise ValueError("evolution_type must be either 'scaling', 'norm' or False.") # evolution_type = "scaling" # Check time constraint: time_range first and last year must be inventory years if evolution_type == "scaling": if time_range[0] not in inv_years or time_range[-1] not in inv_years: raise IndexError("time_range first and last year must be inventory years!") else: pass logging.info( "Emission inventories openend, attribute sections " "and time constraints checked successfully." ) return inv_dict
[docs] def split_inventory_by_aircraft(config, inv_dict): """Split dictionary of emission inventories by aircraft identifiers defined in the config file. Args: config (dict): Configuration dictionary from config inv_dict (dict): Dictionary of emission inventory xarrays, keys are inventory years. Returns: dict: Nested dictionary of emission inventories. Keys are aircraft identifier, followed by year. """ # check which aircraft are defined in inventories and config ac_lst_inv = np.array( sorted( { ac for _, inv in inv_dict.items() if "ac" in inv for ac in np.unique(inv.ac.data) } ), dtype=str, ) ac_lst_config = np.array(config["aircraft"]["types"], dtype=str) # TEMPORARY # since contrail attribution methodologies have not yet been implemented, # contrails cannot be calculated for multiple aircraft if len(ac_lst_inv) > 1 and "cont" in config["species"]["out"]: raise ValueError( "In the current version of OpenAirClim, it is not possible to " "calculate the contrail climate impact for multiple aircraft " "within the same emission inventory." ) # check to ensure all aircraft are defined in config if not np.isin(ac_lst_inv, ac_lst_config).all(): missing = ac_lst_inv[~np.isin(ac_lst_inv, ac_lst_config)] raise ValueError( "The following aircraft identifiers are present in the emission " f"inventories but not defined in config: {missing}." ) # if no "ac" data variable, check whether "DEFAULT" is defined in config # only necessary if contrails are to be calculated if ac_lst_inv.size == 0 and "cont" in config["species"]["out"]: if "DEFAULT" in ac_lst_config: ac_lst = np.array(["DEFAULT"]) logging.info( "No ac data variable found in the emission inventories. " "Reverting to 'DEFAULT' aircraft from config file." ) else: raise ValueError( "No ac data variable found in the emission inventories and " "'DEFAULT' aircraft not defined in config. G_250, eff_fac and " "PMrel parameters are required for contrail calculations." ) else: ac_lst = ac_lst_inv # initialise full dictionary full_inv_dict = { ac: {year: {} for year in inv_dict.keys()} for ac in np.append(ac_lst, "TOTAL") } # loop through emission inventories for year, inv in inv_dict.items(): # if emission inventory does not contain "ac" data variable if "ac" not in inv.data_vars: if "DEFAULT" in full_inv_dict: full_inv_dict["DEFAULT"].update({year: inv}) full_inv_dict["TOTAL"].update({year: inv}) else: for ac in ac_lst: # if ac in inv, add subset of inventory if ac in inv.ac: full_inv_dict[ac].update({year: inv.where(inv.ac == ac, drop=True)}) # if ac not in inv, add a zero-value inventory else: vars_in_inv = set(inv.data_vars) data_vars = { v: (("index",), [0.0]) for v in sorted(vars_in_inv - {"plev", "ac"}) } data_vars["plev"] = (("index",), [300.0]) # random plev zero_inv = xr.Dataset( data_vars=data_vars, coords={"index": np.array([0], dtype=np.int64)}, attrs={"Inventory_Year": year}, ) full_inv_dict[ac].update({year: zero_inv}) # add warning logging.warning( "Created zero-inventory for ac %s in year %s", ac, year ) # add "TOTAL" full_inv_dict["TOTAL"].update({year: inv.copy().drop_vars("ac")}) return full_inv_dict
[docs] def get_evolution_type(config): """Get evolution type Args: config (dict): Configuration dictionary from config Raises: ValueError: if type attribute in evolution file is invalid Returns: str, bool: evolution_tpye: norm or scaling or False """ evolution_type = False if "file" in config["time"]: time_dir = config["time"]["dir"] file_name = config["time"]["file"] file_path = time_dir + file_name try: evolution = xr.load_dataset(file_path) evolution_type = evolution.attrs["Type"] except ValueError as exc: raise ValueError("No evolution file found") from exc except KeyError as exc: raise KeyError("No Type attribute found in evolution file") from exc if evolution_type in ("norm", "scaling"): pass else: raise ValueError( "Type attribute in evolution file must be either scaling or norm." ) else: pass return evolution_type
[docs] def open_netcdf_from_config(config, section, species, resp_type): """Open netcdf files and convert to xarray Datasets for given section in config and given species Args: config (dict): Configuration dictionary from config section (str): Section in config species (list): List of considered species resp_type (str): Response type, e.g. "conc", "rf" Returns: dict: Dictionary of xarray Datasets, one Dataset for each species, keys are species names """ xr_dict = {} section_dict = config[section] dir_name = section_dict["dir"] for spec in species: inp_file = dir_name + section_dict[spec][resp_type]["file"] xr_dict[spec] = xr.load_dataset(inp_file) return xr_dict
[docs] def get_results(config: dict, ac="TOTAL") -> tuple[dict, dict, dict, dict]: """Get the simulation results from the output netCDF file. Args: config (dict): Configuration from config file. ac (str, optional): Aircraft identifier, defaults to TOTAL Returns: dict: dictionaries of numpy arrays containing the simulation results, keys are species. """ results_file = config["output"]["dir"] + config["output"]["name"] + ".nc" results = xr.load_dataset(results_file) emis_dict = {} conc_dict = {} rf_dict = {} dtemp_dict = {} for var_name, value_arr in results.items(): # handle multi-aircraft results if "ac" in value_arr.dims: if ac in value_arr.coords["ac"].values: value_arr = value_arr.sel(ac=ac) else: raise ValueError( f"'ac' coordinate exists in {var_name}, but no '{ac}'" "entry found." ) var_name = var_name.split("_") result_type = var_name[0] spec = var_name[-1] if result_type == "emis": emis_dict[spec] = value_arr elif result_type == "conc": conc_dict[spec] = value_arr elif result_type == "RF": rf_dict[spec] = value_arr elif result_type == "dT": dtemp_dict[spec] = value_arr else: pass return emis_dict, conc_dict, rf_dict, dtemp_dict
[docs] def check_spec_attributes(config, inv_dict): """Check emission attributes in inventories for given species in config TODO Expand list of possible emission units Args: config (dict): Configuration dictionary from config inv_dict (dict): Dictionary of xarray Datasets, keys are years of input inventories Raises: KeyError: if incorrect units found """ species = config["species"]["inv"] for year, inv in inv_dict.items(): for spec in species: try: attrs = inv[spec].attrs except KeyError as exc: msg = ( "No attributes found in inventory for year " + str(year) + " and species " + spec ) raise KeyError(msg) from exc try: units = attrs["units"] except KeyError as exc: msg = ( "No units founds in inventory for year " + str(year) + " and species " + spec ) raise KeyError(msg) from exc if units in INV_SPEC_UNITS: pass else: msg = ( "Incorrect units found in inventory for year " + str(year) + " and species " + spec ) raise KeyError(msg)