Update to LC inclusive language
[rnaseq-cwl-training.git] / bin / util.py
1 import sys
2 import os
3 import json
4 from subprocess import Popen, PIPE
5
6 # Import this way to produce a more useful error message.
7 try:
8     import yaml
9 except ImportError:
10     print('Unable to import YAML module: please install PyYAML', file=sys.stderr)
11     sys.exit(1)
12
13
14 # Things an image file's name can end with.
15 IMAGE_FILE_SUFFIX = {
16     '.gif',
17     '.jpg',
18     '.png',
19     '.svg'
20 }
21
22 # Files that shouldn't be present.
23 UNWANTED_FILES = [
24     '.nojekyll'
25 ]
26
27 # Marker to show that an expected value hasn't been provided.
28 # (Can't use 'None' because that might be a legitimate value.)
29 REPORTER_NOT_SET = []
30
31
32 class Reporter:
33     """Collect and report errors."""
34
35     def __init__(self):
36         """Constructor."""
37         self.messages = []
38
39     def check_field(self, filename, name, values, key, expected=REPORTER_NOT_SET):
40         """Check that a dictionary has an expected value."""
41
42         if key not in values:
43             self.add(filename, '{0} does not contain {1}', name, key)
44         elif expected is REPORTER_NOT_SET:
45             pass
46         elif type(expected) in (tuple, set, list):
47             if values[key] not in expected:
48                 self.add(
49                     filename, '{0} {1} value {2} is not in {3}', name, key, values[key], expected)
50         elif values[key] != expected:
51             self.add(filename, '{0} {1} is {2} not {3}',
52                      name, key, values[key], expected)
53
54     def check(self, condition, location, fmt, *args):
55         """Append error if condition not met."""
56
57         if not condition:
58             self.add(location, fmt, *args)
59
60     def add(self, location, fmt, *args):
61         """Append error unilaterally."""
62
63         self.messages.append((location, fmt.format(*args)))
64
65     @staticmethod
66     def pretty(item):
67         location, message = item
68         if isinstance(location, type(None)):
69             return message
70         elif isinstance(location, str):
71             return location + ': ' + message
72         elif isinstance(location, tuple):
73             return '{0}:{1}: '.format(*location) + message
74
75         print('Unknown item "{0}"'.format(item), file=sys.stderr)
76         return NotImplemented
77
78     @staticmethod
79     def key(item):
80         location, message = item
81         if isinstance(location, type(None)):
82             return ('', -1, message)
83         elif isinstance(location, str):
84             return (location, -1, message)
85         elif isinstance(location, tuple):
86             return (location[0], location[1], message)
87
88         print('Unknown item "{0}"'.format(item), file=sys.stderr)
89         return NotImplemented
90
91     def report(self, stream=sys.stdout):
92         """Report all messages in order."""
93
94         if not self.messages:
95             return
96
97         for m in sorted(self.messages, key=self.key):
98             print(self.pretty(m), file=stream)
99
100
101 def read_markdown(parser, path):
102     """
103     Get YAML and AST for Markdown file, returning
104     {'metadata':yaml, 'metadata_len':N, 'text':text, 'lines':[(i, line, len)], 'doc':doc}.
105     """
106
107     # Split and extract YAML (if present).
108     with open(path, 'r') as reader:
109         body = reader.read()
110     metadata_raw, metadata_yaml, body = split_metadata(path, body)
111
112     # Split into lines.
113     metadata_len = 0 if metadata_raw is None else metadata_raw.count('\n')
114     lines = [(metadata_len+i+1, line, len(line))
115              for (i, line) in enumerate(body.split('\n'))]
116
117     # Parse Markdown.
118     cmd = 'ruby {0}'.format(parser)
119     p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE,
120               close_fds=True, universal_newlines=True)
121     stdout_data, stderr_data = p.communicate(body)
122     doc = json.loads(stdout_data)
123
124     return {
125         'metadata': metadata_yaml,
126         'metadata_len': metadata_len,
127         'text': body,
128         'lines': lines,
129         'doc': doc
130     }
131
132
133 def split_metadata(path, text):
134     """
135     Get raw (text) metadata, metadata as YAML, and rest of body.
136     If no metadata, return (None, None, body).
137     """
138
139     metadata_raw = None
140     metadata_yaml = None
141
142     pieces = text.split('---', 2)
143     if len(pieces) == 3:
144         metadata_raw = pieces[1]
145         text = pieces[2]
146         try:
147             metadata_yaml = yaml.load(metadata_raw)
148         except yaml.YAMLError as e:
149             print('Unable to parse YAML header in {0}:\n{1}'.format(
150                 path, e), file=sys.stderr)
151             sys.exit(1)
152
153     return metadata_raw, metadata_yaml, text
154
155
156 def load_yaml(filename):
157     """
158     Wrapper around YAML loading so that 'import yaml' is only needed
159     in one file.
160     """
161
162     try:
163         with open(filename, 'r') as reader:
164             return yaml.load(reader)
165     except (yaml.YAMLError, IOError) as e:
166         print('Unable to load YAML file {0}:\n{1}'.format(
167             filename, e), file=sys.stderr)
168         sys.exit(1)
169
170
171 def check_unwanted_files(dir_path, reporter):
172     """
173     Check that unwanted files are not present.
174     """
175
176     for filename in UNWANTED_FILES:
177         path = os.path.join(dir_path, filename)
178         reporter.check(not os.path.exists(path),
179                        path,
180                        "Unwanted file found")
181
182
183 def require(condition, message):
184     """Fail if condition not met."""
185
186     if not condition:
187         print(message, file=sys.stderr)
188         sys.exit(1)