# Source code for src.verify_links

#!/usr/bin/env python
"""
Check output of the files returned by a run of the MDTF framework and determine
if any PODs failed to generate files, as determined by non-functional html links
in the output webpages.

Based on test_website by Dani Coleman, bundy@ucar.edu
"""
import sys
# do version check before importing other stuff
# Bail out early unless running under Python 3 with a minor version of at
# least 7; the message echoes the interpreter version that was actually used.
if not (sys.version_info[0] == 3 and sys.version_info[1] >= 7):
    sys.exit(
        "ERROR: MDTF currently only supports python >= 3.7.*. Please check "
        "which version is on your $PATH (e.g. with `which python`.)\n"
        f"Attempted to run with following python version:\n{sys.version}"
    )
# passed; continue with imports
import os
import argparse
import collections
import itertools
from html.parser import HTMLParser
import re
import urllib.parse
import urllib.request
import urllib.error
from src import util

import logging
_log = logging.getLogger(__name__)

# Immutable two-field record: where a link was found and where it points.
Link = collections.namedtuple('Link', ('origin', 'target'))
Link.__doc__ = """
Class representing individual links, to simplify bookkeeping.

Attributes:
    origin (str): URL of the document containing the link.
    target (str): URL referred to by the link.
"""

class LinkParser(HTMLParser):
    """:py:class:`~html.parser.HTMLParser` subclass that collects the ``href``
    value of every ``<a>`` tag it is fed.

    The collected values are exposed through the ``links`` attribute, an
    iterator that is extended lazily as more markup is parsed. Adapted from
    `<https://stackoverflow.com/a/41663924>`__.
    """

    def reset(self):
        """Reset the parser state and start over with an empty ``links``
        iterator.
        """
        super().reset()
        self.links = iter(())

    def handle_starttag(self, tag, attrs):
        """Append the value of each ``href`` attribute of an ``<a>`` start tag
        to ``links``; start tags of any other element are ignored.

        Args:
            tag (str): Name of the tag being opened.
            attrs (list): ``(name, value)`` pairs for the tag's attributes.
        """
        if tag.lower() != 'a':
            return
        for attr_name, attr_value in attrs:
            if attr_name.lower() == 'href':
                # Chain rather than append so that ``links`` stays a
                # single-pass iterator that callers can drain incrementally.
                self.links = itertools.chain(self.links, (attr_value,))
