Source code for aospy.automate

"""Functionality for specifying and cycling through multiple calculations."""
from __future__ import print_function

from distutils.version import LooseVersion
from multiprocessing import cpu_count

import dask
import dask.bag as db
import distributed
import itertools
import logging
import pprint
import traceback

from .calc import Calc, _TIME_DEFINED_REDUCTIONS
from .region import Region
from .var import Var


# Keys of the user-supplied calc-suite specification dict.
_OBJ_LIB_STR = 'library'
_PROJECTS_STR = 'projects'
_MODELS_STR = 'models'
_RUNS_STR = 'runs'
_REGIONS_STR = 'regions'
_VARIABLES_STR = 'variables'

# Maps a string tag to the prefix that gets pre-pended to an attribute name
# when looking that attribute up on a parent object (see _get_attr_by_tag).
_TAG_ATTR_MODIFIERS = {
    'all': '',
    'default': 'default_',
}


class AospyException(Exception):
    """Base exception class for the aospy package."""


def _get_attr_by_tag(obj, tag, attr_name):
    """Get attribute from an object via a string tag.

    Parameters
    ----------
    obj : object from which to get the attribute
    tag : str
        Tag specifying how to modify 'attr_name' by pre-pending it with the
        prefix registered for 'tag'.  Must be a key of the
        _TAG_ATTR_MODIFIERS dict.
    attr_name : str
        Unmodified name of the attribute to be found.  The actual attribute
        that is returned may be modified by 'tag'.

    Returns
    -------
    the specified attribute of obj

    Raises
    ------
    KeyError
        If 'tag' is not a key of _TAG_ATTR_MODIFIERS.
    AttributeError
        If 'obj' has no attribute with the resulting name.
    """
    return getattr(obj, _TAG_ATTR_MODIFIERS[tag] + attr_name)


def _permuted_dicts_of_specs(specs):
    """Create {name: value} dict, one each for every permutation.

    Each permutation becomes a dictionary, with the keys being the attr names
    and the values being the corresponding value for that permutation.  These
    dicts can then be directly passed to the Calc constructor.

    Parameters
    ----------
    specs : dict
        Maps each attr name to an iterable of the values to permute over.

    Returns
    -------
    list of dict, one per element of the Cartesian product of the values
    """
    names = list(specs.keys())
    return [dict(zip(names, perm))
            for perm in itertools.product(*specs.values())]


def _merge_dicts(*dict_args):
    """Merge the given dictionaries into single dict.

    Given any number of dicts, shallow copy and merge into a new dict,
    precedence goes to key value pairs in latter dicts.

    From http://stackoverflow.com/a/26853961/1706640
    """
    result = {}
    for dictionary in dict_args:
        result.update(dictionary)
    return result


def _input_func_py2_py3():
    """Find function for reading user input that works on Python 2 and 3.

    See e.g. http://stackoverflow.com/questions/21731043
    """
    try:
        # Only defined on Python 2, where the builtin `input` evaluates
        # what the user typed.
        return raw_input  # noqa: F821
    except NameError:
        import builtins
        return builtins.input


def _user_verify(input_func=_input_func_py2_py3(),
                 prompt='Perform these computations? [y/n] '):
    """Prompt the user for verification.

    Parameters
    ----------
    input_func : callable
        Called with 'prompt'; must return the user's response as a string.
    prompt : str
        Text shown to the user.

    Raises
    ------
    AospyException
        If the response does not start with 'y' (case-insensitive).  An
        empty or whitespace-only response counts as a refusal.
    """
    # `startswith` rather than indexing `[0]`: an empty response (the user
    # just pressing Enter) must cancel cleanly instead of raising IndexError.
    response = input_func(prompt).strip().lower()
    if not response.startswith('y'):
        raise AospyException('Execution cancelled by user.')


def _get_all_objs_of_type(type_, parent):
    """Get all attributes of the given type from the given object.

    Parameters
    ----------
    type_ : The desired type
    parent : The object from which to get the attributes with type matching
        'type_'

    Returns
    -------
    A set (possibly empty) of attributes from 'parent'
    """
    return {obj for obj in parent.__dict__.values()
            if isinstance(obj, type_)}
