
"""Functionality for performing user-specified calculations on aospy data."""
from collections import OrderedDict
import logging
import os
import shutil
import subprocess
import tarfile
from time import ctime

import numpy as np
import xarray as xr

from ._constants import GRAV_EARTH
from .var import Var
from . import internal_names
from . import utils

logging.basicConfig(level=logging.INFO)


_P_VARS = {internal_names.ETA_STR: utils.vertcoord.p_eta,
           'pressure': utils.vertcoord.p_level}
_DP_VARS = {internal_names.ETA_STR: utils.vertcoord.dp_eta,
            'pressure': utils.vertcoord.dp_level}
_TIME_DEFINED_REDUCTIONS = ['av', 'std', 'ts', 'reg.av', 'reg.std', 'reg.ts']


def _replace_pressure(arguments, dtype_in_vert):
    """Replace p and dp Vars with appropriate Var objects specific to
    the dtype_in_vert."""
    arguments_out = []
    for arg in arguments:
        if isinstance(arg, Var):
            if arg.name == 'p':
                arguments_out.append(_P_VARS[dtype_in_vert])
            elif arg.name == 'dp':
                arguments_out.append(_DP_VARS[dtype_in_vert])
            else:
                arguments_out.append(arg)
        else:
            arguments_out.append(arg)
    return arguments_out
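
# A minimal sketch of what ``_replace_pressure`` does; ``t_var`` is a
# hypothetical stand-in for any non-pressure Var in a calculation's
# ``variables`` tuple:
#
#     p_placeholder = Var(name='p')
#     _replace_pressure((p_placeholder, t_var), 'pressure')
#     # -> [utils.vertcoord.p_level, t_var]
#     _replace_pressure((p_placeholder, t_var), internal_names.ETA_STR)
#     # -> [utils.vertcoord.p_eta, t_var]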


class Calc(object):
    """Class for executing, saving, and loading a single computation."""

    ARR_XARRAY_NAME = 'aospy_result'
    _grid_coords = [internal_names.LAT_STR, internal_names.LAT_BOUNDS_STR,
                    internal_names.LON_STR, internal_names.LON_BOUNDS_STR,
                    internal_names.ZSURF_STR, internal_names.SFC_AREA_STR,
                    internal_names.LAND_MASK_STR, internal_names.PK_STR,
                    internal_names.BK_STR, internal_names.PHALF_STR,
                    internal_names.PFULL_STR, internal_names.PLEVEL_STR]
    _grid_attrs = OrderedDict([(key, internal_names.GRID_ATTRS[key])
                               for key in _grid_coords])

    def __str__(self):
        """String representation of the object."""
        return "<aospy.Calc instance: " + ', '.join(
            (self.name, self.proj.name, self.model.name, self.run.name)
        ) + ">"

    __repr__ = __str__

    def _dir_out(self):
        """Create string of the data directory to save individual .nc files."""
        return os.path.join(self.proj.direc_out, self.proj.name,
                            self.model.name, self.run.name, self.name)

    def _dir_tar_out(self):
        """Create string of the data directory to store a tar file."""
        return os.path.join(self.proj.tar_direc_out, self.proj.name,
                            self.model.name, self.run.name)

    def _file_name(self, dtype_out_time, extension='nc'):
        """Create the name of the aospy file."""
        if dtype_out_time is None:
            dtype_out_time = ''
        out_lbl = utils.io.data_out_label(self.intvl_out, dtype_out_time,
                                          dtype_vert=self.dtype_out_vert)
        in_lbl = utils.io.data_in_label(self.intvl_in, self.dtype_in_time,
                                        self.dtype_in_vert)
        start_year = utils.times.infer_year(self.start_date)
        end_year = utils.times.infer_year(self.end_date)
        yr_lbl = utils.io.yr_label((start_year, end_year))
        return '.'.join(
            [self.name, out_lbl, in_lbl, self.model.name,
             self.run.name, yr_lbl, extension]
        ).replace('..', '.')

    def _path_out(self, dtype_out_time):
        return os.path.join(self.dir_out, self.file_name[dtype_out_time])

    def _path_tar_out(self):
        return os.path.join(self.dir_tar_out, 'data.tar')

    @staticmethod
    def _print_verbose(*args):
        """Print diagnostic message."""
        try:
            return '{0} {1} ({2})'.format(args[0], args[1], ctime())
        except IndexError:
            return '{0} ({1})'.format(args[0], ctime())
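
    # For orientation, ``_file_name`` produces dot-separated names roughly
    # like the hypothetical example below; the exact interval and dtype
    # labels come from ``utils.io.data_out_label`` and
    # ``utils.io.data_in_label``:
    #
    #     temp.ann.av.from_monthly_ts.some_model.some_run.1983-1998.nc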
    def __init__(self, proj=None, model=None, run=None, var=None,
                 date_range=None, region=None, intvl_in=None, intvl_out=None,
                 dtype_in_time=None, dtype_in_vert=None, dtype_out_time=None,
                 dtype_out_vert=None, level=None, time_offset=None):
        """Instantiate a Calc object.

        Parameters
        ----------
        proj : aospy.Proj object
            The project for this calculation.
        model : aospy.Model object
            The model for this calculation.
        run : aospy.Run object
            The run for this calculation.
        var : aospy.Var object
            The variable for this calculation.
        region : sequence of aospy.Region objects
            The region(s) over which any regional reductions will be
            performed.
        date_range : tuple of datetime.datetime objects
            The range of dates over which to perform calculations.
        intvl_in : {None, 'annual', 'monthly', 'daily', '6hr', '3hr'}, optional
            The time resolution of the input data.
        dtype_in_time : {None, 'inst', 'ts', 'av', 'av_ts'}, optional
            What the time axis of the input data represents:

            - 'inst' : Timeseries of instantaneous values
            - 'ts' : Timeseries of averages over the period of each time-index
            - 'av' : A single value averaged over a date range

        dtype_in_vert : {None, 'pressure', 'sigma'}, optional
            The vertical coordinate system used by the input data:

            - None : not defined vertically
            - 'pressure' : pressure coordinates
            - 'sigma' : hybrid sigma-pressure coordinates

        intvl_out : {'ann', season-string, month-integer}
            The sub-annual time interval over which to compute:

            - 'ann' : Annual mean
            - season-string : E.g. 'JJA' for June-July-August
            - month-integer : 1 for January, 2 for February, etc.

        dtype_out_time : tuple with elements being one or more of:

            - Gridpoint-by-gridpoint output:

              - 'av' : Gridpoint-by-gridpoint time-average
              - 'std' : Gridpoint-by-gridpoint temporal standard deviation
              - 'ts' : Gridpoint-by-gridpoint time-series

            - Averages over each region specified via `region`:

              - 'reg.av', 'reg.std', 'reg.ts' : analogous to 'av', 'std', 'ts'

        dtype_out_vert : {None, 'vert_av', 'vert_int'}, optional
            How to reduce the data vertically:

            - None : no vertical reduction (i.e. output is defined vertically)
            - 'vert_av' : mass-weighted vertical average
            - 'vert_int' : mass-weighted vertical integral

        time_offset : {None, dict}, optional
            How to offset input data in time to correct for metadata errors:

            - None : no time offset applied
            - dict : e.g. ``{'hours': -3}`` to offset times by -3 hours

            See :py:meth:`aospy.utils.times.apply_time_offset`.

        """
        if run not in model.runs:
            raise AttributeError("Model '{0}' has no run '{1}'.  Calc object "
                                 "will not be generated.".format(model, run))
        self.proj = proj
        self.model = model
        self.run = run

        self.default_start_date = self.run.default_start_date
        self.default_end_date = self.run.default_end_date
        self.data_loader = self.run.data_loader

        self.var = var
        self.name = self.var.name
        self.domain = self.var.domain
        self.def_time = self.var.def_time
        self.def_vert = self.var.def_vert
        logging.debug(self._print_verbose('Initializing Calc '
                                          'instance:', self.__str__()))

        try:
            self.function = self.var.func
        except AttributeError:
            self.function = lambda x: x
        if getattr(self.var, 'variables', False):
            self.variables = self.var.variables
        else:
            self.variables = (self.var,)

        self.intvl_in = intvl_in
        self.intvl_out = intvl_out
        self.dtype_in_time = dtype_in_time
        self.dtype_in_vert = dtype_in_vert
        if isinstance(dtype_out_time, (list, tuple)):
            self.dtype_out_time = tuple(dtype_out_time)
        else:
            self.dtype_out_time = tuple([dtype_out_time])

        if not self.def_time:
            for reduction in self.dtype_out_time:
                if reduction in _TIME_DEFINED_REDUCTIONS:
                    msg = ("Var {0} has no time dimension "
                           "for the given time reduction "
                           "{1}".format(self.name, reduction))
                    raise ValueError(msg)

        self.level = level
        self.dtype_out_vert = dtype_out_vert
        self.region = region

        self.months = utils.times.month_indices(intvl_out)
        if date_range == 'default':
            self.start_date = utils.times.ensure_datetime(
                self.run.default_start_date)
            self.end_date = utils.times.ensure_datetime(
                self.run.default_end_date)
        else:
            self.start_date = utils.times.ensure_datetime(date_range[0])
            self.end_date = utils.times.ensure_datetime(date_range[-1])

        self.time_offset = time_offset
        self.data_loader_attrs = dict(
            domain=self.domain, intvl_in=self.intvl_in,
            dtype_in_vert=self.dtype_in_vert,
            dtype_in_time=self.dtype_in_time, intvl_out=self.intvl_out)

        self.model.set_grid_data()

        self.dir_out = self._dir_out()
        self.dir_tar_out = self._dir_tar_out()
        self.file_name = {d: self._file_name(d) for d in self.dtype_out_time}
        self.path_out = {d: self._path_out(d) for d in self.dtype_out_time}
        self.path_tar_out = self._path_tar_out()

        self.data_out = {}
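
    # A minimal usage sketch.  ``example_proj``, ``example_model``,
    # ``example_run``, ``temp``, and ``sahel`` are hypothetical user-defined
    # aospy objects (Proj, Model, Run, Var, and Region, respectively):
    #
    #     import datetime
    #     calc = Calc(proj=example_proj, model=example_model,
    #                 run=example_run, var=temp,
    #                 date_range=(datetime.datetime(1983, 1, 1),
    #                             datetime.datetime(1998, 12, 31)),
    #                 intvl_in='monthly', intvl_out='ann',
    #                 dtype_in_time='ts', dtype_out_time=('av', 'reg.av'),
    #                 region=[sahel])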
    def _to_desired_dates(self, arr):
        """Restrict the xarray DataArray or Dataset to the desired months."""
        times = utils.times.extract_months(
            arr[internal_names.TIME_STR], self.months
        )
        return arr.sel(time=times)

    def _add_grid_attributes(self, ds):
        """Add model grid attributes to a dataset."""
        for name_int, names_ext in self._grid_attrs.items():
            ds_coord_name = set(names_ext).intersection(set(ds.coords) |
                                                        set(ds.data_vars))
            model_attr = getattr(self.model, name_int, None)
            if ds_coord_name and (model_attr is not None):
                # Force coords to have desired name.
                ds = ds.rename({list(ds_coord_name)[0]: name_int})
                ds = ds.set_coords(name_int)
                if not np.array_equal(ds[name_int], model_attr):
                    if np.allclose(ds[name_int], model_attr):
                        msg = ("Values for '{0}' are nearly (but not exactly) "
                               "the same in the Run {1} and the Model {2}. "
                               "Therefore replacing Run's values with the "
                               "model's.".format(name_int, self.run,
                                                 self.model))
                        logging.info(msg)
                        ds[name_int].values = model_attr.values
                    else:
                        msg = ("Model coordinates for '{0}' do not match those"
                               " in Run: {1} vs. {2}"
                               "".format(name_int, ds[name_int], model_attr))
                        logging.info(msg)
            else:
                # Bring in coord from model object if it exists.
                ds = ds.load()
                if model_attr is not None:
                    ds[name_int] = model_attr
                    ds = ds.set_coords(name_int)
            if (self.dtype_in_vert == 'pressure' and
                    internal_names.PLEVEL_STR in ds.coords):
                self.pressure = ds.level
        return ds

    def _get_input_data(self, var, start_date, end_date):
        """Get the data for a single variable over the desired date range."""
        logging.info(self._print_verbose("Getting input data:", var))
        if isinstance(var, (float, int)):
            return var
        else:
            cond_pfull = ((not hasattr(self, internal_names.PFULL_STR)) and
                          var.def_vert and
                          self.dtype_in_vert == internal_names.ETA_STR)
            data = self.data_loader.recursively_compute_variable(
                var, start_date, end_date, self.time_offset, self.model,
                **self.data_loader_attrs)
            name = data.name
            data = self._add_grid_attributes(data.to_dataset(name=data.name))
            data = data[name]
            if cond_pfull:
                try:
                    self.pfull_coord = data[internal_names.PFULL_STR]
                except KeyError:
                    pass
            # Force all data to be at full pressure levels, not half levels.
            bool_to_pfull = (self.dtype_in_vert == internal_names.ETA_STR and
                             var.def_vert == internal_names.PHALF_STR)
            if bool_to_pfull:
                data = utils.vertcoord.to_pfull_from_phalf(data,
                                                           self.pfull_coord)
            if var.def_time:
                # Restrict to the desired dates within each year.
                if self.dtype_in_time != 'av':
                    return self._to_desired_dates(data)
            return data

    def _get_all_data(self, start_date, end_date):
        """Get the needed data from all of the vars in the calculation."""
        return [self._get_input_data(var, start_date, end_date)
                for var in _replace_pressure(self.variables,
                                             self.dtype_in_vert)]

    def _local_ts(self, *data):
        """Perform the computation at each gridpoint and time index."""
        return self.function(*data).rename(self.name)

    def _compute(self, data):
        """Perform the calculation."""
        local_ts = self._local_ts(*data)
        dt = local_ts[internal_names.TIME_WEIGHTS_STR]
        # Convert dt to units of days to prevent overflow.
        dt = dt / np.timedelta64(1, 'D')
        return local_ts, dt

    def _compute_full_ts(self, data):
        """Perform calculation and create yearly timeseries at each point."""
        # Get results at each desired timestep and spatial point.
        full_ts, dt = self._compute(data)
        # Vertically integrate.
        vert_types = ('vert_int', 'vert_av')
        if self.dtype_out_vert in vert_types and self.var.def_vert:
            dp = self._get_input_data(_DP_VARS[self.dtype_in_vert],
                                      self.start_date, self.end_date)
            full_ts = utils.vertcoord.int_dp_g(full_ts, dp)
            if self.dtype_out_vert == 'vert_av':
                ps = self._get_input_data(utils.vertcoord.ps,
                                          self.start_date, self.end_date)
                full_ts *= (GRAV_EARTH / ps)
        return full_ts, dt

    def _full_to_yearly_ts(self, arr, dt):
        """Average the full timeseries within each year."""
        time_defined = self.def_time and not ('av' in self.dtype_in_time)
        if time_defined:
            arr = utils.times.yearly_average(arr, dt)
        return arr

    def _time_reduce(self, arr, reduction):
        """Perform the specified time reduction on a local time-series."""
        if self.dtype_in_time == 'av' or not self.def_time:
            return arr
        reductions = {
            'ts': lambda xarr: xarr,
            'av': lambda xarr: xarr.mean(internal_names.YEAR_STR),
            'std': lambda xarr: xarr.std(internal_names.YEAR_STR),
        }
        try:
            return reductions[reduction](arr)
        except KeyError:
            raise ValueError("Specified time-reduction method '{}' is not "
                             "supported".format(reduction))
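
    # Reduction semantics in brief: for a yearly timeseries ``arr`` carrying
    # a year dimension (``internal_names.YEAR_STR``), 'ts' passes the data
    # through unchanged, while 'av' and 'std' collapse the year dimension
    # via ``arr.mean('year')`` and ``arr.std('year')``, respectively
    # (assuming here that YEAR_STR == 'year').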
    def region_calcs(self, arr, func):
        """Perform a calculation for all regions."""
        # Get pressure values for data output on hybrid vertical coordinates.
        bool_pfull = (self.def_vert and
                      self.dtype_in_vert == internal_names.ETA_STR and
                      self.dtype_out_vert is False)
        if bool_pfull:
            pfull_data = self._get_input_data(_P_VARS[self.dtype_in_vert],
                                              self.start_date, self.end_date)
            pfull = self._full_to_yearly_ts(
                pfull_data, arr[internal_names.TIME_WEIGHTS_STR]
            ).rename('pressure')
        # Loop over the regions, performing the calculation.
        reg_dat = {}
        for reg in self.region:
            # Just pass along the data if averaged already.
            if 'av' in self.dtype_in_time:
                data_out = reg.ts(arr)
            # Otherwise perform the calculation.
            else:
                method = getattr(reg, func)
                data_out = method(arr)
            if bool_pfull:
                # Don't apply e.g. standard deviation to coordinates.
                if func not in ['av', 'ts']:
                    method = reg.ts
                # Convert Pa to hPa.
                coord = method(pfull) * 1e-2
                data_out = data_out.assign_coords(
                    **{reg.name + '_pressure': coord}
                )
            reg_dat.update(**{reg.name: data_out})
        return xr.Dataset(reg_dat)
    def _apply_all_time_reductions(self, data):
        """Apply all requested time reductions to the data."""
        logging.info(self._print_verbose("Applying desired time-"
                                         "reduction methods."))
        reduc_specs = [r.split('.') for r in self.dtype_out_time]
        reduced = {}
        for reduc, specs in zip(self.dtype_out_time, reduc_specs):
            func = specs[-1]
            if 'reg' in specs:
                reduced.update({reduc: self.region_calcs(data, func)})
            else:
                reduced.update({reduc: self._time_reduce(data, func)})
        return OrderedDict(sorted(reduced.items(), key=lambda t: t[0]))
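
    # The spec-splitting above separates the optional regional prefix from
    # the time reduction proper, e.g.:
    #
    # >>> [r.split('.') for r in ('av', 'reg.av', 'reg.std')]
    # [['av'], ['reg', 'av'], ['reg', 'std']]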
    def compute(self, write_to_tar=True):
        """Perform all desired calculations on the data and save externally."""
        data = self._get_all_data(self.start_date, self.end_date)
        logging.info('Computing timeseries for {0} -- '
                     '{1}.'.format(self.start_date, self.end_date))
        full, full_dt = self._compute_full_ts(data)
        full_out = self._full_to_yearly_ts(full, full_dt)
        reduced = self._apply_all_time_reductions(full_out)
        logging.info("Writing desired gridded outputs to disk.")
        for dtype_time, data in reduced.items():
            data = _add_metadata_as_attrs(data, self.var.units,
                                          self.var.description,
                                          self.dtype_out_vert)
            self.save(data, dtype_time, dtype_out_vert=self.dtype_out_vert,
                      save_files=True, write_to_tar=write_to_tar)
        return self
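
    # Typical driver usage, continuing the hypothetical ``calc`` from the
    # sketch above:
    #
    #     calc.compute()                     # compute, write .nc files, tar
    #     calc.compute(write_to_tar=False)   # skip the tar archive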
    def _save_files(self, data, dtype_out_time):
        """Save the data to netcdf files in direc_out."""
        path = self.path_out[dtype_out_time]
        if not os.path.isdir(self.dir_out):
            os.makedirs(self.dir_out)
        if 'reg' in dtype_out_time:
            try:
                reg_data = xr.open_dataset(path)
            except (EOFError, RuntimeError, IOError):
                reg_data = xr.Dataset()
            reg_data.update(data)
            data_out = reg_data
        else:
            data_out = data
        if isinstance(data_out, xr.DataArray):
            data_out = xr.Dataset({self.name: data_out})
        data_out.to_netcdf(path, engine='netcdf4', format='NETCDF3_64BIT')

    def _write_to_tar(self, dtype_out_time):
        """Add the data to the tar file in tar_out_direc."""
        # When submitted in parallel and the directory does not exist yet,
        # multiple processes may try to create a new directory; this leads
        # to an OSError for all processes that tried to make the
        # directory, but were later than the first.
        try:
            os.makedirs(self.dir_tar_out)
        except OSError:
            pass
        # tarfile 'append' mode won't overwrite the old file, which we want.
        # So open in 'read' mode, extract the file, and then delete it.
        # But 'read' mode throws OSError if file doesn't exist: make it first.
        utils.io.dmget([self.path_tar_out])
        with tarfile.open(self.path_tar_out, 'a') as tar:
            pass
        with tarfile.open(self.path_tar_out, 'r') as tar:
            old_data_path = os.path.join(self.dir_tar_out,
                                         self.file_name[dtype_out_time])
            try:
                tar.extract(self.file_name[dtype_out_time],
                            path=old_data_path)
            except KeyError:
                pass
            else:
                # The os module treats files on archive as non-empty
                # directories, so can't use os.remove or os.rmdir.
                shutil.rmtree(old_data_path)
                retcode = subprocess.call([
                    "tar", "--delete", "--file={}".format(self.path_tar_out),
                    self.file_name[dtype_out_time]
                ])
                if retcode:
                    msg = ("The 'tar' command to save your aospy output "
                           "exited with an error.  Most likely, this is due "
                           "to using an old version of 'tar' (especially if "
                           "you are on a Mac).  Consider installing a newer "
                           "version of 'tar' or disabling tar output by "
                           "setting `write_to_tar=False` in the "
                           "`calc_exec_options` argument of "
                           "`submit_mult_calcs`.")
                    logging.warn(msg)
        with tarfile.open(self.path_tar_out, 'a') as tar:
            tar.add(self.path_out[dtype_out_time],
                    arcname=self.file_name[dtype_out_time])

    def _update_data_out(self, data, dtype):
        """Append the data of the given dtype_out to the data_out attr."""
        try:
            self.data_out.update({dtype: data})
        except AttributeError:
            self.data_out = {dtype: data}
    def save(self, data, dtype_out_time, dtype_out_vert=False,
             save_files=True, write_to_tar=False):
        """Save aospy data to data_out attr and to an external file."""
        self._update_data_out(data, dtype_out_time)
        if save_files:
            self._save_files(data, dtype_out_time)
        if write_to_tar and self.proj.tar_direc_out:
            self._write_to_tar(dtype_out_time)
        logging.info('\t{}'.format(self.path_out[dtype_out_time]))
    def _load_from_disk(self, dtype_out_time, dtype_out_vert=False,
                        region=False):
        """Load aospy data saved as netcdf files on the file system."""
        ds = xr.open_dataset(self.path_out[dtype_out_time])
        if region:
            arr = ds[region.name]
            # Use region-specific pressure values if available.
            if (self.dtype_in_vert == internal_names.ETA_STR
                    and not dtype_out_vert):
                reg_pfull_str = region.name + '_pressure'
                arr = arr.drop([r for r in arr.coords
                                if r not in (internal_names.PFULL_STR,
                                             reg_pfull_str)])
                # Rename pfull to pfull_ref always.
                arr = arr.rename({internal_names.PFULL_STR:
                                  internal_names.PFULL_STR + '_ref'})
                # Rename region_pfull to pfull if it's there.
                if hasattr(arr, reg_pfull_str):
                    return arr.rename({reg_pfull_str:
                                       internal_names.PFULL_STR})
                return arr
            return arr
        return ds[self.name]

    def _load_from_tar(self, dtype_out_time, dtype_out_vert=False):
        """Load data saved in tarball form on the file system."""
        path = os.path.join(self.dir_tar_out, 'data.tar')
        utils.io.dmget([path])
        with tarfile.open(path, 'r') as data_tar:
            ds = xr.open_dataset(
                data_tar.extractfile(self.file_name[dtype_out_time])
            )
        return ds[self.name]
    def load(self, dtype_out_time, dtype_out_vert=False, region=False,
             plot_units=False, mask_unphysical=False):
        """Load the data from the object if possible or from disk."""
        msg = ("Loading data from disk for object={0}, dtype_out_time={1}, "
               "dtype_out_vert={2}, and region="
               "{3}".format(self, dtype_out_time, dtype_out_vert, region))
        logging.info(msg + ' ({})'.format(ctime()))
        # Grab from the object if it's there.
        try:
            data = self.data_out[dtype_out_time]
        except (AttributeError, KeyError):
            # Otherwise get from disk.  Try scratch first, then archive.
            try:
                data = self._load_from_disk(dtype_out_time, dtype_out_vert,
                                            region=region)
            except IOError:
                data = self._load_from_tar(dtype_out_time, dtype_out_vert)
        # Copy the array to self.data_out for ease of future access.
        self._update_data_out(data, dtype_out_time)
        # Apply desired plotting/cleanup methods.
        if mask_unphysical:
            data = self.var.mask_unphysical(data)
        if plot_units:
            data = self.var.to_plot_units(data, dtype_vert=dtype_out_vert)
        return data
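
    # Retrieving saved results, again with the hypothetical ``calc`` and
    # ``sahel`` objects from the sketches above:
    #
    #     calc.load('av')                    # gridded time-average
    #     calc.load('reg.av', region=sahel)  # Sahel regional average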
def _add_metadata_as_attrs(data, units, description, dtype_out_vert):
    """Add metadata attributes to a Dataset or DataArray."""
    if isinstance(data, xr.DataArray):
        return _add_metadata_as_attrs_da(data, units, description,
                                         dtype_out_vert)
    else:
        for name, arr in data.data_vars.items():
            _add_metadata_as_attrs_da(arr, units, description,
                                      dtype_out_vert)
        return data


def _add_metadata_as_attrs_da(data, units, description, dtype_out_vert):
    """Add metadata attributes to a DataArray."""
    if dtype_out_vert == 'vert_int':
        if units != '':
            units = '(vertical integral of {0}: {0} kg m^-2)'.format(units)
        else:
            units = '(vertical integral of quantity with unspecified units)'
    data.attrs['units'] = units
    data.attrs['description'] = description
    return data
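
# Sketch of the metadata stamping for vertically integrated output, using a
# throwaway DataArray:
#
# >>> arr = xr.DataArray([1.0], name='olr')
# >>> arr = _add_metadata_as_attrs_da(arr, 'W m^-2', 'outgoing longwave',
# ...                                 'vert_int')
# >>> arr.attrs['units']
# '(vertical integral of W m^-2: W m^-2 kg m^-2)'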