"""Code to parse CMIP6 controlled vocabularies and elements of the CMIP6 DRS.
Specifications for the above were taken from the planning document
`<http://goo.gl/v1drZl>`__, which doesn't seem to have a permanent link. The
CMIP6 controlled vocabularies (lists of registered MIPs, modeling centers, etc.)
are derived from data in the
`PCMDI/cmip6-cmor-tables <https://github.com/PCMDI/cmip6-cmor-tables>`__
repo, which is included as a subtree under ``/data``.
.. warning::
   Functionality here has been added as needed for the project and is
   incomplete; for example, parsing subexperiments is not supported.
"""
import os
import re
import dataclasses as dc
from src import util, core
import logging
_log = logging.getLogger(__name__)
class CMIP6_CVs(util.Singleton):
    """Interface for looking up information from the CMIP6 CV file.

    .. note::
       Lookups are implemented in an ad-hoc way with :class:`util.MultiMap`; a
       more robust solution would use sqlite.
    """
    def __init__(self, unittest=False):
        """Read the CMIP6 CV JSON file and prepare empty lookup caches.

        Args:
            unittest (bool): if True, a placeholder filename is passed to
                ``util.read_json`` (which is expected to be mocked by the test).
        """
        if unittest:
            # value not used: when testing, the call to read_json below is
            # mocked out with the actual translation table to use for the test
            file_ = 'dummy_filename'
        else:
            paths = core.PathManager()
            file_ = os.path.join(paths.CODE_ROOT, 'data',
                'cmip6-cmor-tables','Tables','CMIP6_CV.json')
        self._contents = util.read_json(file_, log=_log)
        self._contents = self._contents['CV']
        for k in ['product','version_metadata','required_global_attributes',
            'further_info_url','Conventions','license']:
            # remove unnecessary information
            del self._contents[k]
        # munge table_ids: replace each table_id entry with the dict of fields
        # parsed from the table name by CMIP6_MIPTable
        self._contents['table_id'] = dict.fromkeys(self._contents['table_id'])
        for tbl in self._contents['table_id']:
            self._contents['table_id'][tbl] = dc.asdict(CMIP6_MIPTable(tbl))
        self.cv = dict()
        self._lookups = dict()

    def _make_cv(self):
        """Populate the *cv* attribute of :class:`CMIP6_CVs` with the tables
        read in during __init__().

        Do this on-demand rather than in __init__, in case this information isn't
        needed for this run of the framework.
        """
        if self.cv:
            return
        for k in self._contents:
            self.cv[k] = util.to_iter(self._contents[k])

    def is_in_cv(self, category, items):
        """Determine if *items* take values that are valid for the CV category
        *category*.

        Args:
            category (str): the CV category to use to validate values.
            items (str or list of str): Entries whose validity we'd like to
                check.

        Returns: boolean or list of booleans, corresponding to the validity of
            the entries in *items*.

        Raises:
            KeyError: if *category* is not a known CV category.
        """
        self._make_cv()
        if category not in self.cv:
            raise KeyError(f"Unrecognized CMIP6 CV category {category}.")
        if util.is_iterable(items):
            return [(item in self.cv[category]) for item in items]
        else:
            return (items in self.cv[category])

    def get_lookup(self, source, dest):
        """Find the appropriate lookup table to convert values in *source* (keys)
        to values in *dest* (values), generating it if necessary.

        Args:
            source (str): the CV category to use for the keys.
            dest (str): the CV category to use for the values.

        Returns: :class:`util.MultiMap` providing a dict-like lookup interface,
            ie dest_value = d[source_key].

        Raises:
            KeyError: if neither category is a CV table, or if the requested
                attribute isn't present in the entries of the table used.
        """
        if (source, dest) in self._lookups:
            return self._lookups[(source, dest)]
        elif (dest, source) in self._lookups:
            return self._lookups[(dest, source)].inverse()
        elif source in self._contents:
            entries = self._contents[source]
            # only the first entry is inspected to decide whether *dest* is
            # an attribute of this table
            k = list(entries)[0]
            if dest not in entries[k]:
                raise KeyError(f"Can't find {dest} in attributes for {source}.")
            mm = util.MultiMap()
            for k in entries:
                mm[k].update(
                    util.to_iter(entries[k][dest], set)
                )
            self._lookups[(source, dest)] = mm
            return mm
        elif dest in self._contents:
            # The (dest, source) table hasn't been cached yet (that case was
            # handled above), so build it via the recursive call, then invert.
            return self.get_lookup(dest, source).inverse()
        else:
            raise KeyError(f"Neither {source} or {dest} in CV table list.")

    def lookup(self, source_items, source, dest):
        """Lookup the corresponding *dest* values for *source_items* (keys).

        Args:
            source_items (str or list): one or more keys
            source (str): the CV category that the items in *source_items*
                belong to.
            dest (str): the CV category we'd like the corresponding values for.

        Returns: list of *dest* values corresponding to each entry in
            *source_items*, or a single result if *source_items* isn't iterable.
        """
        _lookup = self.get_lookup(source, dest)
        if util.is_iterable(source_items):
            return [util.from_iter(_lookup[item]) for item in source_items]
        else:
            return util.from_iter(_lookup[source_items])

    def lookup_single(self, source_item, source, dest):
        """The same as :meth:`lookup`, but perform lookup for a single
        *source_item*, and raise KeyError if the number of values returned is
        != 1.
        """
        _lookup = self.get_lookup(source, dest)
        dest_items = _lookup[source_item]
        if len(dest_items) != 1:
            raise KeyError(f"Non-unique lookup for {dest} from {source}='{source_item}'.")
        # Read the single element without removing it: the container belongs
        # to the cached lookup table, and pop() would empty it for later calls.
        return next(iter(dest_items))

    # TODO: Represent contents as pandas DataFrame, allow pseudo-SQL multi-column
    # lookups

    # ----------------------------------

    def table_id_from_freq(self, frequency):
        """Specialized lookup to determine which MIP tables use data at the
        requested *frequency*.

        Should really be handled as a special case of :meth:`lookup`.

        Args:
            frequency (:class:`CMIP6DateFrequency`): DateFrequency

        Returns: list of MIP table ``table_id`` names, if any, that use data at
            the given *frequency*.
        """
        self._make_cv()
        assert 'table_id' in self.cv
        # NOTE(review): .items() below requires that util.to_iter() (applied in
        # _make_cv) leaves the table_id dict as a mapping -- confirm.
        d = self.cv['table_id'] # abbreviate
        return [tbl for tbl, tbl_d in d.items() \
            if tbl_d.get('frequency', None) == frequency]
