Remove unnecessary super call
[rnaseq-cwl-training.git] / bin / util.py
1
2 import sys
3 import os
4 import json
5 from subprocess import Popen, PIPE
6
7 # Import this way to produce a more useful error message.
8 try:
9     import yaml
10 except ImportError:
11     print('Unable to import YAML module: please install PyYAML', file=sys.stderr)
12     sys.exit(1)
13
14
15 # Things an image file's name can end with.
16 IMAGE_FILE_SUFFIX = {
17     '.gif',
18     '.jpg',
19     '.png',
20     '.svg'
21 }
22
23 # Files that shouldn't be present.
24 UNWANTED_FILES = [
25     '.nojekyll'
26 ]
27
28 # Marker to show that an expected value hasn't been provided.
29 # (Can't use 'None' because that might be a legitimate value.)
30 REPORTER_NOT_SET = []
31
32
33 class Reporter(object):
34     """Collect and report errors."""
35
36     def __init__(self):
37         """Constructor."""
38         self.messages = []
39
40     def check_field(self, filename, name, values, key, expected=REPORTER_NOT_SET):
41         """Check that a dictionary has an expected value."""
42
43         if key not in values:
44             self.add(filename, '{0} does not contain {1}', name, key)
45         elif expected is REPORTER_NOT_SET:
46             pass
47         elif type(expected) in (tuple, set, list):
48             if values[key] not in expected:
49                 self.add(
50                     filename, '{0} {1} value {2} is not in {3}', name, key, values[key], expected)
51         elif values[key] != expected:
52             self.add(filename, '{0} {1} is {2} not {3}',
53                      name, key, values[key], expected)
54
55     def check(self, condition, location, fmt, *args):
56         """Append error if condition not met."""
57
58         if not condition:
59             self.add(location, fmt, *args)
60
61     def add(self, location, fmt, *args):
62         """Append error unilaterally."""
63
64         self.messages.append((location, fmt.format(*args)))
65
66     @staticmethod
67     def pretty(item):
68         location, message = item
69         if isinstance(location, type(None)):
70             return message
71         elif isinstance(location, str):
72             return location + ': ' + message
73         elif isinstance(location, tuple):
74             return '{0}:{1}: '.format(*location) + message
75         else:
76             assert False, 'Unknown item "{0}"'.format(item)
77
78     @staticmethod
79     def key(item):
80         location, message = item
81         if isinstance(location, type(None)):
82             return ('', -1, message)
83         elif isinstance(location, str):
84             return (location, -1, message)
85         elif isinstance(location, tuple):
86             return (location[0], location[1], message)
87         else:
88             assert False, 'Unknown item "{0}"'.format(item)
89
90     def report(self, stream=sys.stdout):
91         """Report all messages in order."""
92
93         if not self.messages:
94             return
95
96         for m in sorted(self.messages, key=self.key):
97             print(self.pretty(m), file=stream)
98
99
100 def read_markdown(parser, path):
101     """
102     Get YAML and AST for Markdown file, returning
103     {'metadata':yaml, 'metadata_len':N, 'text':text, 'lines':[(i, line, len)], 'doc':doc}.
104     """
105
106     # Split and extract YAML (if present).
107     with open(path, 'r') as reader:
108         body = reader.read()
109     metadata_raw, metadata_yaml, body = split_metadata(path, body)
110
111     # Split into lines.
112     metadata_len = 0 if metadata_raw is None else metadata_raw.count('\n')
113     lines = [(metadata_len+i+1, line, len(line))
114              for (i, line) in enumerate(body.split('\n'))]
115
116     # Parse Markdown.
117     cmd = 'ruby {0}'.format(parser)
118     p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE,
119               close_fds=True, universal_newlines=True)
120     stdout_data, stderr_data = p.communicate(body)
121     doc = json.loads(stdout_data)
122
123     return {
124         'metadata': metadata_yaml,
125         'metadata_len': metadata_len,
126         'text': body,
127         'lines': lines,
128         'doc': doc
129     }
130
131
132 def split_metadata(path, text):
133     """
134     Get raw (text) metadata, metadata as YAML, and rest of body.
135     If no metadata, return (None, None, body).
136     """
137
138     metadata_raw = None
139     metadata_yaml = None
140
141     pieces = text.split('---', 2)
142     if len(pieces) == 3:
143         metadata_raw = pieces[1]
144         text = pieces[2]
145         try:
146             metadata_yaml = yaml.load(metadata_raw)
147         except yaml.YAMLError as e:
148             print('Unable to parse YAML header in {0}:\n{1}'.format(
149                 path, e), file=sys.stderr)
150             sys.exit(1)
151
152     return metadata_raw, metadata_yaml, text
153
154
155 def load_yaml(filename):
156     """
157     Wrapper around YAML loading so that 'import yaml' is only needed
158     in one file.
159     """
160
161     try:
162         with open(filename, 'r') as reader:
163             return yaml.load(reader)
164     except (yaml.YAMLError, IOError) as e:
165         print('Unable to load YAML file {0}:\n{1}'.format(
166             filename, e), file=sys.stderr)
167         sys.exit(1)
168
169
170 def check_unwanted_files(dir_path, reporter):
171     """
172     Check that unwanted files are not present.
173     """
174
175     for filename in UNWANTED_FILES:
176         path = os.path.join(dir_path, filename)
177         reporter.check(not os.path.exists(path),
178                        path,
179                        "Unwanted file found")
180
181
182 def require(condition, message):
183     """Fail if condition not met."""
184
185     if not condition:
186         print(message, file=sys.stderr)
187         sys.exit(1)