class CalcSuite(object):
    """Suite of Calc objects generated from provided specifications.

    Parameters
    ----------
    calc_suite_specs : dict
        The user-provided specifications.  Must contain the object library
        under the 'library' key; the remaining keys are read lazily by the
        methods below.
    """

    # "Core" specs form the library -> project -> model -> run hierarchy and
    # are permuted by walking that hierarchy rather than by plain products.
    _CORE_SPEC_NAMES = {_OBJ_LIB_STR, _PROJECTS_STR, _MODELS_STR, _RUNS_STR}
    _AUX_SPEC_NAMES = {_VARIABLES_STR,
                       _REGIONS_STR,
                       'date_ranges',
                       'input_time_intervals',
                       'input_time_datatypes',
                       'input_time_offsets',
                       'input_vertical_datatypes',
                       'output_time_intervals',
                       'output_time_regional_reductions',
                       'output_vertical_reductions'}
    # Translation from the (plural) suite spec names to the keyword names
    # that are passed to the Calc constructor.
    _NAMES_SUITE_TO_CALC = {
        _PROJECTS_STR: 'proj',
        _MODELS_STR: 'model',
        _RUNS_STR: 'run',
        _VARIABLES_STR: 'var',
        _REGIONS_STR: 'region',
        'date_ranges': 'date_range',
        'input_time_intervals': 'intvl_in',
        'input_time_datatypes': 'dtype_in_time',
        'input_time_offsets': 'time_offset',
        'input_vertical_datatypes': 'dtype_in_vert',
        'output_time_intervals': 'intvl_out',
        'output_time_regional_reductions': 'dtype_out_time',
        'output_vertical_reductions': 'dtype_out_vert',
    }

    def __init__(self, calc_suite_specs):
        self._specs_in = calc_suite_specs
        self._obj_lib = self._specs_in[_OBJ_LIB_STR]

    def _get_requested_spec(self, obj, spec_name):
        """Helper to translate user specifications to needed objects.

        A string spec (e.g. 'all' or 'default') is treated as a tag and
        resolved to an attribute of 'obj'; anything else is returned as is.
        """
        requested = self._specs_in[spec_name]
        if isinstance(requested, str):
            return _get_attr_by_tag(obj, requested, spec_name)
        return requested

    def _permute_core_specs(self):
        """Generate all requested combinations of the core objects.

        Returns
        -------
        list of dict, each holding one project/model/run combination keyed
        by the corresponding Calc keyword names
        """
        proj_key = self._NAMES_SUITE_TO_CALC[_PROJECTS_STR]
        model_key = self._NAMES_SUITE_TO_CALC[_MODELS_STR]
        run_key = self._NAMES_SUITE_TO_CALC[_RUNS_STR]

        obj_trees = []
        projects = self._get_requested_spec(self._obj_lib, _PROJECTS_STR)
        for project in projects:
            models = self._get_requested_spec(project, _MODELS_STR)
            for model in models:
                runs = self._get_requested_spec(model, _RUNS_STR)
                for run in runs:
                    obj_trees.append({
                        proj_key: project,
                        model_key: model,
                        run_key: run,
                    })
        return obj_trees

    def _get_regions(self):
        """Get the requested regions.

        The set of regions is wrapped in a one-element list so that the
        permutation step passes the whole set to each Calc rather than
        permuting over the individual regions.
        """
        if self._specs_in[_REGIONS_STR] == 'all':
            return [_get_all_objs_of_type(
                Region, getattr(self._obj_lib, 'regions', self._obj_lib)
            )]
        return [set(self._specs_in[_REGIONS_STR])]

    def _get_variables(self):
        """Get the requested variables.

        Unlike the regions, the returned set is permuted over directly, so
        each variable ends up in its own permutation.
        """
        if self._specs_in[_VARIABLES_STR] == 'all':
            return _get_all_objs_of_type(
                Var, getattr(self._obj_lib, 'variables', self._obj_lib)
            )
        return set(self._specs_in[_VARIABLES_STR])

    def _get_date_ranges(self):
        """Parse the input to get the desired date ranges."""
        if self._specs_in['date_ranges'] == 'default':
            # Wrap the tag so that it is permuted over as a single value.
            return ['default']
        return self._specs_in['date_ranges']

    def _get_time_reg_reducts(self):
        """Parse the input to get the desired spatiotemporal reductions.

        Wrapped in a one-element list so that the reductions are not
        permuted over: each permutation receives all of them.
        """
        return [self._specs_in['output_time_regional_reductions']]

    def _get_aux_specs(self):
        """Get and pre-process all of the non-core specifications."""
        # Drop the "core" specifications, which are handled separately.
        specs = self._specs_in.copy()
        for core in self._CORE_SPEC_NAMES:
            specs.pop(core)

        specs[_REGIONS_STR] = self._get_regions()
        specs[_VARIABLES_STR] = self._get_variables()
        specs['date_ranges'] = self._get_date_ranges()
        specs['output_time_regional_reductions'] = self._get_time_reg_reducts()
        return specs

    def _permute_aux_specs(self):
        """Generate all permutations of the non-core specifications."""
        # Convert to attr names that Calc is expecting, skipping the core
        # specs (which are permuted separately).
        calc_aux_mapping = {
            suite_name: calc_name
            for suite_name, calc_name in self._NAMES_SUITE_TO_CALC.items()
            if suite_name not in self._CORE_SPEC_NAMES
        }

        specs = self._get_aux_specs()
        for suite_name, calc_name in calc_aux_mapping.items():
            specs[calc_name] = specs.pop(suite_name)
        return _permuted_dicts_of_specs(specs)

    def _combine_core_aux_specs(self):
        """Combine permutations over core and auxiliary Calc specs."""
        core_dicts = self._permute_core_specs()
        if not core_dicts:
            return []
        # The auxiliary permutations do not depend on the core objects, so
        # build them once instead of once per core combination.
        aux_dicts = self._permute_aux_specs()
        return [_merge_dicts(core_dict, aux_dict)
                for core_dict in core_dicts
                for aux_dict in aux_dicts]

    def create_calcs(self):
        """Generate a Calc object for each requested parameter combination."""
        specs = self._combine_core_aux_specs()
        for spec in specs:
            spec['dtype_out_time'] = _prune_invalid_time_reductions(spec)
        return [Calc(**sp) for sp in specs]
