"""Utilities related to configuration and handling of framework logging.
"""
import os
import sys
import argparse
import datetime
import io
import subprocess
import signal
import traceback
from . import basic, exceptions
import logging
import logging.config
import logging.handlers
_log = logging.getLogger(__name__)
[docs]class MDTFConsoleHandler(logging.StreamHandler):
"""Dummy class to designate logging to stdout or stderr from the root logger.
"""
pass
[docs]class StringIOHandler(logging.StreamHandler):
""":py:class:`~logging.StreamHandler` instance that writes log entries to
an internal :py:class:`~io.StringIO` buffer.
"""
def __init__(self):
self._log_buffer = io.StringIO()
super(StringIOHandler, self).__init__(stream=self._log_buffer)
[docs] def reset_buffer(self):
# easier to create a new buffer than reset the existing one
new_buffer = io.StringIO()
old_buffer = self.setStream(new_buffer)
old_buffer.close()
self._log_buffer = new_buffer
[docs] def close(self):
super(StringIOHandler, self).close()
self._log_buffer.close()
[docs] def buffer_contents(self):
"""Return contents of buffer as a string."""
return self._log_buffer.getvalue()
[docs]class MultiFlushMemoryHandler(logging.handlers.MemoryHandler):
"""Subclass :py:class:`logging.handlers.MemoryHandler` to enable flushing
the contents of its log buffer to multiple targets. We do this to solve the
chicken-and-egg problem of logging any events that happen before the log
outputs are configured: those events are captured by an instance of this
handler and then transfer()'ed to other handlers once they're set up.
See `<https://stackoverflow.com/a/12896092>`__.
"""
[docs] def transfer(self, target_handler):
"""Transfer contents of buffer to target_handler.
Args:
target_handler (:py:class:`logging.Handler`): log handler to transfer
contents of buffer to.
"""
self.acquire()
try:
self.setTarget(target_handler)
if self.target:
for record in self.buffer:
if self.target.level <= record.levelno:
self.target.handle(record)
# self.buffer = [] # don't clear buffer!
finally:
self.release()
[docs] def transfer_to_non_console(self, logger):
"""Transfer contents of buffer to all non-console-based handlers attached
to *logger* (handlers that aren't :py:class:`MDTFConsoleHandler`.)
If no handlers are attached to the logger, a warning is printed and the
buffer is transferred to the :py:class:`logging.lastResort` handler, i.e.
printed to stderr.
Args:
logger (:py:class:`logging.Logger`): logger to transfer
contents of buffer to.
"""
no_transfer_flag = True
for h in logger.handlers:
if not isinstance(h, (MultiFlushMemoryHandler, MDTFConsoleHandler)):
self.transfer(h)
no_transfer_flag = False
if no_transfer_flag:
logger.warning("No non-console loggers configured.")
self.transfer(logging.lastResort)
[docs]class HeaderFileHandler(logging.FileHandler):
"""Subclass :py:class:`logging.FileHandler` to print system information to
start of file without writing it to other loggers.
"""
[docs] def _log_header(self):
return ""
[docs] def _open(self):
"""Write header information right after we open the log file, then
proceed normally.
"""
fp = super(HeaderFileHandler, self)._open()
fp.write(self._log_header())
return fp
[docs]class MDTFHeaderFileHandler(HeaderFileHandler):
[docs] def _log_header(self):
return mdtf_log_header("MDTF PACKAGE LOG")
[docs]def _hanging_indent(str_, initial_indent, subsequent_indent):
"""Poor man's indenter. Easier than using textwrap for this case.
Args:
str_ (str): String to be indented.
initial_indent (str): string to insert as the indent for the first
line.
subsequent_indent (str): string to insert as the indent for all
subsequent lines.
Returns:
Indented string.
"""
if isinstance(initial_indent, int):
initial_indent = initial_indent * ' '
if isinstance(subsequent_indent, int):
subsequent_indent = subsequent_indent * ' '
lines_ = str_.splitlines()
lines_out = []
if len(lines_) > 0:
lines_out = lines_out + [initial_indent + lines_[0]]
if len(lines_) > 1:
lines_out = lines_out + [(subsequent_indent+l) for l in lines_[1:]]
return '\n'.join(lines_out)
[docs]class _LevelFilterBase(logging.Filter):
"""Base class to implement common features of level-based filters."""
def __init__(self, name="", level=None):
super(_LevelFilterBase, self).__init__(name=name)
if level is None:
level = logging.NOTSET
if not isinstance(level, int):
if hasattr(logging, str(level)):
level = getattr(logging, str(level))
else:
level = int(level)
self.levelno = level
[docs]class GeqLevelFilter(_LevelFilterBase):
""":py:class:`logging.Filter` to include only log messages with a severity of
*level* or worse. This is normally done by setting the *level* attribute on a
:py:class:`logging.Handler`, but we need to add a filter when transferring
records from another logger, as shown in
`<https://stackoverflow.com/a/24324246>`__."""
[docs] def filter(self, record):
return record.levelno >= self.levelno
[docs]class LtLevelFilter(_LevelFilterBase):
""":py:class:`logging.Filter` to include only log messages with a severity
less than *level*.
"""
[docs] def filter(self, record):
return record.levelno < self.levelno
[docs]class EqLevelFilter(_LevelFilterBase):
""":py:class:`logging.Filter` to include only log messages with a severity
equal to *level*.
"""
[docs] def filter(self, record):
return record.levelno == self.levelno
[docs]class NameMatchFilter(logging.Filter):
""":py:class:`logging.Filter` that rejects all log events coming from child
loggers.
Intended to be attached to a handler -- the effect of attaching this to a
logger is the same as setting *propagate* = False on it.
"""
def __init__(self, name=""):
super(NameMatchFilter, self).__init__()
self._name = name
[docs] def filter(self, record):
return (record.name == self._name)
OBJ_LOG_TAG_ATTR_NAME = 'tags'
[docs]class TagMatchFilter(logging.Filter):
""":py:class:`logging.Filter` which only accepts records having the
designated combination of 'tag' attributes. These are assigned by the
methods in :class:`MDTFObjectLogger` or can be passed via the 'extra' kwarg
on any logger (see discussion in entry for
<https://docs.python.org/3.7/library/logging.html#logging.Logger.debug>__.)
"""
def __init__(self, name="", tags=None):
super(TagMatchFilter, self).__init__(name=name)
self._tags = basic.to_iter(tags, set)
[docs] def filter(self, record):
if not hasattr(record, OBJ_LOG_TAG_ATTR_NAME):
return False
tags = basic.to_iter(record.tags, set)
return self._tags.issubset(tags)
# ------------------------------------------------------------------------------
# standardize
OBJ_LOG_ROOT = 'MDTF' # "root logger" of the object logger hierarchy
ObjectLogTag = basic.MDTFEnum(
"ObjectLogTag",
"NC_HISTORY BANNER IN_FILE OUT_FILE",
module=__name__
)
ObjectLogTag.__doc__ = """Standardized values that the package-defined *tags*
attribute on :py:class:`~logging.LogRecord` objects can take, and that
:class:`TagMatchFilter` can listen for.
"""
[docs]class MDTFObjectLogger(logging.Logger):
"""This class wraps functionality for use by :class:`MDTFObjectLoggerMixin`:
- A :py:class:`~logging.Logger` to record events affecting the parent object
only. This logger does not propagate events up the log hierarchy: the
module-level logger should be passed if that functionality is desired.
- A list for holding :py:class:`Exception` objects received by the parent
object.
"""
def __init__(self, name):
super(MDTFObjectLogger, self).__init__(name)
self._exceptions = []
self._tracebacks = []
[docs] def log(self, level, msg, *args, **kw):
# add "tags" attribute to all emitted LogRecords
if 'extra' not in kw:
kw['extra'] = dict()
kw['extra'][OBJ_LOG_TAG_ATTR_NAME] = basic.to_iter(
kw['extra'].get(OBJ_LOG_TAG_ATTR_NAME, None), set
)
if OBJ_LOG_TAG_ATTR_NAME in kw:
kw['extra'][OBJ_LOG_TAG_ATTR_NAME].update(
basic.to_iter(kw[OBJ_LOG_TAG_ATTR_NAME])
)
del kw[OBJ_LOG_TAG_ATTR_NAME]
super(MDTFObjectLogger, self).log(level, msg, *args, **kw)
# wrap convenience methods
# nb: typo in https://github.com/python/cpython/blob/3.7/Lib/logging/__init__.py line 1407
[docs] def debug(self, msg, *args, **kw):
self.log(logging.DEBUG, msg, *args, **kw)
[docs] def info(self, msg, *args, **kw):
self.log(logging.INFO, msg, *args, **kw)
[docs] def warning(self, msg, *args, **kw):
self.log(logging.WARNING, msg, *args, **kw)
[docs] def error(self, msg, *args, **kw):
self.log(logging.ERROR, msg, *args, **kw)
[docs] def critical(self, msg, *args, **kw):
self.log(logging.CRITICAL, msg, *args, **kw)
[docs] def exception(self, msg, *args, exc_info=True, **kw):
if exc_info:
exc_type, _, _ = sys.exc_info()
if issubclass(exc_type, exceptions.MDTFEvent):
# "exception" isn't an exception, just an event, so log with
# severity info
self.log(logging.INFO, msg, *args, exc_info=False, **kw)
return
self.log(logging.ERROR, msg, *args, exc_info=exc_info, **kw)
# exception object storage
@property
def has_exceptions(self):
return (len(self._exceptions) > 0)
[docs] def store_exception(self, exc):
# add Exception object to internal list
self._exceptions.append(exc)
tb_exc = traceback.TracebackException(*(sys.exc_info()))
self._tracebacks.append(tb_exc)
[docs] @classmethod
def get_logger(cls, log_name):
# logger objects have global scope (calling getLogger with the same name
# returns the same object, like a Singleton), so need to toggle global
# state
old_log_class = logging.getLoggerClass()
logging.setLoggerClass(cls)
log = logging.getLogger(log_name)
logging.setLoggerClass(old_log_class)
# default settings
log.propagate = True
log.setLevel(logging.NOTSET)
return log
[docs]class MDTFObjectLoggerMixinBase():
pass
[docs]class MDTFObjectLoggerMixin(MDTFObjectLoggerMixinBase):
"""Class to implement per-object logging via inheritance. Used for
:class:`~diagnostic.VarlistEntry` and :class:`~diagnostic.Diagnostic`.
Based on `<https://stackoverflow.com/q/57813096>`__.
This wraps related functionalities:
- A :py:class:`~logging.Logger` to record events affecting the object
only. Log messages are cached in a :py:class:`~io.StringIO` buffer.
- A method :meth:`~MDTFObjectLogger.format` for formatting the contents of
the above into a string, along with log messages of any child objects.
This is intended for preparing log files (at the per-POD or per-case
level); logging intended for the console should use the module loggers.
"""
[docs] def init_log(self, fmt=None):
# Mixin class, so no __init__ for simplicity
if fmt is None:
fmt = '%(levelname)s: %(message)s'
assert hasattr(self, 'log')
self._log_handler = StringIOHandler()
# don't record events from children in StringIO buffer
self._log_handler.addFilter(NameMatchFilter(self._log_name))
formatter = logging.Formatter(fmt=fmt, datefmt='%H:%M:%S')
self._log_handler.setFormatter(formatter)
self.log.addHandler(self._log_handler)
self.init_extra_log_handlers()
[docs] def init_extra_log_handlers(self):
# add and name class-specific log handlers
pass
@property
def last_exception(self):
if self.log.has_exceptions:
return self.log._exceptions[-1]
else:
return None
[docs]class VarlistEntryLoggerMixin(MDTFObjectLoggerMixin):
"""Mixin configuring log functionality for :class:`~diagnostic.VarlistEntry`.
"""
[docs] def init_extra_log_handlers(self):
# add extra handler for additions to netCDF history attribute
self._nc_history_log = StringIOHandler()
self._nc_history_log.addFilter(TagMatchFilter(tags=ObjectLogTag.NC_HISTORY))
formatter = logging.Formatter(
fmt='%(asctime)s: MDTF package: %(message)s',
datefmt='%a %b %d %H:%M:%S %Y' # mimic date format used by NCO, CDO
)
self._nc_history_log.setFormatter(formatter)
self.log.addHandler(self._nc_history_log)
[docs]class _CaseAndPODHandlerMixin():
"""Common methods for :class:`PODLoggerMixin`, :class:`CaseLoggerMixin`.
"""
[docs] def init_extra_log_handlers(self):
# add handler for warning banner
self._banner_log = StringIOHandler()
self._banner_log.addFilter(TagMatchFilter(tags=ObjectLogTag.BANNER))
formatter = logging.Formatter(
fmt='%(levelname)s: %(message)s', datefmt='%H:%M:%S'
)
self._banner_log.setFormatter(formatter)
self.log.addHandler(self._banner_log)
# add handlers for log of data files used
formatter = logging.Formatter(fmt='%(message)s', datefmt='%H:%M:%S')
self._in_file_log = StringIOHandler()
self._in_file_log.addFilter(TagMatchFilter(tags=ObjectLogTag.IN_FILE))
self._in_file_log.setFormatter(formatter)
self.log.addHandler(self._in_file_log)
self._out_file_log = StringIOHandler()
self._out_file_log.addFilter(TagMatchFilter(tags=ObjectLogTag.OUT_FILE))
self._out_file_log.setFormatter(formatter)
self.log.addHandler(self._out_file_log)
[docs]class PODLoggerMixin(_CaseAndPODHandlerMixin, MDTFObjectLoggerMixin):
"""Mixin configuring log functionality for :class:`~diagnostic.Diagnostic`.
"""
pass
[docs]class CaseLoggerMixin(_CaseAndPODHandlerMixin, MDTFObjectLoggerMixinBase):
"""Mixin configuring log functionality for :class:`~data_manager.DataSourceBase`.
"""
[docs] def init_log(self, log_dir, fmt=None):
# Mixin class, so no __init__ for simplicity
# NB: no super(); redefining the method
if fmt is None:
fmt = ("%(asctime)s %(levelname)s: %(funcName)s (%(filename)s line "
"%(lineno)d):\n%(message)s")
assert hasattr(self, 'log')
self.log.propagate = True
self.log.setLevel(logging.DEBUG)
if self.log.hasHandlers():
for handler in self.log.handlers:
self.log.removeHandler(handler)
self._log_handler = MDTFHeaderFileHandler(
filename=os.path.join(log_dir, f"{self.name}.log"),
mode="w", encoding="utf-8"
)
formatter = HangingIndentFormatter(
fmt=fmt, datefmt='%H:%M:%S',
header="", footer="\n"
)
self._log_handler.setFormatter(formatter)
self.log.addHandler(self._log_handler)
self.init_extra_log_handlers()
# transfer stuff from root logger cache
transfer_log_cache(self.log, close=False)
[docs] def close_log_file(self, log=True):
self._log_handler.close()
self._log_handler = None
# ------------------------------------------------------------------------------
[docs]def git_info():
"""Get the current git branch, hash, and list of uncommitted files, if
available.
Called by :meth:`DebugHeaderFileHandler._debug_header`. Based on NumPy's
implementation: `<https://stackoverflow.com/a/40170206>`__.
"""
def _minimal_ext_cmd(cmd):
# construct minimal environment
env = os.environ.copy()
env.update({'LANGUAGE':'C', 'LANG':'C', 'LC_ALL':'C'})
try:
out = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, env=env
).communicate()[0]
except subprocess.CalledProcessError:
out = ''
return out.strip().decode('utf-8')
git_branch = ""
git_hash = ""
git_dirty = ""
try:
git_branch = _minimal_ext_cmd(['git', 'rev-parse', '--abbrev-ref', 'HEAD'])
git_hash = _minimal_ext_cmd(['git', 'rev-parse', 'HEAD'])
git_dirty = _minimal_ext_cmd(['git', 'diff', '--no-ext-diff', '--name-only'])
except OSError:
pass
if git_dirty:
git_dirty = git_dirty.splitlines()
elif git_hash:
git_dirty = "<none>"
else:
git_dirty = "<couldn't get uncomitted files>"
if not git_branch:
git_branch = "<no branch>"
if not git_hash:
git_hash = "<couldn't get git hash>"
return (git_branch, git_hash, git_dirty)
[docs]def signal_logger(caller_name, signum=None, frame=None, log=_log):
"""Lookup signal name from number and write to log.
Taken from `<https://stackoverflow.com/a/2549950>`__.
Args:
caller_name (str): Calling function name, only used in log message.
signum, frame: parameters of the signal we recieved.
"""
if signum:
sig_lookup = {
k:v for v, k in reversed(sorted(list(signal.__dict__.items()))) \
if v.startswith('SIG') and not v.startswith('SIG_')
}
log.info(
"%s caught signal %s (%s)",
caller_name, sig_lookup.get(signum, 'UNKNOWN'), signum
)
else:
log.info("%s caught unknown signal.", caller_name)
[docs]def _set_excepthook(root_logger):
"""Ensure all uncaught exceptions, other than user KeyboardInterrupt, are
logged to the root logger.
See `<https://docs.python.org/3/library/sys.html#sys.excepthook>`__.
"""
def uncaught_exception_handler(exc_type, exc_value, exc_traceback):
if issubclass(exc_type, KeyboardInterrupt):
# Skip logging for user interrupt
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
root_logger.critical(
(70*'*') + "\nUncaught exception:\n", # banner so it stands out
exc_info=(exc_type, exc_value, exc_traceback)
)
sys.excepthook = uncaught_exception_handler
[docs]def initial_log_config():
"""Configure the root logger for logging to console and to a cache provided
by :class:`MultiFlushMemoryHandler`. For debugging purposes
we want to get console output set up before we've read in the real log config
files (which requires doing a full parse of the user input).
"""
logging.captureWarnings(True)
# log uncaught exceptions
root_logger = logging.getLogger()
_set_excepthook(root_logger)
log_d = {
"version": 1,
"disable_existing_loggers": True,
"root": {
"level": "NOTSET",
"handlers": ["debug", "stdout", "stderr", "cache"]
},
"handlers": {
"debug": {
"()": "src.util.logs.MDTFConsoleHandler",
"formatter": "level",
"level": logging.DEBUG,
"stream": "ext://sys.stdout"
},
"stdout": {
"()": "src.util.logs.MDTFConsoleHandler",
"formatter": "normal",
"level": logging.INFO,
"stream": "ext://sys.stdout"
},
"stderr": {
"()": "src.util.logs.MDTFConsoleHandler",
"formatter": "level",
"level": logging.WARNING,
"stream": "ext://sys.stderr"
},
"cache": {
"()": "src.util.logs.MultiFlushMemoryHandler",
"level": logging.NOTSET,
"capacity": 8*1024,
"flushOnClose": False
}
},
"formatters": {
"normal": {"format": "%(message)s"},
"level": {"format": "%(levelname)s: %(message)s"},
"debug": {
"()": HangingIndentFormatter,
"format": ("%(levelname)s in %(funcName)s() (%(filename)s line "
"%(lineno)d):\n%(message)s"),
"tabsize": 4,
"footer": "\n"
}
}
}
log_parser = argparse.ArgumentParser(add_help=False)
log_parser.add_argument('--verbose', '-v', default=0, action="count")
log_parser.add_argument('--quiet', '-q', default=0, action="count")
log_args, _ = log_parser.parse_known_args()
log_d = _configure_logging_dict(log_d, log_args)
logging.config.dictConfig(log_d)
root_logger.debug('Console loggers configured.')
[docs]def transfer_log_cache(target_log=None, close=False):
"""Transfer the contents of the root log cache
(:class:`MultiFlushMemoryHandler`) to logs on newly-configured objects.
"""
root_logger = logging.getLogger()
cache_idx = [i for i,handler in enumerate(root_logger.handlers) \
if isinstance(handler, MultiFlushMemoryHandler)]
if len(cache_idx) > 0:
temp_log_cache = root_logger.handlers[cache_idx[0]]
if target_log is not None:
try:
temp_log_cache.transfer_to_non_console(target_log)
except Exception:
root_logger.debug("Couldn't transfer log to '%s'.", target_log.name)
if close:
# delete it
temp_log_cache.close()
root_logger.removeHandler(temp_log_cache)