"""Utility functions for interacting with the *local* filesystem and configuration
files.
"""
import os
import io
import collections
from distutils.spawn import find_executable
import glob
import json
import re
import shutil
import string
from . import basic
from . import exceptions
import logging
_log = logging.getLogger(__name__)
[docs]def abbreviate_path(path, old_base, new_base=None):
"""Express path as a path relative to old_base, optionally prepending
new_base.
"""
ps = tuple(os.path.abspath(p) for p in (path, old_base))
str_ = os.path.relpath(ps[0], start=os.path.commonpath(ps))
if new_base is not None:
str_ = os.path.join(new_base, str_)
return str_
[docs]def resolve_path(path, root_path="", env=None, log=_log):
"""Abbreviation to resolve relative paths.
Args:
path (:obj:`str`): path to resolve.
root_path (:obj:`str`, optional): root path to resolve `path` with. If
not given, resolves relative to `cwd`.
Returns: Absolute version of `path`, relative to `root_path` if given,
otherwise relative to `os.getcwd`.
"""
def _expandvars(path, env_dict):
"""Expand quoted variables of the form $key and ${key} in path,
where key is a key in env_dict, similar to os.path.expandvars.
See `<https://stackoverflow.com/a/30777398>`__; specialize to not skipping
escaped characters and not changing unrecognized variables.
"""
return re.sub(
r'\$(\w+|\{([^}]*)\})',
lambda m: env_dict.get(m.group(2) or m.group(1), m.group(0)),
path
)
if path == '':
return path # default value set elsewhere
path = os.path.expanduser(path) # resolve '~' to home dir
path = os.path.expandvars(path) # expand $VAR or ${VAR} for shell env_vars
if isinstance(env, dict):
path = _expandvars(path, env)
if '$' in path:
log.warning("Couldn't resolve all env vars in '%s'", path)
return path
if os.path.isabs(path):
return path
if root_path == "":
root_path = os.getcwd()
assert os.path.isabs(root_path)
return os.path.normpath(os.path.join(root_path, path))
[docs]def recursive_copy(src_files, src_root, dest_root, copy_function=None,
overwrite=False):
"""Copy src_files to dest_root, preserving relative subdirectory structure.
Copies a subset of files in a directory subtree rooted at src_root to an
identical subtree structure rooted at dest_root, creating any subdirectories
as needed. For example, `recursive_copy('/A/B/C.txt', '/A', '/D')` will
first create the destination subdirectory `/D/B` and copy '/A/B/C.txt` to
`/D/B/C.txt`.
Args:
src_files: Absolute path, or list of absolute paths, to files to copy.
src_root: Root subtree of all files in src_files. Raises a ValueError
if all files in src_files are not contained in the src_root directory.
dest_root: Destination directory in which to create the copied subtree.
copy_function: Function to use to copy individual files. Must take two
arguments, the source and destination paths, respectively. Defaults
to :py:meth:`shutil.copy2`.
overwrite: Boolean, deafult False. If False, raise an OSError if
any destination files already exist, otherwise silently overwrite.
"""
if copy_function is None:
copy_function = shutil.copy2
src_files = basic.to_iter(src_files)
for f in src_files:
if not f.startswith(src_root):
raise ValueError('{} not a sub-path of {}'.format(f, src_root))
dest_files = [
os.path.join(dest_root, os.path.relpath(f, start=src_root)) \
for f in src_files
]
for f in dest_files:
if not overwrite and os.path.exists(f):
raise OSError('{} exists.'.format(f))
os.makedirs(os.path.normpath(os.path.dirname(f)), exist_ok=True)
for src, dest in zip(src_files, dest_files):
copy_function(src, dest)
[docs]def check_executable(exec_name):
"""Tests if <exec_name> is found on the current $PATH.
Args:
exec_name (:py:obj:`str`): Name of the executable to search for.
Returns: :py:obj:`bool` True/false if executable was found on $PATH.
"""
return (find_executable(exec_name) is not None)
[docs]def find_files(src_dirs, filename_globs, n_files=None):
"""Return list of files in ``src_dirs``, or any subdirectories, matching any
of ``filename_globs``. Wraps :py:class:`glob.glob`.
Args:
src_dirs: Directory, or a list of directories, to search for files in. The
function will also search all subdirectories.
filename_globs: Glob, or a list of globs, for filenames to match. This
is a shell globbing pattern, not a full regex.
n_files (int, optional): If supplied, raise
:class:`~framework.util.exceptions.MDTFFileNotFoundError` if the
number of files found is not equal to this number.
Returns: :py:obj:`list` of paths to files matching any of the criteria.
If no files are found, the list is empty.
"""
src_dirs = basic.to_iter(src_dirs)
filename_globs = basic.to_iter(filename_globs)
files = set([])
for d in src_dirs:
for g in filename_globs:
files.update(glob.glob(os.path.join(d, g)))
files.update(glob.glob(os.path.join(d, '**', g), recursive=True))
if n_files is not None and len(files) != n_files:
# _log.debug('Expected to find %d files, instead found %d.', n_files, len(files))
raise exceptions.MDTFFileNotFoundError(str(filename_globs))
return list(files)
[docs]def check_dir(dir_, attr_name="", create=False):
"""Check existence of directories. No action is taken for directories that
already exist; nonexistent directories either raise a
:class:`~util.MDTFFileNotFoundError` or cause the creation of that directory.
Args:
dir\_: If a string, the absolute path to check; otherwise, assume the
path to check is given by the *attr_name* attribute on this object.
attr_name: Name of the attribute being checked (used in log messages).
create: (bool, default False): if True, nonexistent directories are
created.
"""
if not isinstance(dir_, str):
dir_ = getattr(dir_, attr_name, None)
if not isinstance(dir_, str):
raise ValueError(f"Received bad directory: {repr(dir_)}.")
try:
if not os.path.isdir(dir_):
if create:
os.makedirs(dir_, exist_ok=False)
else:
raise exceptions.MDTFFileNotFoundError(dir_)
except Exception as exc:
if isinstance(exc, FileNotFoundError):
path = getattr(exc, 'filename', '')
if attr_name:
raise exceptions.MDTFFileNotFoundError(
f"{attr_name} not found at '{path}'.")
else:
raise exceptions.MDTFFileNotFoundError(path)
else:
raise OSError(f"Caught exception when checking {attr_name}={dir_}.") \
from exc
[docs]def bump_version(path, new_v=None, extra_dirs=None):
"""Return a filename that doesn't conflict with existing files.
if extra_dirs supplied, make sure path doesn't conflict with pre-existing
files at those locations either.
"""
def _split_version(file_):
match = re.match(r"""
^(?P<file_base>.*?) # arbitrary characters (lazy match)
(\.v(?P<version>\d+)) # literal '.v' followed by digits
? # previous group may occur 0 or 1 times
$ # end of string
""", file_, re.VERBOSE)
if match:
return (match.group('file_base'), match.group('version'))
else:
return (file_, '')
def _reassemble(dir_, file_, version, ext_, final_sep):
if version:
file_ = ''.join([file_, '.v', str(version), ext_])
else:
# get here for version == 0, '' or None
file_ = ''.join([file_, ext_])
return os.path.join(dir_, file_) + final_sep
def _path_exists(dir_list, file_, new_v, ext_, sep):
new_paths = [_reassemble(d, file_, new_v, ext_, sep) for d in dir_list]
return any([os.path.exists(p) for p in new_paths])
if path.endswith(os.sep):
# remove any terminating slash on directory
path = path.rstrip(os.sep)
final_sep = os.sep
else:
final_sep = ''
dir_, file_ = os.path.split(path)
if not extra_dirs:
dir_list = []
else:
dir_list = basic.to_iter(extra_dirs)
dir_list.append(dir_)
file_, old_v = _split_version(file_)
if not old_v:
# maybe it has an extension and then a version number
file_, ext_ = os.path.splitext(file_)
file_, old_v = _split_version(file_)
else:
ext_ = ''
if new_v is not None:
# removes version if new_v ==0
new_path = _reassemble(dir_, file_, new_v, ext_, final_sep)
else:
if not old_v:
new_v = 0
else:
new_v = int(old_v)
while _path_exists(dir_list, file_, new_v, ext_, final_sep):
new_v = new_v + 1
new_path = _reassemble(dir_, file_, new_v, ext_, final_sep)
return (new_path, new_v)
# ---------------------------------------------------------
# CONFIG FILE PARSING
# ---------------------------------------------------------
[docs]def parse_json(str_):
def _pos_from_lc(lineno, colno, str_):
# fix line number, since we stripped commented-out lines. JSONDecodeError
# computes line/col no. in error message from character position in string.
lines = str_.splitlines()
return (colno - 1) + sum( (len(line) + 1) for line in lines[:lineno])
(strip_str, line_nos) = strip_comments(str_, delimiter= '//')
try:
parsed_json = json.loads(strip_str,
object_pairs_hook=collections.OrderedDict)
except json.JSONDecodeError as exc:
# fix reported line number, since we stripped commented-out lines.
assert exc.lineno <= len(line_nos)
raise json.JSONDecodeError(
msg=exc.msg, doc=str_,
pos=_pos_from_lc(line_nos[exc.lineno-1], exc.colno, str_)
)
except UnicodeDecodeError as exc:
raise json.JSONDecodeError(
msg=f"parse_json received UnicodeDecodeError:\n{exc}",
doc=strip_str, pos=0
)
return parsed_json
[docs]def read_json(file_path, log=_log):
log.debug('Reading file %s', file_path)
if not os.path.isfile(file_path):
raise exceptions.MDTFFileNotFoundError(file_path)
try:
with io.open(file_path, 'r', encoding='utf-8') as file_:
str_ = file_.read()
except Exception as exc:
# something more serious than missing file
_log.critical("Caught exception when trying to read %s: %r", file_path, exc)
exit(1)
return parse_json(str_)
[docs]def find_json(dir_, file_name, exit_if_missing=True, log=_log):
"""Wrap :func:`read_json` with more elaborate error handling. find_files()
will find a file named file_name at any level within dir\_.
"""
try:
f = find_files(dir_, file_name, n_files=1)
return read_json(f[0])
except exceptions.MDTFFileNotFoundError:
if exit_if_missing:
_log.critical("Couldn't find file %s in %s.", file_name, dir_)
exit(1)
else:
log.debug("Couldn't find file %s in %s; continuing.",
file_name, dir_)
return dict()
[docs]def write_json(struct, file_path, sort_keys=False, log=_log):
"""Wrapping file I/O simplifies unit testing.
Args:
struct (:py:obj:`dict`)
file_path (:py:obj:`str`): path of the JSON file to write.
"""
log.debug('Writing file %s', file_path)
try:
str_ = json.dumps(struct,
sort_keys=sort_keys, indent=2, separators=(',', ': '))
with io.open(file_path, 'w', encoding='utf-8') as file_:
file_.write(str_)
except IOError:
_log.critical(f'Fatal IOError when trying to write {file_path}. Exiting.')
exit(1)
[docs]def pretty_print_json(struct, sort_keys=False):
"""Convert struct to a pseudo-YAML string for human-readable debugging
purposes only. Output is not valid JSON (or YAML).
"""
str_ = json.dumps(struct, sort_keys=sort_keys, indent=2)
for char in [',', '{', '}', '[', ']']:
str_ = str_.replace(char, '')
# remove isolated double quotes, but keep ""
str_ = re.sub(r'(?<!\")\"(?!\")', "", str_)
# remove lines containing only whitespace
return os.linesep.join([s for s in str_.splitlines() if s.strip()])
# ---------------------------------------------------------
# HTML TEMPLATING
# ---------------------------------------------------------
[docs]class _DoubleBraceTemplate(string.Template):
"""Private class used by :func:`~util.append_html_template` to do
string templating with double curly brackets as delimiters, since single
brackets are also used in css.
See `<https://docs.python.org/3.7/library/string.html#string.Template>`_ and
`<https://stackoverflow.com/a/34362892>`__.
"""
flags = re.VERBOSE # matching is case-sensitive, unlike default
delimiter = '{{' # starting delimter is two braces, then apply
pattern = r"""
\{\{(?: # match delimiter itself, but don't include it
# Alternatives for what to do with string following delimiter:
# case 1) text is an escaped double bracket, written as '{{{{'.
(?P<escaped>\{\{)|
# case 2) text is the name of an env var, possibly followed by whitespace,
# followed by closing double bracket. Match POSIX env var names,
# case-sensitive (see https://stackoverflow.com/a/2821183), with the
# addition that hyphens are allowed.
# Can't tell from docs what the distinction between <named> and <braced> is.
\s*(?P<named>[a-zA-Z_][a-zA-Z0-9_-]*)\s*\}\}|
\s*(?P<braced>[a-zA-Z_][a-zA-Z0-9_-]*)\s*\}\}|
# case 3) none of the above: ignore & move on (when using safe_substitute)
(?P<invalid>)
)
"""
[docs]def append_html_template(template_file, target_file, template_dict={},
create=True, append=True):
"""Perform substitutions on template_file and write result to target_file.
Variable substitutions are done with custom
`templating <https://docs.python.org/3.7/library/string.html#template-strings>`__,
replacing *double* curly bracket-delimited keys with their values in template_dict.
For example, if template_dict is {'A': 'foo'}, all occurrences of the string
`{{A}}` in template_file are replaced with the string `foo`. Spaces between
the braces and variable names are ignored.
Double-curly-bracketed strings that don't correspond to keys in template_dict are
ignored (instead of raising a KeyError.)
Double curly brackets are chosen as the delimiter to match the default
syntax of, eg, django and jinja2. Using single curly braces leads to conflicts
with CSS syntax.
Args:
template_file: Path to template file.
target_file: Destination path for result.
template_dict: :py:obj:`dict` of variable name-value pairs. Both names
and values must be strings.
create: Boolean, default True. If true, create target_file if it doesn't
exist, otherwise raise an OSError.
append: Boolean, default True. If target_file exists and this is true,
append the substituted contents of template_file to it. If false,
overwrite target_file with the substituted contents of template_file.
"""
assert os.path.exists(template_file)
with io.open(template_file, 'r', encoding='utf-8') as f:
html_str = f.read()
html_str = _DoubleBraceTemplate(html_str).safe_substitute(template_dict)
if not os.path.exists(target_file):
if create:
# print("\tDEBUG: write {} to new {}".format(template_file, target_file))
mode = 'w'
else:
raise OSError("Can't find {}".format(target_file))
else:
if append:
# print("\tDEBUG: append {} to {}".format(template_file, target_file))
mode = 'a'
else:
# print("\tDEBUG: overwrite {} with {}".format(target_file, template_file))
os.remove(target_file)
mode = 'w'
with io.open(target_file, mode, encoding='utf-8') as f:
f.write(html_str)