def _prune_invalid_time_reductions(spec):
    """Prune time reductions of spec with no time dimension.

    Parameters
    ----------
    spec : dict
        Calc specification; its 'var' and 'dtype_out_time' entries are read.

    Returns
    -------
    The value of ``spec['dtype_out_time']`` unchanged if the variable is
    defined in time or no reductions were requested; otherwise a new list
    without the reductions found in _TIME_DEFINED_REDUCTIONS.
    """
    if spec['var'].def_time or spec['dtype_out_time'] is None:
        return spec['dtype_out_time']

    valid_reductions = []
    for reduction in spec['dtype_out_time']:
        if reduction in _TIME_DEFINED_REDUCTIONS:
            msg = ("Var {0} has no time dimension "
                   "for the given time reduction "
                   "{1} so this calculation will "
                   "be skipped".format(spec['var'].name, reduction))
            logging.info(msg)
        else:
            valid_reductions.append(reduction)
    return valid_reductions


def _compute_or_skip_on_error(calc, compute_kwargs):
    """Execute the Calc, catching and logging exceptions, but don't re-raise.

    Prevents one failed calculation from stopping a larger requested set
    of calculations.

    Returns
    -------
    The return value of ``calc.compute(**compute_kwargs)``, or None if it
    raised an exception.
    """
    try:
        return calc.compute(**compute_kwargs)
    except Exception:
        msg = ("Skipping aospy calculation `{0}` due to error with the "
               "following traceback: \n{1}")
        logging.warning(msg.format(calc, traceback.format_exc()))
        return None


def _submit_calcs_on_client(calcs, client, func):
    """Submit calculations via dask.bag and a distributed client"""
    logging.info('Connected to client: {}'.format(client))
    # The option setter was moved in dask 0.18.
    if LooseVersion(dask.__version__) < '0.18':
        dask_option_setter = dask.set_options
    else:
        dask_option_setter = dask.config.set
    with dask_option_setter(get=client.get):
        return db.from_sequence(calcs).map(func).compute()


def _n_workers_for_local_cluster(calcs):
    """The number of workers used in a LocalCluster

    An upper bound is set at the cpu_count or the number of calcs submitted,
    depending on which is smaller.  This is to prevent more workers from
    being started than needed (but also to prevent too many workers from
    being started in the case that a large number of calcs are submitted).
    """
    return min(cpu_count(), len(calcs))


def _split_write_to_tar(compute_kwargs):
    """Separate the tar-writing option from the per-worker compute kwargs.

    Parameters
    ----------
    compute_kwargs : dict
        Keyword arguments intended for ``Calc.compute``.  Not modified.

    Returns
    -------
    worker_kwargs : dict
        Copy of 'compute_kwargs' with 'write_to_tar' forced to False.
    write_to_tar : object
        The caller's 'write_to_tar' value, or True if it was not given.
    """
    worker_kwargs = dict(compute_kwargs)
    worker_kwargs['write_to_tar'] = False
    return worker_kwargs, compute_kwargs.get('write_to_tar', True)


