"""Classes related to customizing the framework's command line interface and
setting option values from user input.
"""
import os
import sys
import io
import argparse
import collections
import dataclasses
import importlib
import itertools
import json
import operator
import shlex
import re
import textwrap
import typing
from src import util
import logging
_log = logging.getLogger(__name__)
_SCRIPT_NAME = 'mdtf.py' # mimick argparse error message text
[docs]def canonical_arg_name(str_):
"""Convert a flag or other specification to a destination variable name.
The destination variable name always has underscores, never hyphens, in
accordance with PEP8. Eg., "--GNU-style-flag" -> "GNU_style_flag".
"""
return str_.lstrip('-').rstrip().replace('-', '_')
[docs]def plugin_key(plugin_name):
"""Ignore spaces and underscores in supplied choices for CLI plugins, and
make matching of plugin names case-insensititve.
"""
return re.sub(r"[\s_]+", "", plugin_name).lower()
[docs]def word_wrap(str_):
"""Clean whitespace and produces better word wrapping for multi-line help
and description strings. Explicit paragraph breaks must be encoded as a
double newline ( ``\n\n`` ).
"""
paragraphs = textwrap.dedent(str_).split('\n\n')
paragraphs = [re.sub(r'\s+', ' ', s).strip() for s in paragraphs]
paragraphs = [textwrap.fill(s, width=80) for s in paragraphs]
return '\n\n'.join(paragraphs)
[docs]def read_config_files(code_root, file_name, site=""):
"""Utility function to read a *pair* of configuration files (one for the
framework defaults, another optional one for site-specific config.)
Args:
file_name (str): Name of file to search for. We search for the file
in all subdirectories of :meth:`._CLIConfigHandler.site_dir`
and :meth:`._CLIConfigHandler.framework_dir`, respectively.
Returns: a tuple of the two files' contents. First entry is the
site specific file (empty dict if that file isn't found) and second
is the framework file (fatal error; exit immediately if not found.)
"""
src_dir = os.path.join(code_root, 'src')
site_dir = os.path.join(code_root, 'sites', site)
site_d = util.find_json(site_dir, file_name, exit_if_missing=False, log=_log)
fmwk_d = util.find_json(src_dir, file_name, exit_if_missing=True, log=_log)
return (site_d, fmwk_d)
[docs]def read_config_file(code_root, file_name, site=""):
"""Return site's file if present, else the framework's file.
"""
site_d, fmwk_d = read_config_files(code_root, file_name, site=site)
if not site_d:
return fmwk_d
return site_d
[docs]class RecordDefaultsAction(argparse.Action):
""":py:class:`~argparse.Action` that adds a boolean to record if user
actually set argument's value, or if we're using the default value specified
in the parser. From `<https://stackoverflow.com/a/50936474>`__. This also
re-implements the 'store_true' and 'store_false' actions, in order to give
defaults information on boolean flags.
If the user specifies a value for ``<option>``, the :meth:`__call__` method
adds a variable named ``<option>_is_default_`` to the returned
:py:class:`argparse.Namespace`. This information is used by
:meth:`.MDTFArgParser.parse_args` to populate the ``is_default`` attribute
of :class:`.MDTFArgParser`.
Subclasses of :py:class:`argparse.Action` are only called on user-supplied
values, not default values. If the ``call_on_defaults`` flag is set on a
subclass, :meth:`.MDTFArgParser.parse_args` will also call the action on
default values.
"""
default_value_suffix = '_is_default_'
call_on_defaults = False # call action on default values
def __init__(self, option_strings, dest, nargs=None, const=None,
default=None, type=None, **kwargs):
if isinstance(default, bool):
nargs = 0 # behave like a flag
const = (not default) # set flag = store opposite of default
elif (isinstance(default, str) or type == str) and nargs is None:
# unless nargs given explictly, string-valued options accept 1 argument
nargs = 1
const = None
super(RecordDefaultsAction, self).__init__(
option_strings, dest, nargs=nargs, const=const, default=default,
type=type, **kwargs
)
def __call__(self, parser, namespace, values, option_string=None):
if self.nargs == 0 and self.const is not None:
setattr(namespace, self.dest, self.const)
elif self.nargs == 1:
setattr(namespace, self.dest, util.from_iter(values))
else:
setattr(namespace, self.dest, values)
# set additional flag to indicate user has set this argument
setattr(namespace, self.dest+self.default_value_suffix, False)
[docs]class PathAction(RecordDefaultsAction):
""":py:class:`~argparse.Action` that performs shell environment variable
expansion and resolution of relative paths, using
:func:`src.util.filesystem.resolve_path`.
"""
call_on_defaults = True
def __call__(self, parser, namespace, values, option_string=None):
# config = CLIConfigManager()
path = util.from_iter(values)
if path is None:
path = ''
# Don't do anything else here: may need to use env vars to properly
# resolve paths, so that's now done later, in core.PathManager.__init__()
super(PathAction, self).__call__(parser, namespace, path, option_string)
[docs]class ClassImportAction(RecordDefaultsAction):
""":py:class:`~argparse.Action` to import classes on demand. Values are
looked up from the 'cli_plugins.jsonc' file.
Placeholder used to trigger behavior when arguments are parsed.
"""
call_on_defaults = False
def __call__(self, parser, namespace, values, option_string=None):
"""Do case-insensitive matching on plugin names.
"""
p_key = plugin_key(util.from_iter(values))
super(ClassImportAction, self).__call__(
parser, namespace, p_key, option_string
)
[docs]class PluginArgAction(ClassImportAction):
""":py:class:`~argparse.Action` to invoke the CLI plugin functionality.
Placeholder used to trigger behavior when arguments are parsed.
"""
call_on_defaults = False
# ===========================================================================
# classes for represting CLI configuration information
[docs]@dataclasses.dataclass
class CLIArgument(object):
"""Class holding configuration options for a single argument of an
:py:class:`argparse.ArgumentParser`, with several custom options to simplify
the parsing of CLIs defined in JSON files.
"""
name: str
action: str = None
nargs: str = None
const: typing.Any = None
default: typing.Any = None
type: typing.Any = None
choices: typing.Iterable = None
required: bool = False
help: str = None
metavar: str = None
dest: str = None
# following are custom additions to syntax
arg_flags: list = dataclasses.field(init=False)
is_positional: bool = False
short_name: str = None
hidden: bool = False
def __post_init__(self):
"""Post-initialization type converstion of attributes.
"""
def _flag_names(_arg_name):
_arg_flags = [_arg_name]
if '_' in _arg_name:
# recognize both --hyphen_opt and --hyphen-opt (GNU CLI convention)
_arg_flags.append(_arg_name.replace('_', '-'))
return ['--'+s for s in _arg_flags]
# Format flag name(s) and destination variables
if self.is_positional:
assert isinstance(self.name, str) # not a list
arg_name = canonical_arg_name(self.name)
self.arg_flags = [arg_name]
self.name = arg_name
self.dest = None # positionals can't specify independent dest
self.required = None # positionals always required
else:
# argument is a command-line flag (default)
# if self.name is a list, recognize all entries as synonyms
arg_flags = [canonical_arg_name(s) for s in util.to_iter(self.name)]
if self.dest is None:
# if synonyms provided, destination is first in list
self.dest = arg_flags[0]
self.arg_flags = list(itertools.chain.from_iterable(
_flag_names(s) for s in arg_flags
))
if self.short_name is not None:
# recognize both --option and -O, if short_name defined
self.arg_flags.insert(1, '-' + self.short_name)
# Add default value set from site-specific file (so that it will show up
# when the user runs --help)
config = CLIConfigManager()
if self.dest in config.site_defaults:
self.default = config.site_defaults[self.dest]
# Type conversion of default value:
if self.type is not None:
if isinstance(self.type, str):
self.type = util.deserialize_class(self.type)
if self.default is not None:
if not isinstance(self.default, self.type):
self.default = self.type(self.default)
if self.action == 'count':
self.default = int(self.default)
if self.const is not None and not isinstance(self.const, self.type):
self.const = self.type(self.const)
# parse action
if self.action is None:
self.action = RecordDefaultsAction
elif isinstance(self.action, str) and self.action not in ['store',
'store_const','store_true','store_false','append','append_const',
'count','help','version']:
self.action = util.deserialize_class(self.action)
if isinstance(self.action, type) and issubclass(self.action, ClassImportAction):
# Enforce case-insensitive matching on plugin names.
self.nargs = 1
self.const = None
self.type = plugin_key
if self.default:
self.default = plugin_key(self.default)
# do not list argument in "mdtf --help", but recognize it
if self.hidden:
self.help = argparse.SUPPRESS
[docs] def add(self, target_p):
"""Adds the CLI argument to the parser ``target_p``. Wraps
:py:meth:`~argparse.ArgumentParser.add_argument`.
Args:
target_p: Parser object (or parser group, or subparser) to which the
argument will be added.
"""
kwargs = {k:v for k,v in dataclasses.asdict(self).items() \
if v is not None and k not in [
'name', 'arg_flags', 'is_positional', 'short_name', 'hidden'
]}
return target_p.add_argument(*self.arg_flags, **kwargs)
[docs]@dataclasses.dataclass
class CLIArgumentGroup(object):
"""Class holding configuration options for an
:py:class:`argparse.ArgumentParser` `argument group
<https://docs.python.org/3.7/library/argparse.html#argument-groups>`__.
"""
title: str
description: str = None
arguments: list = dataclasses.field(default_factory=list)
[docs] @classmethod
def from_dict(cls, d):
"""Initialize an instance of this object from a nested dict obtained
from reading a JSON file.
"""
args_list = d.get('arguments', [])
d['arguments'] = [CLIArgument(**kwargs) for kwargs in args_list]
return cls(**d)
[docs] def add(self, target_p):
"""Adds the CLI argument group, as well as all arguments it contains,
to the parser ``target_p``. Wraps
:py:meth:`~argparse.ArgumentParser.add_argument_group`.
Args:
target_p: Parser object (or parser group, or subparser) to which the
argument group will be added.
"""
if self.arguments:
# only add group if it has > 0 arguments
kwargs = {k:v for k,v in dataclasses.asdict(self).items() \
if v is not None and k in ['title', 'description']}
arg_gp = target_p.add_argument_group(**kwargs)
for arg in self.arguments:
_ = arg.add(arg_gp)
return arg_gp
[docs]@dataclasses.dataclass
class CLIParser(object):
"""Class holding configuration options for an instance of
:py:class:`argparse.ArgumentParser` (or equivalently a subparser or a
command plugin).
"""
prog: str = None
usage: str = None
description: str = None
epilog: str = None
arguments: list = dataclasses.field(default_factory=list)
argument_groups: list = dataclasses.field(default_factory=list)
def __post_init__(self):
"""Post-initialization type converstion of attributes.
"""
for attr_ in ['prog', 'usage', 'description', 'epilog']:
str_ = getattr(self, attr_, None)
if str_:
setattr(self, attr_, word_wrap(str_))
[docs] @classmethod
def from_dict(cls, d):
"""Initialize an instance of this object from a nested dict obtained
from reading a JSON file.
"""
args_list = d.get('arguments', [])
if args_list:
d['arguments'] = [CLIArgument(**arg_d) for arg_d in args_list]
arg_gps = d.get('argument_groups', [])
if arg_gps:
d['argument_groups'] = \
[CLIArgumentGroup.from_dict(gp_d) for gp_d in arg_gps]
return cls(**d)
[docs] def iter_args(self, filter_class=None):
"""Iterator over all :class:`.CLIArgument` objects associated with this
parser.
"""
def _iter_all_args():
yield from self.arguments
for arg_gp in self.argument_groups:
yield from arg_gp.arguments
if filter_class is None:
yield from _iter_all_args()
else:
filter_fn = (lambda arg: isinstance(arg.action, type) \
and issubclass(arg.action, filter_class))
yield from filter(filter_fn, _iter_all_args())
[docs] def add_plugin_args(self, preparsed_d):
"""Revise arguments after we know what plugins are being used. This
annotates the help string of the plugin selector argument and configures
its ``choices`` attribute. It then inserts the plugin-specifc CLI
arguments following that argument.
Args:
preparsed_d: dict of results of the preparsing operation. Keys are
the destination strings of the plugin selector arguments
(identified by having their ``action`` set to
:class:`.PluginArgAction`), and values are the values assigned
to them by preparsing.
"""
def _add_plugins_to_arg_list(arg_list, splice_d):
# insert plugin args into arg_list
return util.splice_into_list(
arg_list, splice_d, operator.attrgetter('dest'), log=_log
)
config = CLIConfigManager()
d = dict()
for flag_name, flag_value in preparsed_d.items():
plugin = config.get_plugin(flag_name, flag_value)
if not plugin:
choices = [f"'{x}'" for x in config.get_plugin(flag_name).keys()]
_log.critical(("%s: error: argument --%s: invalid choice: '%s' "
"(choose from %s)"),
_SCRIPT_NAME, flag_name, flag_value, ', '.join(choices)
)
exit(2) # exit code for CLI syntax error
d[flag_name] = list(plugin.cli.iter_args())
self.arguments = _add_plugins_to_arg_list(self.arguments, d)
for arg_gp in self.argument_groups:
arg_gp.arguments = _add_plugins_to_arg_list(arg_gp.arguments, d)
for arg in self.iter_args(filter_class=PluginArgAction):
if arg.help == argparse.SUPPRESS:
# skip hidden CLI items
continue
flag_value = preparsed_d[arg.name]
plugin = config.get_plugin(arg.name, flag_value)
if plugin.help:
arg.help = arg.help.strip() + \
f" Selected value = '{flag_value}': {plugin.help.strip()} "
else:
arg.help = arg.help.strip() + f" Selected value = '{flag_value}'. "
arg.help = arg.help + word_wrap(f"""
NOTE: flags below are specific to this value. Set a different
value along with '--help' to see flags for that option.
""")
[docs]@dataclasses.dataclass
class CLICommand(object):
"""Class holding configuration options for a subcommand (invoked via a
subparser) or a plugin.
"""
name: str
entry_point: str
help: str = ""
cli_file: str = None
cli: dict = None
parser: dataclasses.field(init=False) = None
code_root: dataclasses.InitVar = ""
def __post_init__(self, code_root):
"""Post-initialization type converstion of attributes.
"""
if self.cli is None and self.cli_file is not None:
try:
self.cli = util.read_json(
os.path.join(code_root, self.cli_file), log=_log
)
except util.MDTFFileNotFoundError:
_log.critical("Couldn't find CLI file %s.", self.cli_file)
exit(2) # exit code for CLI syntax error
if self.cli is not None:
self.cli = CLIParser.from_dict(self.cli)
[docs] def import_target(self):
"""Imports the function or class referred to by the ``entry_point``
attribute.
"""
mod_name, cls_name = self.entry_point.split(':')
try:
mod_ = importlib.import_module(mod_name)
except ImportError:
_log.error('Unable to import %s.', mod_name)
raise ValueError(self.entry_point)
try:
return getattr(mod_, cls_name)
except Exception:
_log.error('Unable to import %s in %s.', cls_name, mod_name)
raise ValueError(self.entry_point)
[docs] def call(self, *args, **kwargs):
"""Imports the function or class referred to by the
``entry_point`` attribute, and calls it with the passed arguments.
"""
cls_ = self.import_target()
instance = cls_(*args, **kwargs)
return instance
DefaultsFileTypes = util.MDTFEnum('DefaultsFileTypes', 'USER SITE GLOBAL')
DefaultsFileTypes.__doc__ = """
:class:`~util.MDTFEnum` to distinguish the three different categories of
input settings files. In order of precedence:
1. ``USER``: Input settings read from a file supplied by the user.
2. ``SITE``: Settings specific to the given site (``--site`` flag.)
3. ``GLOBAL``: Settings applicable to all sites. The main intended use case
of this file is to enable the user to configure a default site at
install time.
"""
[docs]class CLIConfigManager(util.Singleton):
""":class:`~src.util.Singleton` to handle search, loading and parsing
of configuration files for the CLI and CLI default values. We encapsulate
this functionality in its own class, instead of :class:`.MDTFArgParser` or
its children, to try to make the code easier to understand (not out of
necessity).
.. warning::
This is intended to be initialized by a calling script *before* being
referenced by the classes in this module.
"""
def __init__(self, code_root=None, skip_defaults=False):
# singleton, so this will only be invoked once
self.code_root = code_root
self.skip_defaults = skip_defaults
self.site = self.default_site
self.subcommands = dict()
self.subcommand_files = []
self.subparser_kwargs = dict()
self.plugins = dict()
self.plugin_files = []
self.defaults_files = dict()
self.site_defaults = {'site': self.default_site}
self.user_defaults = dict()
default_site = 'local'
defaults_filename = "defaults.jsonc"
subcommands_filename = "cli_subcommands.jsonc"
plugins_filename = "cli_plugins.jsonc"
@property
def framework_dir(self):
return os.path.join(self.code_root, 'src')
@property
def sites_dir(self):
return os.path.join(self.code_root, 'sites')
@property
def site_dir(self):
assert self.site is not None
return os.path.join(self.sites_dir, self.site)
[docs] def read_defaults(self, def_type, path=None):
"""Populate one of the entries in ``self.defaults`` by reading from the
appropriate defaults file.
Args:
def_type (:class:`DefaultsFileTypes`): Type of defaults file to read.
path (str, optional): path of the file. Only used for user-specified
defaults.
"""
if self.skip_defaults:
return
if def_type == DefaultsFileTypes.GLOBAL:
# NB file lives in "sites_dir", not a "site_dir" for a given site
path = os.path.join(self.sites_dir, self.defaults_filename)
dest_d = self.site_defaults
elif def_type == DefaultsFileTypes.SITE:
path = os.path.join(self.site_dir, self.defaults_filename)
dest_d = self.site_defaults
elif def_type == DefaultsFileTypes.USER:
assert path # is not none
dest_d = self.user_defaults
try:
d = util.read_json(path, log=_log)
self.defaults_files[def_type] = path
# drop values equal to the empty string
d = {k:v for k,v in d.items() if (v is not None and v != "")}
dest_d.update(d)
except util.MDTFFileNotFoundError:
_log.debug('Config file %s not found; not updating defaults.', path)
[docs] def site_default_text(self):
files_str = [self.defaults_files.get(x, None) for x in [
DefaultsFileTypes.SITE, DefaultsFileTypes.GLOBAL]]
files_str = '\n'.join([x for x in files_str if x is not None])
if files_str:
return "Default values have been set from the following files:" \
+ '\n' + files_str
else:
return None
[docs] def read_subcommands(self):
"""Populates ``subcommands`` and ``subparser_kwargs`` attributes with
contents of CLI plugin files for the framework and site. Site-specific
subcommand definitions override those defined on the framework.
"""
(site_d, fmwk_d) = read_config_files(
self.code_root, self.subcommands_filename, self.site
)
site_cmds = site_d.pop('subcommands', dict())
fmwk_cmds = fmwk_d.pop('subcommands', dict())
self.subparser_kwargs = fmwk_d
self.subparser_kwargs.update(site_d)
self.subcommands = {
k: CLICommand(name=k, **v, code_root=self.code_root) \
for k,v in fmwk_cmds.items()
}
for k,v in site_cmds.items():
if k in self.subcommands:
_log.debug("Replacing subcommand '%s' with site-specific version.", k)
self.subcommands[k] = CLICommand(name=k, **v, code_root=self.code_root)
[docs] def read_plugins(self):
"""Populates ``plugins`` attribute with contents of CLI plugin files for
the framework and site.
"""
def _add_new_plugin_type(plugin_arg, arg_choices):
self.plugins[plugin_arg] = {
plugin_key(k): CLICommand(name=k, **v, code_root=self.code_root) \
for k,v in arg_choices.items()
}
(site_d, fmwk_d) = read_config_files(
self.code_root, self.plugins_filename, self.site
)
for k, v in fmwk_d.items():
_add_new_plugin_type(k, v)
for k, v in site_d.items():
if k not in self.plugins:
_add_new_plugin_type(k, v)
continue
for kk, vv in v.items():
p_key = plugin_key(kk)
if p_key in self.plugins[k]:
_log.debug(
'Replacing plugin %s (for %s) with site-specific version.',
kk, k
)
self.plugins[k][p_key] = \
CLICommand(name=kk, **vv, code_root=self.code_root)
[docs] def get_plugin(self, plugin_name, choice_of_plugin=None):
"""Lookup requested CLI plugin from ``plugins`` attribute, logging
appropriate errors where KeyErrors would be raised.
Args:
plugin_name (str): Name of the plugin selected.
choice_of_plugin (str, optional): if provided, the name of the
choice of plugin.
Returns: :class:`.CLICommand` object corresponding to the requested
plugin choice if both arguments are given, or a dict of recognized
choices if only the first argument is given.
"""
if plugin_name not in self.plugins:
_log.error('Plugin %s not found (recognized: %s)',
plugin_name, str(list(self.plugins.keys()))
)
return dict()
if choice_of_plugin is None:
# return entire dict
return self.plugins[plugin_name]
p_key = plugin_key(choice_of_plugin)
if p_key not in self.plugins[plugin_name]:
_log.critical(("%s: error: argument --%s: invalid choice: '%s' "
"(choose from %s)"),
_SCRIPT_NAME, plugin_name, choice_of_plugin,
str(list(self.plugins[plugin_name].keys()))
)
exit(2) # exit code for CLI syntax error
return self.plugins[plugin_name][p_key]
# ===========================================================================
# CLI parsers
[docs]class MDTFArgParser(argparse.ArgumentParser):
"""Customized :py:class:`argparse.ArgumentParser`. Added functionality:
- Configuring the parser from an external file (:meth:`~MDTFArgParser.configure`).
- Customized help text formatting provided by :class:`.CustomHelpFormatter`.
- Recording whether the user specified each argument value, or whether the
default was used, via :class:`.RecordDefaultsAction`.
- Better bookkeeping of `argument groups
<https://docs.python.org/3.7/library/argparse.html#argument-groups>`__,
e.g. which arguments belong to which group.
"""
def __init__(self, *args, **kwargs):
# Dict to store whether default value was used, for arguments using the
# RecordDefaultsAction
self.is_default = dict()
kwargs['formatter_class'] = CustomHelpFormatter
super(MDTFArgParser, self).__init__(*args, **kwargs)
self._positionals.title = None
self._optionals.title = 'COMMAND OPTIONS'
[docs] @staticmethod
def split_args(argv):
"""Wrapper for :py:meth:`shlex.split`.
"""
if isinstance(argv, str):
argv = shlex.split(argv, posix=True)
return argv
[docs] def iter_actions(self):
"""Iterator over :py:class:`~argparse.Action` objects associated with
all user-defined arguments in parser, as well as those for any
subcommands.
"""
def _iter_all_actions():
for act in self._actions:
if isinstance(act, argparse._SubParsersAction):
choices = getattr(act, 'choices', dict())
for p in choices.values():
yield from p._actions
else:
yield act
def _pred(act):
return not isinstance(act,
(argparse._HelpAction, argparse._VersionAction))
return filter(_pred, _iter_all_actions())
[docs] def _set_is_default(self, parsed_args):
"""Populates the ``is_default`` attribute based on whether the user
explicitly specified a value, or whether a default was used.
Args:
parsed_args: dict of args and values returned by initial parsing.
"""
self.is_default = dict() # clear in case we parsed previously
for act in self.iter_actions():
if isinstance(act, RecordDefaultsAction):
default_value_flag = act.dest + act.default_value_suffix
if default_value_flag in parsed_args:
self.is_default[act.dest] = False
# delete the flag set by RecordDefaultsAction.__call__,
# since we're transferring the information to is_default
del parsed_args[default_value_flag]
else:
self.is_default[act.dest] = True
else:
# check if value is equal to default; doesn't handle the case
# where the user set the option equal to its default value
# (which is why RecordDefaultsAction is necessary.)
self.is_default[act.dest] = (act.dest is act.default)
[docs] def _call_actions_on_defaults(self, namespace):
"""Subclasses of :py:class:`argparse.Action` are only called on
user-supplied values, not default values. If the ``call_on_defaults``
flag has been set on our custom actions, call the action on default
values to do the same parsing for default values that we would've done
for user input.
"""
for act in self.iter_actions():
if isinstance(act, RecordDefaultsAction) and act.call_on_defaults:
values = getattr(namespace, act.dest, None)
act(self, namespace, values, None)
[docs] def _default_argv(self, parsed_args):
"""Utility method returning the arguments passed to the parser for
lowest-priority defaults in
:meth:`.MDTFArgParser.parse_known_args`.
"""
config = CLIConfigManager()
default_site = parsed_args.get('site', config.default_site)
return ['--site', util.from_iter(default_site)]
[docs] def parse_known_args(self, args=None, namespace=None):
"""Wrapper for :py:meth:`~argparse.ArgumentParser.parse_known_args` which
handles intermediate levels of default settings derived from the
user's settings files. These override defaults defined in the parser
itself. The precedence order is:
1. Argument values explictly given by the user on the command line, as
recorded in the ``is_default`` attribute of :class:`.MDTFArgParser`.
2. Argument values from a file the user gave via the ``-f`` flag.
(CLIConfigManager.defaults[DefaultsFileTypes.USER]).
3. Argument values specified as the default values in the argument
parser, which in turn are set with the following precedence order:
a. Default values from a site-specfic file (defaults.jsonc), stored in
CLIConfigManager.defaults[DefaultsFileTypes.SITE].
b. Default values from a defaults.jsonc file in the sites/ directory,
stored in CLIConfigManager.defaults[DefaultsFileTypes.GLOBAL].
c. Default values hard-coded in the CLI definition file itself.
Args:
args (optional): String or list of strings to parse. If a single
string is passed, it's split using :py:meth:`shlex.split`.
If not supplied, the default behavior parses :py:meth:`sys.argv`.
namespace (optional): An object to store the parsed arguments.
The default is a new empty :py:class:`argparse.Namespace` object.
Returns:
Tuple of 1) populated namespace containing parsed arguments and 2)
unrecognized arguments, as with
:py:meth:`argparse.ArgumentParser.parse_known_args`.
"""
def _to_dict(ns):
if isinstance(ns, argparse.Namespace):
return vars(ns)
else:
return dict(ns)
config = CLIConfigManager()
try:
(parsed_args, remainder) = super(MDTFArgParser, self).parse_known_args(
self.split_args(args), None
)
except SystemExit as exc:
if exc.code != 0:
# hit a parse error; include description of the source of error.
print("Error occurred in user-supplied explict CLI flags.")
raise
parsed_args = _to_dict(parsed_args)
self._set_is_default(parsed_args)
# Highest priority: options that were explicitly set by user on CLI
# Note that is_default[opt] = None (not True or False) if no default
# value is defined for that option.
user_cli_opts = {k:v for k,v in parsed_args.items() \
if not self.is_default.get(k, True)}
# Lowest priority: set of defaults from running parser on empty input
try:
parser_defaults, _ = super(MDTFArgParser, self).parse_known_args(
self._default_argv(parsed_args), None
)
except SystemExit as exc:
if exc.code != 0:
# hit a parse error; include description of the source of error.
print("Error occurred in CLI definitions.")
raise
# CLI opts override options set from file, which override defaults
parsed_args = _to_dict(collections.ChainMap(
user_cli_opts, config.user_defaults, vars(parser_defaults)
))
if namespace is None:
namespace = argparse.Namespace(**parsed_args)
else:
for k,v in parsed_args.items():
setattr(namespace, k, v)
self._call_actions_on_defaults(namespace)
return (namespace, remainder)
[docs] def parse_args(self, args=None, namespace=None):
"""Subclassed implementation of :py:meth:`~argparse.ArgumentParser.parse_args`
which wraps :meth:`~.MDTFArgParser.parse_known_args`.
"""
args, argv = self.parse_known_args(args, namespace)
if argv:
_log.error('unrecognized arguments: %s', ' '.join(argv))
return args
[docs]class MDTFArgPreparser(MDTFArgParser):
"""Parser class used to "preparse" plugin selector arguments, to determine
which plugins to use. Plugin selector arguments are identified by having
their ``action`` set to :class:`.PluginArgAction`.
"""
def __init__(self):
super(MDTFArgPreparser, self).__init__(add_help=False)
[docs] def parse_site(self, argv=None, default_site=None):
"""Wrapper for :py:meth:`~argparse.ArgumentParser.parse_known_args`
used to determine what site to use.
"""
namespace = self.parse_known_args(argv)[0]
return getattr(namespace, 'site', default_site)
[docs] def parse_plugins(self, argv=None):
"""Wrapper for :py:meth:`~argparse.ArgumentParser.parse_known_args`
used to parse the plugin selector arguments.
"""
d = vars(self.parse_known_args(argv)[0])
keys = [act.dest for act in self.iter_actions() \
if isinstance(act, PluginArgAction)]
return {k: d.get(k, None) for k in keys}
[docs]class MDTFTopLevelArgParser(MDTFArgParser):
"""Class for constructing the command-line interface, parsing the options,
and handing off execution to the selected subcommand.
"""
def __init__(self, code_root, skip_defaults=False, argv=None):
_ = CLIConfigManager(code_root, skip_defaults=skip_defaults)
self.code_root = code_root
self.installed = False
self.sites = []
self.site = None
if argv is None:
self.argv = sys.argv[1:]
else:
self.argv = self.split_args(argv)
self.file_case_list = []
self.config = dict()
self.log_config = dict()
self.imports = dict()
self.setup()
[docs] def iter_arg_groups(self, subcommand=None):
config = CLIConfigManager()
if subcommand:
subcmds = config.subcommands.get(subcommand, [])
else:
subcmds = list(config.subcommands.values())
for cmd in subcmds:
if hasattr(cmd, 'cli') and cmd.cli:
yield from cmd.cli.argument_groups
[docs] def iter_group_actions(self, subcommand=None, group=None):
groups = util.to_iter(group)
for arg_gp in self.iter_arg_groups(subcommand=subcommand):
if groups:
if arg_gp.title in groups:
yield from arg_gp.arguments
else:
yield from arg_gp.arguments
[docs] def init_user_defaults(self):
"""Set user defaults using values read in from a configuration
file in one of two formats.
Args:
config_str (str): contents of the configuration file, either:
1. A JSON/JSONC file of key-value pairs. This is parsed using
:func:`~src.util.filesystem.parse_json`.
2. A plain text file containing flags and arguments as they would
be passed on the command line (except shell expansions are not
performed). This is parsed by the :meth:`MDTFArgParser.parse_args`
method of the configured parser.
The format is determined from context. ValueError is raised if the string
cannot be parsed.
"""
config = CLIConfigManager()
input_p = MDTFArgPreparser()
self.add_input_file_arg(input_p)
path = input_p.parse_input_file(self.argv)
if not path:
return
try:
with io.open(path, 'r', encoding='utf-8') as f:
str_ = f.read()
except Exception:
_log.exception("Can't read user input file at %s.", path)
raise ValueError()
if not str_:
return
# try to determine if file is json
if 'json' in os.path.splitext(path)[1].lower():
# assume config_file a JSON dict of option:value pairs.
try:
d = util.parse_json(str_)
self.file_case_list = d.pop('case_list', [])
d = {canonical_arg_name(k): v for k,v in d.items()}
config.user_defaults.update(d)
except json.JSONDecodeError as exc:
sys.exit(f"ERROR: JSON syntax error in {path}:\n\t{exc}")
except Exception:
_log.exception('Attempted to parse %s as JSONC; failed.', path)
raise ValueError()
else:
# assume config_file is a plain text file containing flags, etc.
# as they would be passed on the command line.
try:
self.argv = self.argv + shlex.split(str_, comments=True, posix=True)
except Exception:
_log.exception(
'Attempted to parse %s as shell input; failed.', path)
raise ValueError()
[docs] def add_site_arg(self, target_p):
"""Convenience method to add the argument flag to select which
site-specific code to use, to the parser ``target_p`` (either the
top-level parser, or the preparser.)
"""
config = CLIConfigManager()
kwargs = {'default': config.default_site, 'nargs': 1}
if isinstance(target_p, MDTFTopLevelArgParser):
kwargs.update({
'choices': self.sites,
'help': word_wrap(f"""
Site-specific functionality to use. Options below are
specific to the selected value '{self.site}'.
""")
})
target_p.add_argument('--site', '-s', **kwargs)
[docs] def init_site(self):
"""We allow site-specific installations to customize the CLI, so before
we construct the CLI parser we need to determine what site to use. We do
this by running a parser that only looks for the ``--site`` flag.
This sets the ``site`` attribute and populates the global and
site-specific settings dicts.
"""
config = CLIConfigManager()
config.read_defaults(DefaultsFileTypes.GLOBAL)
default_site = config.site_defaults.get('site', config.default_site)
self.sites = [d for d in os.listdir(config.sites_dir) \
if os.path.isdir(os.path.join(config.sites_dir, d)) \
and not d.startswith(('.','_'))]
# TODO: if we're checking to see if installer has been run, only set
# self.installed = True if default_site in self.sites
self.installed = True
site_p = MDTFArgPreparser()
self.add_site_arg(site_p)
site = util.from_iter(site_p.parse_site(self.argv, default_site))
if site not in self.sites \
and not (site == default_site and not self.installed):
_log.critical("Requested site %s not found in sites directory %s.",
site, config.sites_dir)
exit(2) # exit code for CLI syntax error
config.default_site = default_site
config.site = site
self.site = site
config.read_defaults(DefaultsFileTypes.SITE)
[docs] def add_contents(self, target_p):
"""Convenience method to fully configure a parser ``target_p`` (either
the top-level parser, or the preparser), adding subparsers for all
subcommands.
"""
config = CLIConfigManager()
self.add_site_arg(target_p)
self.add_input_file_arg(target_p)
assert len(config.subcommands) == 1
cmd = tuple(config.subcommands.values())[0]
cmd.cli.configure(target_p)
[docs] def setup(self):
"""Method to wrap all configuration methods needed to configure the
parser before calling parse_arg: reading the defaults files and
configuring plugins based on existing values.
"""
config = CLIConfigManager()
self.init_user_defaults()
self.init_site()
config.read_subcommands()
config.subparser_kwargs.update({
'required':True, 'dest':'subcommand', 'parser_class': MDTFArgParser
})
config.read_plugins()
# preparse arguments to get plugin configuration, and revise CLI
temp_p = MDTFArgPreparser()
self.add_contents(temp_p)
plugin_args = temp_p.parse_plugins(self.argv)
for cmd in config.subcommands.values():
cmd.cli.add_plugin_args(plugin_args)
# Build the real CLI parser now that we have plugins
self.configure()
[docs] def parse_args(self, args=None, namespace=None):
"""Wrapper for :py:meth:`~argparse.ArgumentParser.parse_args` which
handles intermediate levels of default settings.
"""
if args is None:
args = self.argv
else:
args = self.split_args(args)
return super(MDTFTopLevelArgParser, self).parse_args(args, namespace)
[docs] def parse_known_args(self, args=None, namespace=None):
"""Wrapper for :py:meth:`~argparse.ArgumentParser.parse_known_args` which
handles intermediate levels of default settings.
"""
if args is None:
args = self.argv
else:
args = self.split_args(args)
return super(MDTFTopLevelArgParser, self).parse_known_args(args, namespace)
[docs] def dispatch(self, args=None):
"""Parse args, and call the subcommand that was selected.
"""
config = CLIConfigManager()
# finally parse the user's CLI arguments
self.config = vars(self.parse_args(args))
# log use of site-wide default files here (not earlier, in case user
# just wanted --help or --version)
defaults_text = config.site_default_text()
if defaults_text:
_log.info(defaults_text)
# import plugin classes
for act in self.iter_actions():
if isinstance(act, ClassImportAction):
key = act.dest
assert key in self.config
plugin_cmd = config.get_plugin(key, self.config[key])
self.imports[key] = plugin_cmd.import_target()
# multiple subcommand functionality not being used yet
assert len(config.subcommands) == 1
cmd = tuple(config.subcommands.values())[0]
return cmd.call(self)
[docs]class MDTFTopLevelSubcommandArgParser(MDTFTopLevelArgParser):
"""Implement top-level parser with multiple subcommands.
"""
[docs] def _default_argv(self, parsed_args):
"""Utility method returning the arguments passed to the parser for
lowest-priority defaults in
:meth:`.MDTFArgParser.parse_known_args`.
"""
config = CLIConfigManager()
return [
'--site', parsed_args.get('site', config.default_site),
parsed_args.get('subcommand', 'help')
]
[docs] def add_contents(self, target_p):
"""Convenience method to fully configure a parser ``target_p`` (either
the top-level parser, or the preparser), adding subparsers for all
subcommands.
"""
config = CLIConfigManager()
add_help = isinstance(target_p, MDTFTopLevelArgParser)
self.add_site_arg(target_p)
self.add_input_file_arg(target_p)
sub_p = target_p.add_subparsers(**config.subparser_kwargs)
_ = sub_p.add_parser(
"help", help="Show this help message and exit.", add_help=add_help
)
for cmd in config.subcommands.values():
cmd.parser = sub_p.add_parser(
cmd.name, help=cmd.help, add_help=add_help,
usage=cmd.cli.usage, description=cmd.cli.description
)
cmd.cli.configure(cmd.parser)
[docs] def dispatch(self):
raise NotImplementedError()