src.data_manager module

Base classes implementing logic for querying, fetching and preprocessing model data requested by the PODs.

class src.data_manager.AbstractQueryMixin

Bases: abc.ABC

abstract query_dataset(var)

Sets data attribute on var or raises an exception.

setup_query()

Called once, before the iterative query_and_fetch() process starts. Use this to, e.g., initialize database or remote filesystem connections.

pre_query_hook(vars)

Called before querying the presence of a new batch of variables.

set_experiment()

Called after querying the presence of a new batch of variables, to filter or otherwise ensure that the returned DataKeys for all variables come from the same experimental run of the model, by setting the status attribute of those DataKeys to ACTIVE.

post_query_hook(vars)

Called after set_experiment(), after each query of a new batch of variables.

tear_down_query()

Called once, after the iterative query_and_fetch() process ends. Use this to, e.g., close database or remote filesystem connections.
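The hooks above imply a fixed call order around each batch of queried variables. The sketch below is a hypothetical illustration of that order only: `QueryMixinSketch`, `ToyQuery` and `run_query_pass` are invented names, and the real driver in DataSourceBase (query_data(), request_data()) also interleaves fetching, alternate variables and error handling, which are omitted here.

```python
import abc


class QueryMixinSketch(abc.ABC):
    """Hypothetical stand-in mirroring the AbstractQueryMixin hooks."""

    @abc.abstractmethod
    def query_dataset(self, var):
        """Set data on var or raise an exception."""

    def setup_query(self):
        pass

    def pre_query_hook(self, vars):
        pass

    def set_experiment(self):
        pass

    def post_query_hook(self, vars):
        pass

    def tear_down_query(self):
        pass


class ToyQuery(QueryMixinSketch):
    """Records the order in which the hooks are called."""

    def __init__(self):
        self.calls = []

    def setup_query(self):
        self.calls.append("setup_query")

    def pre_query_hook(self, vars):
        self.calls.append("pre_query_hook")

    def query_dataset(self, var):
        self.calls.append(f"query_dataset:{var}")

    def set_experiment(self):
        self.calls.append("set_experiment")

    def post_query_hook(self, vars):
        self.calls.append("post_query_hook")

    def tear_down_query(self):
        self.calls.append("tear_down_query")


def run_query_pass(source, batches):
    """Hypothetical driver: one setup/teardown around several query batches."""
    source.setup_query()
    try:
        for batch in batches:
            source.pre_query_hook(batch)
            for var in batch:
                source.query_dataset(var)
            source.set_experiment()
            source.post_query_hook(batch)
    finally:
        # Teardown runs even if a query raises.
        source.tear_down_query()


toy = ToyQuery()
run_query_pass(toy, [["tas", "pr"], ["ts"]])
print(toy.calls)
```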

class src.data_manager.AbstractFetchMixin

Bases: abc.ABC

abstract fetch_dataset(var, data_key)

Fetches the data corresponding to data_key, and populates the local_data attribute of data_key with a list of identifiers for the successfully fetched data (paths to locally downloaded copies of the data).

setup_fetch()

Called once, before the iterative query_and_fetch() process starts. Use this to, e.g., initialize database or remote filesystem connections.

pre_fetch_hook(vars)

Called before fetching each batch of query results.

post_fetch_hook(vars)

Called after fetching each batch of query results.

tear_down_fetch()

Called once, after the iterative query_and_fetch() process ends. Use this to, e.g., close database or remote filesystem connections.

class src.data_manager.AbstractDataSource(*args, **kwargs)

Bases: src.data_manager.AbstractQueryMixin, src.data_manager.AbstractFetchMixin

pre_query_and_fetch_hook()

Called once, before the iterative query_and_fetch() process starts. Use this to, e.g., initialize database or remote filesystem connections.

post_query_and_fetch_hook()

Called once, after the iterative query_and_fetch() process ends. Use this to, e.g., close database or remote filesystem connections.
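Because AbstractDataSource combines two abstract mixins, a subclass cannot be instantiated until it supplies both query_dataset() and fetch_dataset(). A minimal sketch of this pattern with Python's abc module follows; all class names are hypothetical stand-ins, not the classes in src.data_manager.

```python
import abc


class QueryMixin(abc.ABC):
    @abc.abstractmethod
    def query_dataset(self, var):
        ...


class FetchMixin(abc.ABC):
    @abc.abstractmethod
    def fetch_dataset(self, var, data_key):
        ...


class DataSourceSketch(QueryMixin, FetchMixin):
    """Hypothetical analogue of AbstractDataSource: still abstract."""

    def pre_query_and_fetch_hook(self):
        pass

    def post_query_and_fetch_hook(self):
        pass


class QueryOnly(DataSourceSketch):
    def query_dataset(self, var):
        return f"found {var}"


class Complete(QueryOnly):
    def fetch_dataset(self, var, data_key):
        return [f"/tmp/{var}.nc"]


def can_instantiate(cls):
    """Return True if cls has no remaining abstract methods."""
    try:
        cls()
    except TypeError:
        return False
    return True


print(can_instantiate(QueryOnly), can_instantiate(Complete))
```

ABCMeta raises TypeError at instantiation time, so the missing method is caught when the data source is constructed rather than partway through a query.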

class src.data_manager.DataKeyBase(*args, **kwargs)

Bases: src.core.MDTFObjectBase

value: Any = sentinel.Mandatory
expt_key: Any = None
local_data: list
property _log_name
property _children

Iterable of child objects associated with this object.

_deactivation_log_level = 20
abstract remote_data()

Returns paths, URLs, etc. to be used as input to a fetch_data method to specify how this dataset is fetched.

class src.data_manager.DataSourceAttributesBase(CASENAME: str = sentinel.Mandatory, FIRSTYR: str = sentinel.Mandatory, LASTYR: str = sentinel.Mandatory, CASE_ROOT_DIR: str = '', convention: str = sentinel.Mandatory, log: dataclasses.InitVar = <Logger src.data_manager (WARNING)>)

Bases: object

Class defining attributes that any DataSource needs to specify:

  • CASENAME: User-supplied label to identify output of this run of the package.

  • FIRSTYR, LASTYR, date_range: Analysis period, specified as a closed interval (i.e. running from 1 Jan of FIRSTYR through 31 Dec of LASTYR).

  • CASE_ROOT_DIR: Root directory containing input model data. Different DataSources may interpret this differently.

  • convention: Name of the variable naming convention used by the source of model data.

CASENAME: str = sentinel.Mandatory
FIRSTYR: str = sentinel.Mandatory
LASTYR: str = sentinel.Mandatory
date_range: util.DateRange
CASE_ROOT_DIR: str = ''
convention: str = sentinel.Mandatory
log: dataclasses.InitVar = <Logger src.data_manager (WARNING)>
_set_case_root_dir(log=<Logger src.data_manager (WARNING)>)
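The attributes listed above map naturally onto a dataclass that derives date_range from FIRSTYR and LASTYR. This is a sketch under stated assumptions: `AttributesSketch` is a hypothetical name, date_range is a plain tuple of datetime.date objects rather than util.DateRange, and the mandatory fields are listed before CASE_ROOT_DIR because an ordinary dataclass requires non-default fields first (the documented signature instead gives them sentinel.Mandatory defaults).

```python
import dataclasses
import datetime
import logging


@dataclasses.dataclass
class AttributesSketch:
    """Hypothetical analogue of DataSourceAttributesBase."""
    CASENAME: str
    FIRSTYR: str
    LASTYR: str
    convention: str
    CASE_ROOT_DIR: str = ""
    # Computed in __post_init__, so it is not an __init__ argument.
    date_range: tuple = dataclasses.field(init=False)
    # Passed to __post_init__ only; not stored as a field.
    log: dataclasses.InitVar[logging.Logger] = None

    def __post_init__(self, log):
        # Closed interval: 1 Jan of FIRSTYR through 31 Dec of LASTYR.
        start = datetime.date(int(self.FIRSTYR), 1, 1)
        end = datetime.date(int(self.LASTYR), 12, 31)
        if start > end:
            raise ValueError("FIRSTYR must not be later than LASTYR")
        self.date_range = (start, end)
        if log is not None:
            log.debug("Analysis period for %s: %s to %s", self.CASENAME, start, end)


attrs = AttributesSketch("mycase", "1980", "1981", "CMIP")
print(attrs.date_range)
```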
class src.data_manager.PodVarTuple(pod, var)

Bases: tuple

_asdict()

Return a new OrderedDict which maps field names to their values.

_field_defaults = {}
_fields = ('pod', 'var')
_fields_defaults = {}
classmethod _make(iterable)

Make a new PodVarTuple object from a sequence or iterable

_replace(**kwds)

Return a new PodVarTuple object replacing specified fields with new values

property pod

Alias for field number 0

property var

Alias for field number 1
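Assuming PodVarTuple is an ordinary collections.namedtuple (which the _make, _replace and _asdict members above suggest), its behavior can be reproduced directly. The POD and variable values here are placeholder strings; the real tuples hold Diagnostic and VarlistEntry objects.

```python
from collections import namedtuple

# PodVarTuple as a plain namedtuple with the two fields documented above.
PodVarTuple = namedtuple("PodVarTuple", ["pod", "var"])

pv = PodVarTuple(pod="example_pod", var="tas")
print(pv.pod, pv[1])             # field access by name or by index
print(pv._fields)                # ('pod', 'var')
print(pv._asdict())              # mapping of field names to values
swapped = pv._replace(var="pr")  # returns a new tuple; pv is unchanged
rebuilt = PodVarTuple._make(["other_pod", "ts"])
```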

class src.data_manager.DataSourceBase(*args, **kwargs)

Bases: src.core.MDTFObjectBase, src.util.logs.CaseLoggerMixin, src.data_manager.AbstractDataSource

Base class for handling the data needs of PODs. Executes queries for the requested model data against the remote data source, fetches the required data locally, preprocesses it, and performs cleanup/formatting of the POD’s output.

_AttributesClass = <src.util.basic._AbstractAttributePlaceholder object>
_DiagnosticClass = <src.util.basic._AbstractAttributePlaceholder object>
_PreprocessorClass

alias of src.preprocessor.DefaultPreprocessor

_DataKeyClass = <src.util.basic._AbstractAttributePlaceholder object>
_deactivation_log_level = 40
property full_name
property _children

Iterable of child objects (Diagnostics) associated with this object.

iter_vars(active=None, pod_active=None)

Iterator over all VarlistEntry objects (grandchildren) associated with this case. Returns PodVarTuples (namedtuples) of the Diagnostic and VarlistEntry objects corresponding to the POD and its variable, respectively.

Parameters
  • active – bool or None, default None. Selects the subset of VarlistEntries returned in the namedtuples:

    • active = True: only iterate over currently active VarlistEntries.

    • active = False: only iterate over inactive VarlistEntries (VarlistEntries which have either failed or are currently unused alternate variables).

    • active = None: iterate over both active and inactive VarlistEntries.

  • pod_active – bool or None, default None. Same as active, but filtering the PODs that are selected.
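The three-valued active and pod_active filters can be illustrated with a small generator. The `Pod` and `Var` dataclasses below are hypothetical stand-ins for Diagnostic and VarlistEntry, each assumed to expose a boolean `active` attribute; the real objects carry much more state.

```python
from collections import namedtuple
from dataclasses import dataclass, field

PodVarTuple = namedtuple("PodVarTuple", ["pod", "var"])


@dataclass
class Var:  # hypothetical stand-in for VarlistEntry
    name: str
    active: bool = True


@dataclass
class Pod:  # hypothetical stand-in for Diagnostic
    name: str
    active: bool = True
    varlist: list = field(default_factory=list)


def _matches(obj, flag):
    # flag=None means "no filtering"; otherwise require obj.active == flag.
    return flag is None or bool(obj.active) == flag


def iter_vars(pods, active=None, pod_active=None):
    """Yield PodVarTuples, filtering PODs by pod_active and vars by active."""
    for pod in pods:
        if not _matches(pod, pod_active):
            continue
        for var in pod.varlist:
            if _matches(var, active):
                yield PodVarTuple(pod, var)


def iter_vars_only(pods, active=None):
    """Analogue of iter_vars_only(): yield only the variable objects."""
    for pv in iter_vars(pods, active=active):
        yield pv.var


pods = [
    Pod("a", varlist=[Var("tas"), Var("pr", active=False)]),
    Pod("b", active=False, varlist=[Var("ts")]),
]
print([v.name for v in iter_vars_only(pods, active=True)])
```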

iter_vars_only(active=None)

Convenience wrapper for iter_vars() that returns only the VarlistEntry objects (grandchildren) from all PODs in this DataSource.

setup()
setup_pod(pod)

Update POD with information that only becomes available after DataManager and Diagnostic have been configured (i.e., only known at runtime, not from settings.jsonc).

Could arguably be moved into Diagnostic’s __init__, at the cost of dependency inversion.

setup_var(pod, v)

Update VarlistEntry fields with information that only becomes available after DataManager and Diagnostic have been configured (i.e., only known at runtime, not from settings.jsonc).

Could arguably be moved into VarlistEntry’s __init__, at the cost of dependency inversion.

variable_dest_path(pod, var)

Returns the absolute path of the POD’s preprocessed, local copy of the file containing the requested dataset. Files not following this convention won’t be found by the POD.

data_key(value, expt_key=None, status=None)

Constructor for an instance of the DataKeyBase subclass used by this DataSource.

is_fetch_necessary(d_key, var=None)
child_deactivation_handler(child, child_exc)

When a DataKey (child) has been deactivated during query or fetch, log a message on all VarlistEntries using it, and deactivate any VarlistEntries with no remaining viable DataKeys.

query_data()
select_data()
fetch_data()
preprocess_data()

Hook to run the preprocessing function on all variables.

request_data()

Top-level method to iteratively query, fetch and preprocess all data requested by PODs, switching to alternate requested data as needed.

query_and_fetch_cleanup(signum=None, frame=None)

Called if framework is terminated abnormally. Not called during normal exit.

class src.data_manager.DataFrameDataKey(*args, **kwargs)

Bases: src.data_manager.DataKeyBase

DataKeyBase subclass for use with DataframeQueryDataSourceBase and its child classes. The values stored in the DataKey are row indices of the catalog DataFrame in the DataSource, and the remote_data method returns the entries in those rows from the catalog column containing the paths to remote data.

Note

Due to this implementation choice, the catalog used by the DataSource must be static. This code could readily be adapted to a dynamic catalog if its schema provided a unique ID number for each row, to take the place of the row index used here.

remote_data()

Returns paths, URLs, etc. to be used as input to a fetch_data method to specify how this dataset is fetched.

local_data
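A rough sketch of the row-index idea: the DataKey stores only index labels, and remote_data() looks up the remote-path column for those rows. The catalog contents, the `path` column and the `RowIndexDataKey` class are hypothetical. The stored labels only remain meaningful as long as the DataFrame is not modified, which is the reason given in the note for requiring a static catalog.

```python
import pandas as pd

# Hypothetical static catalog; the column names are illustrative only.
catalog = pd.DataFrame(
    {
        "variable": ["tas", "tas", "pr"],
        "path": ["/data/tas_1980.nc", "/data/tas_1981.nc", "/data/pr_1980.nc"],
    }
)
REMOTE_DATA_COL = "path"


class RowIndexDataKey:
    """Sketch of a DataKey whose value is a list of catalog row labels."""

    def __init__(self, df, value):
        self.df = df
        self.value = list(value)
        self.local_data = []

    def remote_data(self):
        # Look up the stored rows and return their remote-path column.
        return self.df.loc[self.value, REMOTE_DATA_COL].tolist()


# Row labels matching a query for "tas" become the key's value.
tas_rows = catalog.loc[catalog["variable"] == "tas"].index
d_key = RowIndexDataKey(catalog, tas_rows)
print(d_key.remote_data())
```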
class src.data_manager.DataFrameQueryColumnGroup(key_cols=None, derived_cols=None)

Bases: object

Class wrapping a set of catalog (DataFrame) column names used by DataframeQueryDataSourceBase in selecting experiment attributes of a given scope (case-wide, pod-wide or var-wide).

One component of DataframeQueryColumnSpec.

_expt_key_col = 'expt_key'
expt_key(df, idx=None)

Returns a string-valued key for use in grouping the rows of df by experiment.

Note

We can’t just do a .groupby on column names, because pandas attempts to coerce DateFrequency to a timedelta64, which overflows for static DateFrequency. There doesn’t seem to be a way to disable this type coercion.

expt_key_func(df)

Function that constructs the appropriate experiment_key column when apply()’ed to the query results DataFrame.
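The note above explains why rows are grouped on a single string-valued key instead of calling groupby on the attribute columns themselves. A minimal sketch of that approach, assuming a `|` separator that never occurs in the values and hypothetical column names; it does not reproduce the DateFrequency coercion problem, only the workaround of converting every value to str before grouping.

```python
import pandas as pd

EXPT_KEY_COL = "expt_key"  # mirrors the _expt_key_col attribute above
SEP = "|"                  # hypothetical separator; must not occur in values


def expt_key(df, key_cols, idx=None):
    """Return a string key for row idx, or a Series of keys if idx is None."""
    if idx is not None:
        return SEP.join(str(df.at[idx, c]) for c in key_cols)
    # Row-wise apply: every value is converted to str before joining, so
    # the grouping step only ever sees plain strings.
    return df[key_cols].apply(lambda row: SEP.join(str(v) for v in row), axis=1)


query_results = pd.DataFrame(
    {
        "source_id": ["ModelA", "ModelA", "ModelB"],
        "frequency": ["mon", "mon", "day"],
        "path": ["a1.nc", "a2.nc", "b1.nc"],
    }
)
query_results[EXPT_KEY_COL] = expt_key(query_results, ["source_id", "frequency"])
groups = {k: g["path"].tolist() for k, g in query_results.groupby(EXPT_KEY_COL)}
print(groups)
```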

class src.data_manager.DataframeQueryColumnSpec(*args, **kwargs)

Bases: object

  • expt_cols: Catalog columns whose values must be the same for all variables being fetched. This is the most common sense in which we “specify an experiment.”

  • pod_expt_cols: Catalog columns whose values must be the same for each POD, but may differ between PODs. An example could be spatial grid resolution. Defaults to the empty set.

  • var_expt_cols: Catalog columns whose values must “be the same for each variable”, i.e., columns whose differences are irrelevant for our purposes but which must be constrained to a unique value in order to uniquely specify an experiment. An example is the CMIP6 MIP table: the same variable can appear in multiple MIP tables, but the choice of table isn’t relevant for PODs. Defaults to the empty set.

In addition, there are specially designated column names:

  • remote_data_col: Name of the column in the catalog containing the location of the data for that row (e.g., path to a netCDF file).

  • daterange_col: Name of the column in the catalog containing util.DateRange objects specifying the date range covered by the data for that row. If set to None, we assume this information isn’t available from the catalog and date range selection logic is skipped.

expt_cols: src.data_manager.DataFrameQueryColumnGroup = sentinel.Mandatory
pod_expt_cols: DataFrameQueryColumnGroup
var_expt_cols: DataFrameQueryColumnGroup
remote_data_col: str = None
daterange_col: str = None
property has_date_info
property all_expt_cols

Columns of the DataFrame specifying the experiment. We assume that specifying a valid value for each of the columns in this set uniquely identifies an experiment.

expt_key(df, idx=None)

Returns a tuple of string-valued keys for grouping files by experiment: (<values of expt_cols>, <values of pod_expt_cols>, <values of var_expt_cols>).
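The relationship between the three column groups, has_date_info, all_expt_cols and the tuple returned by expt_key() can be sketched with plain dataclasses. The names `ColumnGroupSketch` and `ColumnSpecSketch`, the `|` join and the CMIP6-style column names are assumptions for illustration; the real DataFrameQueryColumnGroup also handles derived_cols, which is omitted here.

```python
import dataclasses
from typing import Optional


@dataclasses.dataclass
class ColumnGroupSketch:
    """Hypothetical stand-in for DataFrameQueryColumnGroup."""
    key_cols: list = dataclasses.field(default_factory=list)

    def expt_key(self, row):
        return "|".join(str(row[c]) for c in self.key_cols)


@dataclasses.dataclass
class ColumnSpecSketch:
    """Hypothetical stand-in for DataframeQueryColumnSpec."""
    expt_cols: ColumnGroupSketch
    pod_expt_cols: ColumnGroupSketch = dataclasses.field(default_factory=ColumnGroupSketch)
    var_expt_cols: ColumnGroupSketch = dataclasses.field(default_factory=ColumnGroupSketch)
    remote_data_col: Optional[str] = None
    daterange_col: Optional[str] = None

    @property
    def has_date_info(self):
        # Date range selection is skipped when the catalog has no date column.
        return self.daterange_col is not None

    @property
    def all_expt_cols(self):
        groups = (self.expt_cols, self.pod_expt_cols, self.var_expt_cols)
        return set().union(*(g.key_cols for g in groups))

    def expt_key(self, row):
        # (case-wide key, POD-wide key, variable-wide key)
        groups = (self.expt_cols, self.pod_expt_cols, self.var_expt_cols)
        return tuple(g.expt_key(row) for g in groups)


spec = ColumnSpecSketch(
    expt_cols=ColumnGroupSketch(["source_id", "experiment_id"]),
    pod_expt_cols=ColumnGroupSketch(["grid_label"]),
    var_expt_cols=ColumnGroupSketch(["table_id"]),
    remote_data_col="path",
)
row = {"source_id": "ModelA", "experiment_id": "historical",
       "grid_label": "gn", "table_id": "Amon", "path": "tas.nc"}
print(spec.expt_key(row))  # ('ModelA|historical', 'gn', 'Amon')
```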

class src.data_manager.DataframeQueryDataSourceBase(*args, **kwargs)

Bases: src.data_manager.DataSourceBase

DataSource which queries a data catalog made available as a pandas DataFrame, and includes logic for selecting experiment based on column values.

Note

This implementation assumes the catalog is static and locally loaded into memory. (I think) the only source of this limitation is the fact that it uses values of the DataFrame’s Index as its DataKeys, instead of storing the complete row contents, so this limitation could be lifted if needed.

TODO: integrate better with general Intake API.

_DataKeyClass

alias of DataFrameDataKey

col_spec = <src.util.basic._AbstractAttributePlaceholder object>
abstract property df

Synonym for the DataFrame containing the catalog.

property all_columns
property remote_data_col
_query_clause(col_name, query_attr_name, query_attr_val)

Translate a single field value into a logical clause in the dataframe catalog query. All queryable field values are assumed to be attribute values on a local variable named _dict_var_name.

_query_catalog(var)

Construct and execute the query to determine whether data matching var is present in the catalog.

The logic that performs the query against the catalog (returning a DataFrame of results) is split off here from the processing of those results, to simplify overriding by child classes.

check_group_daterange(group_df, expt_key=None, log=<Logger src.data_manager (WARNING)>)

Sort the files found for each experiment by date, verify that the date ranges contained in the files are contiguous in time and that the date range of the files spans the query date range.
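One way to express the sort-then-check logic, assuming daily resolution and representing each file as a hypothetical `(start, end, path)` tuple of datetime.date values instead of util.DateRange objects. The function below is an illustrative sketch, not the method's actual implementation.

```python
import datetime

ONE_DAY = datetime.timedelta(days=1)


def check_group_daterange(files, query_start, query_end):
    """Sort (start, end, path) tuples by start date and validate them.

    Raises ValueError if consecutive files leave a gap or overlap, or if
    together they do not span [query_start, query_end]. Returns the sorted list.
    """
    ordered = sorted(files, key=lambda f: f[0])
    for prev, nxt in zip(ordered, ordered[1:]):
        # Assumes daily resolution: each file must start the day after
        # the previous file ends.
        if nxt[0] != prev[1] + ONE_DAY:
            raise ValueError(f"non-contiguous dates between {prev[2]} and {nxt[2]}")
    if not ordered or ordered[0][0] > query_start or ordered[-1][1] < query_end:
        raise ValueError("files do not span the requested date range")
    return ordered


d = datetime.date
files = [
    (d(1981, 1, 1), d(1981, 12, 31), "tas_1981.nc"),
    (d(1980, 1, 1), d(1980, 12, 31), "tas_1980.nc"),
]
ordered = check_group_daterange(files, d(1980, 1, 1), d(1981, 12, 31))
print([f[2] for f in ordered])  # ['tas_1980.nc', 'tas_1981.nc']
```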

_query_group_hook(group_df)

Additional filtering to do on query results for a single experiment, for use by child classes.

query_dataset(var)

Find all rows of the catalog matching relevant attributes of the DataSource and of the variable (VarlistEntry). Group these by experiment, and for each experiment make the corresponding DataFrameDataKey and store it in var’s data attribute. Specifically, the data attribute is a dict mapping experiments (labeled by experiment_keys) to the data found for that variable by this query (labeled by the DataKeys).
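The shape of the resulting data attribute (experiment key mapped to the data found) can be sketched with pandas boolean masks. In this hypothetical version, plain lists of row indices stand in for DataFrameDataKey objects, the query is a dict of column/value pairs, and the experiment key is a `|`-joined string; none of these names come from src.data_manager.

```python
import pandas as pd

catalog = pd.DataFrame(
    {
        "source_id": ["ModelA", "ModelA", "ModelB", "ModelA"],
        "variable_id": ["tas", "tas", "tas", "pr"],
        "path": ["a_tas_1.nc", "a_tas_2.nc", "b_tas.nc", "a_pr.nc"],
    }
)


def query_dataset_sketch(df, query, expt_cols):
    """Return {expt_key: [row indices]} for rows matching every query item."""
    mask = pd.Series(True, index=df.index)
    for col, val in query.items():
        mask &= (df[col] == val)
    results = df[mask]
    data = {}
    for idx, row in results.iterrows():
        # Group matching rows by their experiment key.
        key = "|".join(str(row[c]) for c in expt_cols)
        data.setdefault(key, []).append(idx)
    return data


data = query_dataset_sketch(catalog, {"variable_id": "tas"}, ["source_id"])
print(data)
```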

_query_error_handler(msg, d_key, log=<Logger src.data_manager (WARNING)>)

Log a debugging message or raise an exception, depending on whether we’re in strict mode.

_expt_df(obj, var_iterator, col_group, parent_id=None, obj_name=None)

Return a DataFrame of partial experiment attributes (as determined by col_group) that are shared by the query results of all variables covered by var_iterator.

get_expt_key(scope, obj, parent_id=None)

Set experiment attributes with case, pod or variable scope. Given obj, construct a DataFrame of experiment attributes that are found in the queried data for all variables in obj.

If more than one choice of experiment is possible, call DataSource-specific heuristics in resolve_func to choose between them.

set_expt_key(obj, expt_key)
set_experiment()

Ensure that all data we’re about to fetch comes from the same experiment. If data from multiple experiments was returned by the query that just finished, either employ data source-specific heuristics to select one or return an error.
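A simplified, single-scope sketch of the selection rule: keep only the experiments in which every variable was found, accept a unique survivor, defer to a tiebreaker (playing the role of resolve_expt()) when several remain, and fail otherwise. The real class applies this at case, POD and variable scope through get_expt_key() and the resolve_*() methods; the function name and the input layout here are hypothetical.

```python
def set_experiment_sketch(found, resolve=None):
    """Pick one experiment key shared by every variable's query results.

    found: dict mapping variable name -> set of experiment keys it was found in.
    resolve: optional tiebreaker taking a sorted list of candidate keys and
        returning one of them.
    """
    if not found:
        raise ValueError("no variables were queried")
    # Experiments that provide every requested variable.
    common = set.intersection(*(set(keys) for keys in found.values()))
    if not common:
        raise ValueError("no single experiment provides all variables")
    candidates = sorted(common)
    if len(candidates) == 1:
        return candidates[0]
    if resolve is None:
        raise ValueError(f"ambiguous experiments: {candidates}")
    return resolve(candidates)


found = {
    "tas": {"ModelA|r1", "ModelA|r2"},
    "pr": {"ModelA|r1", "ModelA|r2", "ModelB|r1"},
}
print(set_experiment_sketch(found, resolve=lambda c: c[0]))  # ModelA|r1
```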

resolve_expt(expt_df, obj)

Tiebreaker logic to resolve redundancies in experiments, to be specified by child classes.

resolve_pod_expt(expt_df, obj)

Tiebreaker logic to resolve redundancies in experiments, to be specified by child classes.

resolve_var_expt(expt_df, obj)

Tiebreaker logic to resolve redundancies in experiments, to be specified by child classes.

class src.data_manager.OnTheFlyFilesystemQueryMixin(*args, **kwargs)

Bases: object

Mixin that creates an intake_esm.esm_datastore catalog by using a regex (_FileRegexClass) to query the existence of data files on a remote filesystem.

For the purposes of this class, all data attributes are inferred only from file and directory naming conventions: the contents of the files are not examined (i.e., the data files are not read from) until they are fetched to a local filesystem.

Note

At time of writing, the filename parsing functionality included in intake is too limited to correctly parse our use cases, which is why we use the RegexPattern class instead.

CATALOG_DIR = <src.util.basic._AbstractAttributePlaceholder object>
_FileRegexClass = <src.util.basic._AbstractAttributePlaceholder object>
_asset_file_format = 'netcdf'
property df
property remote_data_col

Name of the column in the catalog containing the path to the remote data file.

_dummy_esmcol_spec()

Dummy specification dict that enables us to use intake_esm’s machinery. The catalog is temporary and not retained after the code finishes running.

abstract generate_catalog()

Method (to be implemented by child classes) which returns the data catalog as a Pandas DataFrame. One of the columns of the DataFrame must have the name returned by remote_data_col() and contain paths to the files.

setup_query()

Generate an intake_esm catalog of files found in CATALOG_DIR. Attributes of files listed in the catalog (columns of the DataFrame) are taken from the match groups (fields) of the class’s _FileRegexClass.

class src.data_manager.OnTheFlyDirectoryHierarchyQueryMixin(*args, **kwargs)

Bases: src.data_manager.OnTheFlyFilesystemQueryMixin

Mixin that creates an intake_esm.esm_datastore catalog on-the-fly by crawling a directory hierarchy and populating catalog entry attributes by running a regex (_FileRegexClass) against the paths of files in the directory hierarchy.

_DirectoryRegex = {}
iter_files()

Generator that yields instances of _FileRegexClass generated from relative paths of files in CATALOG_DIR. Only paths that match the regex in _FileRegexClass are returned.

generate_catalog()

Crawl the directory hierarchy via iter_files() and return the set of found files as rows in a Pandas DataFrame.
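A sketch of the crawl-and-match approach using os.walk and a named-group regex. The directory layout, the regex and the `path` key are hypothetical examples, not the patterns used by any real _FileRegexClass; rows are returned as dicts, which could then be passed to pandas.DataFrame to form the catalog.

```python
import os
import re

# Hypothetical layout: <CATALOG_DIR>/<model>/<variable>/<variable>_<years>.nc
FILE_REGEX = re.compile(
    r"(?P<model>[^/]+)/(?P<variable>[^/]+)/(?P=variable)_(?P<years>\d{4}-\d{4})\.nc$"
)


def iter_files(catalog_dir):
    """Yield match-group dicts for files under catalog_dir matching FILE_REGEX."""
    for root, _dirs, files in os.walk(catalog_dir):
        for name in sorted(files):
            rel_path = os.path.relpath(os.path.join(root, name), catalog_dir)
            rel_path = rel_path.replace(os.sep, "/")  # normalise for the regex
            m = FILE_REGEX.match(rel_path)
            if m:  # paths that don't match the regex are skipped
                entry = m.groupdict()
                entry["path"] = os.path.join(catalog_dir, rel_path)
                yield entry


def generate_catalog(catalog_dir):
    """Return the crawled files as a list of row dicts (one per file)."""
    return list(iter_files(catalog_dir))
```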

class src.data_manager.FileGlobTuple(name, glob, attrs)

Bases: tuple

Class representing one file glob pattern. ‘attrs’ is a dict containing the data catalog values that will be associated with all files found using ‘glob’. ‘name’ is used for logging only.

_asdict()

Return a new OrderedDict which maps field names to their values.

_field_defaults = {}
_fields = ('name', 'glob', 'attrs')
_fields_defaults = {}
classmethod _make(iterable)

Make a new FileGlobTuple object from a sequence or iterable

_replace(**kwds)

Return a new FileGlobTuple object replacing specified fields with new values

property attrs

Alias for field number 2

property glob

Alias for field number 1

property name

Alias for field number 0

class src.data_manager.OnTheFlyGlobQueryMixin(*args, **kwargs)

Bases: src.data_manager.OnTheFlyFilesystemQueryMixin

Mixin that creates an intake_esm.esm_datastore catalog on-the-fly by searching for files with (python’s implementation of) the shell glob syntax.

We still invoke _FileRegexClass to parse the paths, but the expected use case is that this will be the trivial regex (matching everything, with no labeled match groups), since the file selection logic is being handled by the globs. If you know your data is stored according to some relevant structure, you should use OnTheFlyDirectoryHierarchyQueryMixin instead.

abstract iter_globs()

Iterator returning FileGlobTuple instances. The generated catalog contains the union of the files found by each of the globs.

iter_files(path_glob)

Generator that yields instances of _FileRegexClass generated from relative paths of files in CATALOG_DIR. Only paths that match the regex in _FileRegexClass are returned.

generate_catalog()

Build the catalog from the files returned by the set of globs provided by iter_globs().
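A sketch of building a catalog from several FileGlobTuple entries with the standard glob module. Here the union is taken over file paths and, as an assumption not stated above, a file matched by more than one glob keeps the attrs of the first glob that found it; `generate_catalog_from_globs` is a hypothetical name.

```python
import glob
import os
from collections import namedtuple

FileGlobTuple = namedtuple("FileGlobTuple", ["name", "glob", "attrs"])


def generate_catalog_from_globs(catalog_dir, globs):
    """Union of files matched by each FileGlobTuple, tagged with its attrs."""
    rows = {}
    for g in globs:
        pattern = os.path.join(catalog_dir, g.glob)
        for path in sorted(glob.glob(pattern, recursive=True)):
            if path in rows:
                continue  # assumption: keep the first glob's attrs for duplicates
            row = dict(g.attrs)
            row["path"] = path
            rows[path] = row
    return list(rows.values())
```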

class src.data_manager.LocalFetchMixin

Bases: src.data_manager.AbstractFetchMixin

Mixin implementing data fetch for files on a locally mounted filesystem. No data is transferred; we assume that xarray can open the paths directly. Paths are passed through unaltered and set as the variable’s local_data.

fetch_dataset(var, d_key)

Fetches the data corresponding to d_key, and populates the local_data attribute of d_key with a list of identifiers for the successfully fetched data (paths to locally downloaded copies of the data).

class src.data_manager.LocalFileDataSource(*args, **kwargs)

Bases: src.data_manager.OnTheFlyDirectoryHierarchyQueryMixin, src.data_manager.LocalFetchMixin, src.data_manager.DataframeQueryDataSourceBase

DataSource for dealing with data in a regular directory hierarchy on a locally mounted filesystem. Assumes data for each variable may be split into several files according to date, with the dates present in their filenames.

class src.data_manager.SingleLocalFileDataSource(*args, **kwargs)

Bases: src.data_manager.LocalFileDataSource

DataSource for dealing with data in a regular directory hierarchy on a locally mounted filesystem. Assumes all data for each variable (in each experiment) is contained in a single file.

query_dataset(var)

Verify that only a single file was found from each experiment.
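The single-file check can be sketched as a post-processing step on the query results. This hypothetical function assumes the results are a dict mapping experiment keys to lists of file paths and raises ValueError on a violation; how the real query_dataset() reports the problem (for instance through _query_error_handler()) is not shown here.

```python
def check_single_file(data):
    """Verify that each experiment maps to exactly one file.

    data: dict mapping experiment key -> list of file paths found by the query.
    Returns {experiment key: path}; raises ValueError otherwise.
    """
    result = {}
    for expt_key, paths in data.items():
        if len(paths) != 1:
            raise ValueError(
                f"expected 1 file for experiment {expt_key!r}, found {len(paths)}"
            )
        result[expt_key] = paths[0]
    return result


print(check_single_file({"ModelA": ["a.nc"], "ModelB": ["b.nc"]}))
```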