class LinkVerifier(object):
    """Search the html files reachable from a root location for links whose
    targets cannot be opened.

    Attributes set by the constructor:
        verbose (bool): Whether each examined file is logged.
        pod_name: Initialized to None.
        root_url (str): URL of the directory the search is rooted at.
        WK_DIR (str): Path component of ``root_url``. NB: this isn't a
            "working directory"; it's just the base path relative to which
            paths are reported.
        root_file (str): File name of the top-level html file, or "" if
            ``root`` did not name an html file.
        rel_path_root (str): URL that relative paths are reported against.
        log: Logger used for all messages.
    """

    def __init__(self, root, rel_path_root=None, verbose=False, log=None):
        """Initialize search for broken links.

        Args:
            root (str): Either a URL or path on the local filesystem. Location
                of the top-level html file to begin the search from.
            rel_path_root (str, optional): Either a URL or path on the local
                filesystem. If given, used as the path that relative paths to
                missing files are given relative to. Defaults to root (if root
                is a directory) or the directory containing root (if root is a
                file.)
            verbose (bool, default False): Set to True to log each file
                examined.
            log (optional): Logger to write messages to. Defaults to this
                module's logger.
        """
        self.verbose = verbose
        self.pod_name = None
        # NB: WK_DIR isn't a "working directory"; it's just the base path
        # relative to which paths are reported
        (self.root_url, self.WK_DIR, self.root_file) = \
            self._munge_input_url(root)
        if rel_path_root:
            self.rel_path_root, _, _ = self._munge_input_url(rel_path_root)
        else:
            self.rel_path_root = self.root_url
        self.log = _log if log is None else log

    @staticmethod
    def _munge_input_url(url):
        """Normalize a URL or filesystem path into (directory URL, directory
        path, file name).

        Args:
            url (str): URL or filesystem path, naming either a directory or
                an html file (extension starting with ``.htm``).

        Returns:
            tuple: ``(dir_url, dir_path, file_name)``; ``dir_url`` and
            ``dir_path`` always end in '/', and ``file_name`` is "" unless
            ``url`` named an html file.
        """
        url_parts = urllib.parse.urlsplit(url)
        if not url_parts.scheme:
            # given a filesystem path, not a URL
            url_parts = url_parts._replace(
                scheme='file', path=os.path.abspath(url_parts.path)
            )
        if os.path.splitext(url_parts.path)[1].lower().startswith('.htm'):
            # URL points to an html file; get parent directory
            path_, file_ = os.path.split(url_parts.path)
        else:
            # Bind the directory path here too: previously it was only set
            # for scheme-less inputs, so a directory given as a real URL
            # (e.g. "http://host/out/") raised UnboundLocalError below.
            path_, file_ = url_parts.path, ""
        if not path_.endswith('/'):
            path_ = path_ + '/'
        url_parts = url_parts._replace(path=path_)
        return (urllib.parse.urlunsplit(url_parts), path_, file_)

    def check_one_url(self, link):
        """Get list of URLs linked to from the current URL (if any).

        Args:
            link (:obj:`Link`): Instance of :class:`Link`. Only the URL in
                link.target is examined.

        Returns:
            Either

            #. None if link has no ``target`` attribute or link.target can't
               be opened (an error is logged in the latter case),
            #. the empty list if link.target is not an html document, or
            #. a list of links contained in link.target, expressed as
               :class:`Link` objects with targets resolved against
               link.target.
        """
        if not hasattr(link, 'target'):
            return None
        url = link.target
        try:
            f = urllib.request.urlopen(url)
        except urllib.error.HTTPError as e:
            self.log.error(f'Error code: {e.code}', tags=util.ObjectLogTag.BANNER)
            return None
        except urllib.error.URLError as e:
            # For a missing local file, report the path relative to WK_DIR
            # instead of the raw OS error text.
            tup = re.split(
                r"\[Errno 2\] No such file or directory: \'(.*)\'",
                str(e.reason)
            )
            if len(tup) == 3:
                str_ = util.abbreviate_path(tup[1], self.WK_DIR, '$WK_DIR')
            else:
                str_ = str(e.reason)
            self.log.error("Missing '%s'.", str_, tags=util.ObjectLogTag.BANNER)
            return None

        # Always release the response, including on the non-html early return
        # and if link extraction raises.
        try:
            if f.info().get_content_subtype() != 'html':
                return []
            parser = LinkParser()
            # NOTE(review): gen_links is not defined in the visible part of
            # this class; it is assumed to be provided elsewhere -- confirm.
            return [
                Link(origin=url, target=urllib.parse.urljoin(url, link_out))
                for link_out in self.gen_links(f, parser)
            ]
        finally:
            f.close()

    def breadth_first(self, root_url):
        """Breadth-first search of all files linked from an initial root_url.

        The search correctly handles cycles (ie, A.html links to B.html and
        B.html links to A.html) and only examines files in subdirectories of
        root_url's directory, so that links to external sites are ignored,
        rather than trying to trace the link structure of the whole internet.
        Each distinct target is examined at most once.

        Args:
            root_url (str): URL of an html file to start the search at.

        Returns:
            list of :class:`Link` (link_source, link_target) tuples where the
            file in link_target couldn't be found.
        """
        missing = []
        known_urls = {root_url}
        root_parts = urllib.parse.urlsplit(root_url)
        root_parts = root_parts._replace(path=os.path.dirname(root_parts.path))
        # root_parent = URL to directory containing file referred to in root_url
        root_parent = urllib.parse.urlunsplit(root_parts)
        # Compare against the directory URL *with* a trailing slash so that a
        # sibling such as ".../out2/" is not mistaken for part of ".../out/".
        if root_parent.endswith('/'):
            root_prefix = root_parent
        else:
            root_prefix = root_parent + '/'

        # deque gives O(1) pops from the front of the FIFO queue
        queue = collections.deque([Link(origin=None, target=root_url)])
        if self.verbose:
            self.log.info("Checking '%s'.", root_url)
        while queue:
            current_link = queue.popleft()
            rel_target = current_link.target[len(root_prefix):]
            new_links = self.check_one_url(current_link)
            if new_links is None:
                # Logger methods have no print()-style ``end`` keyword, so the
                # name and the verdict are emitted as a single record.
                if self.verbose:
                    self.log.info("\tChecking %s...MISSING!", rel_target)
                missing.append(current_link)
                continue
            if self.verbose:
                self.log.info("\tChecking %s...OK", rel_target)
            for lnk in new_links:
                # restrict links to those that start with root_parent, and
                # skip anything already seen so that we don't chase cycles or
                # re-check a target linked twice from the same page
                if lnk.target in known_urls:
                    continue
                if not lnk.target.startswith(root_prefix):
                    continue
                known_urls.add(lnk.target)
                queue.append(lnk)
        return missing
# --------------------------------------------------------------

if __name__ == '__main__':
    # Wrap input/output if we're called as a standalone script
    parser = argparse.ArgumentParser()
    parser.add_argument("-v", "--verbose", action="store_true",
        help="increase output verbosity")
    parser.add_argument("path_or_url",
        help="URL or filesystem path to the MDTF framework output directory.")
    args = parser.parse_args()

    # instead of print(), use root logger
    log = logging.getLogger()
    handler = logging.StreamHandler(stream=sys.stdout)
    formatter = logging.Formatter(fmt='%(message)s', datefmt='%H:%M:%S')
    handler.setFormatter(formatter)
    log.addHandler(handler)
    if args.verbose:
        # The root logger defaults to WARNING, which would discard the
        # info-level per-file messages that --verbose asks for.
        log.setLevel(logging.INFO)

    link_verifier = LinkVerifier(args.path_or_url, verbose=args.verbose)
    missing_dict = link_verifier.verify_all_links()

    # Exit status reports the outcome: 1 if any link target was missing,
    # 0 otherwise.
    if missing_dict:
        print("ERROR: the following files are missing:")
        print(util.pretty_print_json(missing_dict))
        sys.exit(1)
    else:
        print("SUCCESS: no missing links found.")
        sys.exit(0)