class CMIP6DateFrequency(util.DateFrequency):
    """Subclass of :class:`src.util.datelabel.DateFrequency` to parse data frequency
    information as encoded in MIP tables, DRS filenames, etc.

    Extends DateFrequency in that this records if the data is a climatological
    average, although this information is not currently used.

    Reference: `<http://goo.gl/v1drZl>`__ page 16.
    """
    # Maps frequency unit strings to integer precision codes.
    _precision_lookup = {
        'fx': 0, 'yr': 1, 'mo': 2, 'day': 3,
        'hr': 5, # includes minutes
        'min': 6, # = subhr, minutes and seconds
    }
    # Splits a frequency string into an optional leading quantity (1, 3 or 6),
    # lowercase unit letters, and an optional averaging suffix (C, CM or Pt).
    _regex = re.compile(r"""
^
(?P<quantity>(1|3|6)?)
(?P<unit>[a-z]*?)
(?P<avg>(C|CM|Pt)?)
$
""", re.VERBOSE)

    def __str__(self):
        """Return the string produced by the instance's ``format()`` method."""
        # Defined explicitly rather than as ``__str__ = format``: with no
        # ``format`` in this class body, that name would be the builtin format().
        return self.format()

    def __copy__(self):
        """Return a new instance constructed from this one's formatted string."""
        return self.__class__(self.format())

    def __deepcopy__(self, memo):
        """Same as :meth:`__copy__`; *memo* is unused."""
        return self.__class__(self.format())