def _exec_calcs(calcs, parallelize=False, client=None, **compute_kwargs):
    """Execute the given calculations.

    Parameters
    ----------
    calcs : Sequence of ``aospy.Calc`` objects
    parallelize : bool, default False
        Whether to submit the calculations in parallel or not
    client : distributed.Client or None
        The distributed Client used if parallelize is set to True; if None
        a distributed LocalCluster is used.
    compute_kwargs : dict of keyword arguments passed to ``Calc.compute``

    Returns
    -------
    A list of the values returned by each Calc object that was executed.
    """
    if not parallelize:
        return [_compute_or_skip_on_error(calc, compute_kwargs)
                for calc in calcs]

    # Tar writing is disabled inside the workers and done serially once all
    # of them have finished.  The option is read from an untouched copy so
    # that neither a missing 'write_to_tar' key nor a worker running in this
    # process can affect the decision below.
    worker_kwargs, write_to_tar = _split_write_to_tar(compute_kwargs)

    def func(calc):
        """Wrap _compute_or_skip_on_error to require only the calc argument"""
        return _compute_or_skip_on_error(calc, worker_kwargs)

    if client is None:
        n_workers = _n_workers_for_local_cluster(calcs)
        with distributed.LocalCluster(n_workers=n_workers) as cluster:
            with distributed.Client(cluster) as local_client:
                result = _submit_calcs_on_client(calcs, local_client, func)
    else:
        result = _submit_calcs_on_client(calcs, client, func)
    if write_to_tar:
        _serial_write_to_tar(calcs)
    return result


def _serial_write_to_tar(calcs):
    """Write each output time reduction of each Calc to tar, one at a time.

    Calcs whose project has a falsy ``tar_direc_out`` are skipped.
    """
    for calc in calcs:
        if calc.proj.tar_direc_out:
            for dtype_out_time in calc.dtype_out_time:
                calc._write_to_tar(dtype_out_time)


def _print_suite_summary(calc_suite_specs):
    """Return (without printing) a summary of the requested calculations."""
    return ('\nRequested aospy calculations:\n' +
            pprint.pformat(calc_suite_specs) + '\n')
