X-Git-Url: https://git.arvados.org/rnaseq-cwl-training.git/blobdiff_plain/aa316f855d3812b3105f86495eba89dcf58e587c..33c71b1448690c459d9dcf1c44d962c074aeadc0:/bin/util.py diff --git a/bin/util.py b/bin/util.py index 0cc8de6..0e16d86 100644 --- a/bin/util.py +++ b/bin/util.py @@ -1,4 +1,3 @@ -from __future__ import print_function import sys import os import json @@ -29,16 +28,14 @@ UNWANTED_FILES = [ # (Can't use 'None' because that might be a legitimate value.) REPORTER_NOT_SET = [] -class Reporter(object): + +class Reporter: """Collect and report errors.""" def __init__(self): """Constructor.""" - - super(Reporter, self).__init__() self.messages = [] - def check_field(self, filename, name, values, key, expected=REPORTER_NOT_SET): """Check that a dictionary has an expected value.""" @@ -48,10 +45,11 @@ class Reporter(object): pass elif type(expected) in (tuple, set, list): if values[key] not in expected: - self.add(filename, '{0} {1} value {2} is not in {3}', name, key, values[key], expected) + self.add( + filename, '{0} {1} value {2} is not in {3}', name, key, values[key], expected) elif values[key] != expected: - self.add(filename, '{0} {1} is {2} not {3}', name, key, values[key], expected) - + self.add(filename, '{0} {1} is {2} not {3}', + name, key, values[key], expected) def check(self, condition, location, fmt, *args): """Append error if condition not met.""" @@ -59,12 +57,36 @@ class Reporter(object): if not condition: self.add(location, fmt, *args) - def add(self, location, fmt, *args): """Append error unilaterally.""" self.messages.append((location, fmt.format(*args))) + @staticmethod + def pretty(item): + location, message = item + if isinstance(location, type(None)): + return message + elif isinstance(location, str): + return location + ': ' + message + elif isinstance(location, tuple): + return '{0}:{1}: '.format(*location) + message + + print('Unknown item "{0}"'.format(item), file=sys.stderr) + return NotImplemented + + @staticmethod + def key(item): + location, message = item + if isinstance(location, type(None)): + return ('', -1, message) + elif isinstance(location, str): + return (location, -1, message) + elif isinstance(location, tuple): + return (location[0], location[1], message) + + print('Unknown item "{0}"'.format(item), file=sys.stderr) + return NotImplemented def report(self, stream=sys.stdout): """Report all messages in order.""" @@ -72,30 +94,8 @@ class Reporter(object): if not self.messages: return - def pretty(item): - location, message = item - if isinstance(location, type(None)): - return message - elif isinstance(location, str): - return location + ': ' + message - elif isinstance(location, tuple): - return '{0}:{1}: '.format(*location) + message - else: - assert False, 'Unknown item "{0}"'.format(item) - - def key(item): - location, message = item - if isinstance(location, type(None)): - return ('', -1, message) - elif isinstance(location, str): - return (location, -1, message) - elif isinstance(location, tuple): - return (location[0], location[1], message) - else: - assert False, 'Unknown item "{0}"'.format(item) - - for m in sorted(self.messages, key=key): - print(pretty(m), file=stream) + for m in sorted(self.messages, key=self.key): + print(self.pretty(m), file=stream) def read_markdown(parser, path): @@ -105,17 +105,19 @@ def read_markdown(parser, path): """ # Split and extract YAML (if present). - with open(path, 'r') as reader: + with open(path, 'r', encoding='utf-8') as reader: body = reader.read() metadata_raw, metadata_yaml, body = split_metadata(path, body) # Split into lines. metadata_len = 0 if metadata_raw is None else metadata_raw.count('\n') - lines = [(metadata_len+i+1, line, len(line)) for (i, line) in enumerate(body.split('\n'))] + lines = [(metadata_len+i+1, line, len(line)) + for (i, line) in enumerate(body.split('\n'))] # Parse Markdown. cmd = 'ruby {0}'.format(parser) - p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, close_fds=True, universal_newlines=True) + p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, + close_fds=True, universal_newlines=True, encoding='utf-8') stdout_data, stderr_data = p.communicate(body) doc = json.loads(stdout_data) @@ -136,16 +138,16 @@ def split_metadata(path, text): metadata_raw = None metadata_yaml = None - metadata_len = None pieces = text.split('---', 2) if len(pieces) == 3: metadata_raw = pieces[1] text = pieces[2] try: - metadata_yaml = yaml.load(metadata_raw) + metadata_yaml = yaml.load(metadata_raw, Loader=yaml.SafeLoader) except yaml.YAMLError as e: - print('Unable to parse YAML header in {0}:\n{1}'.format(path, e), file=sys.stderr) + print('Unable to parse YAML header in {0}:\n{1}'.format( + path, e), file=sys.stderr) sys.exit(1) return metadata_raw, metadata_yaml, text @@ -158,10 +160,11 @@ def load_yaml(filename): """ try: - with open(filename, 'r') as reader: - return yaml.load(reader) - except (yaml.YAMLError, FileNotFoundError) as e: - print('Unable to load YAML file {0}:\n{1}'.format(filename, e), file=sys.stderr) + with open(filename, 'r', encoding='utf-8') as reader: + return yaml.load(reader, Loader=yaml.SafeLoader) + except (yaml.YAMLError, IOError) as e: + print('Unable to load YAML file {0}:\n{1}'.format( + filename, e), file=sys.stderr) sys.exit(1)