# ===========================================================================
# Pattern for the CMIP6 variant label: up to four optional integer indices,
# prefixed by 'r', 'i', 'p' and 'f' respectively, each captured in a named group.
# NOTE(review): presumably util.RegexPattern compiles this in verbose mode (the
# pattern contains inline comments) and `input_field` names the attribute that
# holds the full matched string -- confirm against util.RegexPattern.
variant_label_regex = util.RegexPattern(r"""
(r(?P<realization_index>\d+))? # (optional) int prefixed with 'r'
(i(?P<initialization_index>\d+))? # (optional) int prefixed with 'i'
(p(?P<physics_index>\d+))? # (optional) int prefixed with 'p'
(f(?P<forcing_index>\d+))? # (optional) int prefixed with 'f'
""",
input_field="variant_label"
)
@util.regex_dataclass(variant_label_regex)
class CMIP6_VariantLabel():
    """Parsed representation of a CMIP6 DRS variant label string.

    The label is matched against ``variant_label_regex``; each of the four
    index fields is optional and defaults to None.

    References: `<https://earthsystemcog.org/projects/wip/mip_table_about>`__,
    although this doesn't document all cases used in CMIP6. See also note 8 on
    page 9 of `<http://goo.gl/v1drZl>`__.
    """
    # full variant label string (required)
    variant_label: str = util.MANDATORY
    # integer following 'r' in the label
    realization_index: int = None
    # integer following 'i' in the label
    initialization_index: int = None
    # integer following 'p' in the label
    physics_index: int = None
    # integer following 'f' in the label
    forcing_index: int = None
# Pattern for a MIP table identifier, split into four named groups: an optional
# uppercase prefix, a frequency (optional digit plus lowercase letters), an
# optional suffix (ClimMon, Lev, Plev, Ant or Gre) and an optional qualifier
# (Pt, Z or Off). The start/end anchors are commented out in the pattern itself.
# NOTE(review): the lazy `[a-z]*?` capture only works if util.RegexPattern
# requires the whole string to match -- confirm.
mip_table_regex = util.RegexPattern(r"""
# ^ # start of line
(?P<table_prefix>(A|CF|E|I|AER|O|L|LI|SI)?)
# maybe a digit, followed by as few lowercase letters as possible:
(?P<table_freq>\d?[a-z]*?)
(?P<table_suffix>(ClimMon|Lev|Plev|Ant|Gre)?)
(?P<table_qualifier>(Pt|Z|Off)?)
# $ # end of line - necessary for lazy capture to work
""",
input_field="table_id"
)
@util.regex_dataclass(mip_table_regex)
class CMIP6_MIPTable():
    """Dataclass which represents and parses MIP table identifier string.

    Reference: `<https://earthsystemcog.org/projects/wip/mip_table_about>`__,
    although this doesn't document all cases used in CMIP6.
    """
    # full table identifier string (required)
    table_id: str = util.MANDATORY
    table_prefix: str = ""
    # init-only: consumed by __post_init__ to set `frequency`
    table_freq: dc.InitVar = ""
    table_suffix: str = ""
    table_qualifier: str = ""
    # fields below are derived in __post_init__
    frequency: CMIP6DateFrequency = dc.field(init=False)
    spatial_avg: str = dc.field(init=False)
    temporal_avg: str = dc.field(init=False)
    region: str = dc.field(init=False)

    def __post_init__(self, table_freq=None):
        """Derive frequency, averaging and region from the parsed table name.

        Args:
            table_freq (str): frequency portion of the table name.

        Raises:
            ValueError: if *table_freq* is None.
        """
        if table_freq is None:
            raise ValueError(
                f"No frequency could be parsed from table_id '{self.table_id}'.")
        elif table_freq == 'clim':
            # 'clim' is given the same frequency as 'mon'
            self.frequency = CMIP6DateFrequency('mon')
        else:
            self.frequency = CMIP6DateFrequency(table_freq)

        if self.table_qualifier == 'Z':
            self.spatial_avg = 'zonal_mean'
        else:
            self.spatial_avg = None

        if self.table_qualifier == 'Pt':
            self.temporal_avg = 'point'
        else:
            self.temporal_avg = 'interval'

        # mip_table_regex captures the regional suffixes as 'Ant'/'Gre'; the
        # single-letter forms are kept for callers that set the field directly.
        if self.table_suffix in ('Ant', 'a'):
            self.region = 'Antarctica'
        elif self.table_suffix in ('Gre', 'g'):
            self.region = 'Greenland'
        else:
            self.region = None
