Merge pull request #270 from maxim-belkin/py3-classes
[rnaseq-cwl-training.git] / bin / util.py
1
2 import sys
3 import os
4 import json
5 from subprocess import Popen, PIPE
6
7 # Import this way to produce a more useful error message.
8 try:
9     import yaml
10 except ImportError:
11     print('Unable to import YAML module: please install PyYAML', file=sys.stderr)
12     sys.exit(1)
13
14
15 # Things an image file's name can end with.
16 IMAGE_FILE_SUFFIX = {
17     '.gif',
18     '.jpg',
19     '.png',
20     '.svg'
21 }
22
23 # Files that shouldn't be present.
24 UNWANTED_FILES = [
25     '.nojekyll'
26 ]
27
28 # Marker to show that an expected value hasn't been provided.
29 # (Can't use 'None' because that might be a legitimate value.)
30 REPORTER_NOT_SET = []
31
32
33 class Reporter:
34     """Collect and report errors."""
35
36     def __init__(self):
37         """Constructor."""
38
39         super(Reporter, self).__init__()
40         self.messages = []
41
42     def check_field(self, filename, name, values, key, expected=REPORTER_NOT_SET):
43         """Check that a dictionary has an expected value."""
44
45         if key not in values:
46             self.add(filename, '{0} does not contain {1}', name, key)
47         elif expected is REPORTER_NOT_SET:
48             pass
49         elif type(expected) in (tuple, set, list):
50             if values[key] not in expected:
51                 self.add(
52                     filename, '{0} {1} value {2} is not in {3}', name, key, values[key], expected)
53         elif values[key] != expected:
54             self.add(filename, '{0} {1} is {2} not {3}',
55                      name, key, values[key], expected)
56
57     def check(self, condition, location, fmt, *args):
58         """Append error if condition not met."""
59
60         if not condition:
61             self.add(location, fmt, *args)
62
63     def add(self, location, fmt, *args):
64         """Append error unilaterally."""
65
66         self.messages.append((location, fmt.format(*args)))
67
68     @staticmethod
69     def pretty(item):
70         location, message = item
71         if isinstance(location, type(None)):
72             return message
73         elif isinstance(location, str):
74             return location + ': ' + message
75         elif isinstance(location, tuple):
76             return '{0}:{1}: '.format(*location) + message
77
78         print('Unknown item "{0}"'.format(item), file=sys.stderr)
79         return NotImplemented
80
81     @staticmethod
82     def key(item):
83         location, message = item
84         if isinstance(location, type(None)):
85             return ('', -1, message)
86         elif isinstance(location, str):
87             return (location, -1, message)
88         elif isinstance(location, tuple):
89             return (location[0], location[1], message)
90
91         print('Unknown item "{0}"'.format(item), file=sys.stderr)
92         return NotImplemented
93
94     def report(self, stream=sys.stdout):
95         """Report all messages in order."""
96
97         if not self.messages:
98             return
99
100         for m in sorted(self.messages, key=self.key):
101             print(self.pretty(m), file=stream)
102
103
104 def read_markdown(parser, path):
105     """
106     Get YAML and AST for Markdown file, returning
107     {'metadata':yaml, 'metadata_len':N, 'text':text, 'lines':[(i, line, len)], 'doc':doc}.
108     """
109
110     # Split and extract YAML (if present).
111     with open(path, 'r') as reader:
112         body = reader.read()
113     metadata_raw, metadata_yaml, body = split_metadata(path, body)
114
115     # Split into lines.
116     metadata_len = 0 if metadata_raw is None else metadata_raw.count('\n')
117     lines = [(metadata_len+i+1, line, len(line))
118              for (i, line) in enumerate(body.split('\n'))]
119
120     # Parse Markdown.
121     cmd = 'ruby {0}'.format(parser)
122     p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE,
123               close_fds=True, universal_newlines=True)
124     stdout_data, stderr_data = p.communicate(body)
125     doc = json.loads(stdout_data)
126
127     return {
128         'metadata': metadata_yaml,
129         'metadata_len': metadata_len,
130         'text': body,
131         'lines': lines,
132         'doc': doc
133     }
134
135
136 def split_metadata(path, text):
137     """
138     Get raw (text) metadata, metadata as YAML, and rest of body.
139     If no metadata, return (None, None, body).
140     """
141
142     metadata_raw = None
143     metadata_yaml = None
144
145     pieces = text.split('---', 2)
146     if len(pieces) == 3:
147         metadata_raw = pieces[1]
148         text = pieces[2]
149         try:
150             metadata_yaml = yaml.load(metadata_raw)
151         except yaml.YAMLError as e:
152             print('Unable to parse YAML header in {0}:\n{1}'.format(
153                 path, e), file=sys.stderr)
154             sys.exit(1)
155
156     return metadata_raw, metadata_yaml, text
157
158
159 def load_yaml(filename):
160     """
161     Wrapper around YAML loading so that 'import yaml' is only needed
162     in one file.
163     """
164
165     try:
166         with open(filename, 'r') as reader:
167             return yaml.load(reader)
168     except (yaml.YAMLError, IOError) as e:
169         print('Unable to load YAML file {0}:\n{1}'.format(
170             filename, e), file=sys.stderr)
171         sys.exit(1)
172
173
174 def check_unwanted_files(dir_path, reporter):
175     """
176     Check that unwanted files are not present.
177     """
178
179     for filename in UNWANTED_FILES:
180         path = os.path.join(dir_path, filename)
181         reporter.check(not os.path.exists(path),
182                        path,
183                        "Unwanted file found")
184
185
186 def require(condition, message):
187     """Fail if condition not met."""
188
189     if not condition:
190         print(message, file=sys.stderr)
191         sys.exit(1)