def submit_mult_calcs(calc_suite_specs, exec_options=None):
    """Generate and execute all specified computations.

    Once the calculations are prepped and submitted for execution, any
    calculation that triggers any exception or error is skipped, and the rest
    of the calculations proceed unaffected.  This prevents an error in a single
    calculation from crashing a large suite of calculations.

    Parameters
    ----------
    calc_suite_specs : dict
        The specifications describing the full set of calculations to be
        generated and potentially executed.  Accepted keys and their values:

        library : module or package comprising an aospy object library
            The aospy object library for these calculations.
        projects : list of aospy.Proj objects
            The projects to permute over.
        models : 'all', 'default', or list of aospy.Model objects
            The models to permute over.  If 'all', use all models in the
            ``models`` attribute of each ``Proj``.  If 'default', use all
            models in the ``default_models`` attribute of each ``Proj``.
        runs : 'all', 'default', or list of aospy.Run objects
            The runs to permute over.  If 'all', use all runs in the
            ``runs`` attribute of each ``Model``.  If 'default', use all
            runs in the ``default_runs`` attribute of each ``Model``.
        variables : 'all' or list of aospy.Var objects
            The variables to be calculated.  If 'all', use all ``Var``
            objects found in the ``variables`` attribute of the object
            library (or in the library itself if it has no such attribute).
        regions : 'all' or list of aospy.Region objects
            The region(s) over which any regional reductions will be
            performed.  If 'all', use all ``Region`` objects found in the
            ``regions`` attribute of the object library (or in the library
            itself if it has no such attribute).
        date_ranges : 'default' or a list of tuples
            The range of dates (inclusive) over which to perform calculations.
            If 'default', use the ``default_start_date`` and
            ``default_end_date`` attribute of each ``Run``.  Else provide a
            list of tuples, each containing a pair of start and end dates,
            such as ``date_ranges=[(start, end)]`` where ``start`` and
            ``end`` are each ``datetime.datetime`` objects, partial
            datetime strings (e.g. '0001'), ``np.datetime64`` objects, or
            ``cftime.datetime`` objects.
        output_time_intervals : {'ann', season-string, month-integer}
            The sub-annual time interval over which to aggregate.

            - 'ann' : Annual mean
            - season-string : E.g. 'JJA' for June-July-August
            - month-integer : 1 for January, 2 for February, etc.  Each one is
              a separate reduction, e.g. [1, 2] would produce averages (or
              other specified time reduction) over all Januaries, and
              separately over all Februaries.

        output_time_regional_reductions : list of reduction string identifiers
            Unlike most other keys, these are not permuted over when creating
            the :py:class:`aospy.Calc` objects that execute the calculations;
            each :py:class:`aospy.Calc` performs all of the specified
            reductions.  Accepted string identifiers are:

            - Gridpoint-by-gridpoint output:

              - 'av' : Gridpoint-by-gridpoint time-average
              - 'std' : Gridpoint-by-gridpoint temporal standard deviation
              - 'ts' : Gridpoint-by-gridpoint time-series

            - Averages over each region specified via `region`:

              - 'reg.av', 'reg.std', 'reg.ts' : analogous to 'av', 'std', 'ts'

        output_vertical_reductions : {None, 'vert_av', 'vert_int'}, optional
            How to reduce the data vertically:

            - None : no vertical reduction
            - 'vert_av' : mass-weighted vertical average
            - 'vert_int' : mass-weighted vertical integral
        input_time_intervals : {'annual', 'monthly', 'daily', '#hr'}
            A string specifying the time resolution of the input data.  In
            '#hr' above, the '#' stands for a number, e.g. 3hr or 6hr, for
            sub-daily output.  These are the suggested specifiers, but others
            may be used if they are also used by the DataLoaders for the given
            Runs.
        input_time_datatypes : {'inst', 'ts', 'av'}
            What the time axis of the input data represents:

            - 'inst' : Timeseries of instantaneous values
            - 'ts' : Timeseries of averages over the period of each time-index
            - 'av' : A single value averaged over a date range

        input_vertical_datatypes : {False, 'pressure', 'sigma'}, optional
            The vertical coordinate system used by the input data:

            - False : not defined vertically
            - 'pressure' : pressure coordinates
            - 'sigma' : hybrid sigma-pressure coordinates

        input_time_offsets : {None, dict}, optional
            How to offset input data in time to correct for metadata errors

            - None : no time offset applied
            - dict : e.g. ``{'hours': -3}`` to offset times by -3 hours
              See :py:meth:`aospy.utils.times.apply_time_offset`.

    exec_options : dict or None (default None)
        Options regarding how the calculations are reported, submitted, and
        saved.  If None, default settings are used for all options.  The
        dict is not modified.  Currently supported options (each should be
        either `True` or `False` unless noted otherwise):

        - prompt_verify : (default False) If True, print summary of
          calculations to be performed and prompt user to confirm before
          submitting for execution.
        - parallelize : (default False) If True, submit calculations in
          parallel.
        - client : distributed.Client or None (default None) The
          dask.distributed Client used to schedule computations.  If None
          and parallelize is True, a LocalCluster will be started.
        - write_to_tar : (default True) If True, write results of calculations
          to .tar files, one for each :py:class:`aospy.Run` object.
          These tar files have a directory structure identical to that of
          the standard output relative to their root directory, which is
          specified via the `tar_direc_out` argument of each Proj object's
          instantiation.

    Returns
    -------
    A list of the return values from each :py:meth:`aospy.Calc.compute` call
        If a calculation ran without error, this value is the
        :py:class:`aospy.Calc` object itself, with the results of its
        calculations saved in its ``data_out`` attribute.  ``data_out`` is a
        dictionary, with the keys being the temporal-regional reduction
        identifiers (e.g. 'reg.av'), and the values being the corresponding
        result.

        If any error occurred during a calculation, the return value is None.

    Raises
    ------
    AospyException
        If the ``prompt_verify`` option is set to True and the user does not
        respond affirmatively to the prompt, or if the specifications yield
        zero calculations.

    """
    # Work on a copy: popping 'prompt_verify' from the caller's dict would
    # silently disable the prompt if the same options dict were reused.
    options = {} if exec_options is None else dict(exec_options)
    if options.pop('prompt_verify', False):
        print(_print_suite_summary(calc_suite_specs))
        _user_verify()
    calcs = CalcSuite(calc_suite_specs).create_calcs()
    if not calcs:
        raise AospyException(
            "The specified combination of parameters yielded zero "
            "calculations.  Most likely, one of the parameters is "
            "inadvertently empty."
        )
    return _exec_calcs(calcs, **options)