# Pattern for the CMIP6 grid label: a literal 'g' followed by five optional
# one-character named groups -- 'm' (global_mean), 'n' or 'r' (regrid), a digit
# (grid_number), 'a' or 'g' (region) and 'z' (zonal_mean).
grid_label_regex = util.RegexPattern(r"""
g
(?P<global_mean>m?)
(?P<regrid>n|r?)
(?P<grid_number>\d?)
(?P<region>a|g?)
(?P<zonal_mean>z?)
""",
input_field="grid_label"
)
@util.regex_dataclass(grid_label_regex)
class CMIP6_GridLabel():
    """Parsed representation of a CMIP6 DRS grid label string.

    Reference: `<http://goo.gl/v1drZl>`__, note 11 on page 11.
    """
    # full grid label string (required)
    grid_label: str = util.MANDATORY
    # init-only flag, folded into `spatial_avg`
    global_mean: dc.InitVar = ""
    regrid: str = ""
    grid_number: int = 0
    # one-letter code on input; replaced by a region name (or None) after init
    region: str = ""
    # init-only flag, folded into `spatial_avg`
    zonal_mean: dc.InitVar = ""
    # fields below are derived in __post_init__
    spatial_avg: str = dc.field(init=False)
    native_grid: bool = dc.field(init=False)

    def __post_init__(self, global_mean=None, zonal_mean=None):
        """Normalize parsed values and compute the derived fields.

        Args:
            global_mean: truthy if the label marks a global mean.
            zonal_mean: truthy if the label marks a zonal mean; only consulted
                when *global_mean* is falsy.
        """
        # any falsy grid number (e.g. an empty match) becomes 0
        self.grid_number = self.grid_number or 0

        averaging = None
        if global_mean:
            averaging = 'global_mean'
        elif zonal_mean:
            averaging = 'zonal_mean'
        self.spatial_avg = averaging

        # anything other than an explicit 'r' counts as the native grid
        self.native_grid = (self.regrid != 'r')

        region_names = {'a': 'Antarctica', 'g': 'Greenland'}
        self.region = region_names.get(self.region)
# Pattern for a DRS directory path: an optional leading '/', an optional
# 'CMIP6/' component, then eight '/'-separated named components followed by a
# version component of the form 'v<digits>' and an optional trailing '/'.
drs_directory_regex = util.RegexPattern(r"""
/? # maybe initial separator
(CMIP6/)?
(?P<activity_id>\w+)/
(?P<institution_id>[a-zA-Z0-9_-]+)/
(?P<source_id>[a-zA-Z0-9_-]+)/
(?P<experiment_id>[a-zA-Z0-9_-]+)/
(?P<variant_label>\w+)/
(?P<table_id>\w+)/
(?P<variable_id>\w+)/
(?P<grid_label>\w+)/
v(?P<version_date>\d+)
/? # maybe final separator
""",
input_field="directory"
)
@util.regex_dataclass(drs_directory_regex)
class CMIP6_DRSDirectory(CMIP6_VariantLabel, CMIP6_MIPTable, CMIP6_GridLabel):
    """Parsed representation of a CMIP6 DRS directory, matched against
    ``drs_directory_regex``.

    Reference: `<http://goo.gl/v1drZl>`__, page 17.

    .. warning::
       This regex will fail on paths involving subexperiments.
    """
    # full directory string (required)
    directory: str = util.MANDATORY
    activity_id: str = ""
    institution_id: str = ""
    source_id: str = ""
    experiment_id: str = ""
    # the next three fields are annotated with the parent dataclass types
    variant_label: CMIP6_VariantLabel = ""
    table_id: CMIP6_MIPTable = ""
    grid_label: CMIP6_GridLabel = ""
    # digits following 'v' in the final path component
    version_date: util.Date = None
