Cleaning up sorting of error messages.
[rnaseq-cwl-training.git] / bin / util.py
1 import sys
2 import os
3 import json
4 from subprocess import Popen, PIPE
5
6 # Import this way to produce a more useful error message.
7 try:
8     import yaml
9 except ImportError:
10     print('Unable to import YAML module: please install PyYAML', file=sys.stderr)
11     sys.exit(1)
12
13
14 # Things an image file's name can end with.
15 IMAGE_FILE_SUFFIX = {
16     '.gif',
17     '.jpg',
18     '.png',
19     '.svg'
20 }
21
22 # Files that shouldn't be present.
23 UNWANTED_FILES = [
24     '.nojekyll'
25 ]
26
27 # Marker to show that an expected value hasn't been provided.
28 # (Can't use 'None' because that might be a legitimate value.)
29 REPORTER_NOT_SET = []
30
31 class Reporter(object):
32     """Collect and report errors."""
33
34     def __init__(self):
35         """Constructor."""
36
37         super(Reporter, self).__init__()
38         self.messages = []
39
40
41     def check_field(self, filename, name, values, key, expected=REPORTER_NOT_SET):
42         """Check that a dictionary has an expected value."""
43
44         if key not in values:
45             self.add(filename, '{0} does not contain {1}', name, key)
46         elif expected is REPORTER_NOT_SET:
47             pass
48         elif type(expected) in (tuple, set, list):
49             if values[key] not in expected:
50                 self.add(filename, '{0} {1} value {2} is not in {3}', name, key, values[key], expected)
51         elif values[key] != expected:
52             self.add(filename, '{0} {1} is {2} not {3}', name, key, values[key], expected)
53
54
55     def check(self, condition, location, fmt, *args):
56         """Append error if condition not met."""
57
58         if not condition:
59             self.add(location, fmt, *args)
60
61
62     def add(self, location, fmt, *args):
63         """Append error unilaterally."""
64
65         self.messages.append((location, fmt.format(*args)))
66
67
68     def report(self, stream=sys.stdout):
69         """Report all messages in order."""
70
71         if not self.messages:
72             return
73
74         def pretty(item):
75             location, message = item
76             if isinstance(location, type(None)):
77                 return message
78             elif isinstance(location, str):
79                 return location + ': ' + message
80             elif isinstance(location, tuple):
81                 return '{0}:{1}: '.format(*location) + message
82             else:
83                 assert False, 'Unknown item "{0}"'.format(item)
84
85         def key(item):
86             location, message = item
87             if isinstance(location, type(None)):
88                 return ('', -1, message)
89             elif isinstance(location, str):
90                 return (location, -1, message)
91             elif isinstance(location, tuple):
92                 return (location[0], location[1], message)
93             else:
94                 assert False, 'Unknown item "{0}"'.format(item)
95
96         for m in sorted(self.messages, key=key):
97             print(pretty(m), file=stream)
98
99
100 def read_markdown(parser, path):
101     """
102     Get YAML and AST for Markdown file, returning
103     {'metadata':yaml, 'metadata_len':N, 'text':text, 'lines':[(i, line, len)], 'doc':doc}.
104     """
105
106     # Split and extract YAML (if present).
107     with open(path, 'r') as reader:
108         body = reader.read()
109     metadata_raw, metadata_yaml, body = split_metadata(path, body)
110
111     # Split into lines.
112     metadata_len = 0 if metadata_raw is None else metadata_raw.count('\n')
113     lines = [(metadata_len+i+1, line, len(line)) for (i, line) in enumerate(body.split('\n'))]
114
115     # Parse Markdown.
116     cmd = 'ruby {0}'.format(parser)
117     p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, close_fds=True, universal_newlines=True)
118     stdout_data, stderr_data = p.communicate(body)
119     doc = json.loads(stdout_data)
120
121     return {
122         'metadata': metadata_yaml,
123         'metadata_len': metadata_len,
124         'text': body,
125         'lines': lines,
126         'doc': doc
127     }
128
129
130 def split_metadata(path, text):
131     """
132     Get raw (text) metadata, metadata as YAML, and rest of body.
133     If no metadata, return (None, None, body).
134     """
135
136     metadata_raw = None
137     metadata_yaml = None
138     metadata_len = None
139
140     pieces = text.split('---', 2)
141     if len(pieces) == 3:
142         metadata_raw = pieces[1]
143         text = pieces[2]
144         try:
145             metadata_yaml = yaml.load(metadata_raw)
146         except yaml.YAMLError as e:
147             print('Unable to parse YAML header in {0}:\n{1}'.format(path, e), file=sys.stderr)
148             sys.exit(1)
149
150     return metadata_raw, metadata_yaml, text
151
152
153 def load_yaml(filename):
154     """
155     Wrapper around YAML loading so that 'import yaml' is only needed
156     in one file.
157     """
158
159     try:
160         with open(filename, 'r') as reader:
161             return yaml.load(reader)
162     except (yaml.YAMLError, FileNotFoundError) as e:
163         print('Unable to load YAML file {0}:\n{1}'.format(filename, e), file=sys.stderr)
164         sys.exit(1)
165
166
167 def check_unwanted_files(dir_path, reporter):
168     """
169     Check that unwanted files are not present.
170     """
171
172     for filename in UNWANTED_FILES:
173         path = os.path.join(dir_path, filename)
174         reporter.check(not os.path.exists(path),
175                        path,
176                        "Unwanted file found")
177
178
179 def require(condition, message):
180     """Fail if condition not met."""
181
182     if not condition:
183         print(message, file=sys.stderr)
184         sys.exit(1)