# Pattern for a DRS filename that carries a date range: six '_'-separated
# named components, then '<start_date>-<end_date>' (digits only) and '.nc'.
_drs_dates_filename_regex = util.RegexPattern(r"""
(?P<variable_id>\w+)_ # field name
(?P<table_id>\w+)_ # field name
(?P<source_id>[a-zA-Z0-9_-]+)_ # field name
(?P<experiment_id>[a-zA-Z0-9_-]+)_ # field name
(?P<variant_label>\w+)_ # field name
(?P<grid_label>\w+)_ # field name
(?P<start_date>\d+)-(?P<end_date>\d+) # file's date range
\.nc # netCDF file extension
"""
)
# Pattern for a DRS filename without a date range: the same six components
# followed directly by '.nc'. start_date/end_date are supplied through
# `defaults` as the FXDateMin/FXDateMax placeholders.
_drs_static_filename_regex = util.RegexPattern(r"""
(?P<variable_id>\w+)_ # field name
(?P<table_id>\w+)_ # field name
(?P<source_id>[a-zA-Z0-9_-]+)_ # field name
(?P<experiment_id>[a-zA-Z0-9_-]+)_ # field name
(?P<variant_label>\w+)_ # field name
(?P<grid_label>\w+)
\.nc # netCDF file extension, no dates
""",
defaults={'start_date': util.FXDateMin, 'end_date': util.FXDateMax},
)
# Combined filename pattern: the dated form is listed first, the static form
# second.
drs_filename_regex = util.ChainedRegexPattern(
# try the first regex, and if no match, try second
_drs_dates_filename_regex, _drs_static_filename_regex,
input_field="filename"
)
@util.regex_dataclass(drs_filename_regex)
class CMIP6_DRSFilename(CMIP6_VariantLabel, CMIP6_MIPTable, CMIP6_GridLabel):
    """Parsed representation of a CMIP6 DRS filename, matched against
    ``drs_filename_regex``.

    Reference: `<http://goo.gl/v1drZl>`__, page 14-15.
    """
    # full filename string (required)
    filename: str = util.MANDATORY
    variable_id: str = ""
    table_id: CMIP6_MIPTable = ""
    source_id: str = ""
    experiment_id: str = ""
    variant_label: CMIP6_VariantLabel = ""
    grid_label: CMIP6_GridLabel = ""
    start_date: util.Date = None
    end_date: util.Date = None
    # derived in __post_init__ from start_date and end_date
    date_range: util.DateRange = dc.field(init=False)

    def __post_init__(self, *args):
        """Set ``date_range`` and check it agrees with the table's frequency.

        Raises:
            util.DataclassParseError: if the dates are the static placeholders
                but the frequency isn't static, or vice versa.
        """
        has_fx_dates = (self.start_date == util.FXDateMin
            and self.end_date == util.FXDateMax)
        if has_fx_dates:
            # Placeholder dates: treat as static/fx-frequency data and use the
            # special placeholder range
            self.date_range = util.FXDateRange
            # frequency is inferred from table_id
            inconsistent = not self.frequency.is_static
        else:
            self.date_range = util.DateRange(self.start_date, self.end_date)
            # frequency is inferred from table_id
            inconsistent = self.frequency.is_static
        if inconsistent:
            raise util.DataclassParseError(("Inconsistent filename parse: "
                f"cannot determine if '{self.filename}' represents static data."))
# Pattern for a full DRS path: a greedy run of non-whitespace characters
# (directory), a '/', then a final component with no '/' or whitespace
# (filename).
drs_path_regex = util.RegexPattern(r"""
(?P<directory>\S+)/ # any non-whitespace
(?P<filename>[^/\s]+) # nonwhitespace and not directory separator
""",
input_field="path"
)
@util.regex_dataclass(drs_path_regex)
class CMIP6_DRSPath(CMIP6_DRSDirectory, CMIP6_DRSFilename):
    """Parsed representation of a full CMIP6 DRS path, split by
    ``drs_path_regex`` into a directory part and a filename part.
    """
    # full path string (required)
    path: str = util.MANDATORY
    # annotated with the parent dataclass types
    directory: CMIP6_DRSDirectory = ""
    filename: CMIP6_DRSFilename = ""