Import future print in case of python2 environment
[rnaseq-cwl-training.git] / bin / util.py
1 import sys
2 import os
3 import json
4 from subprocess import Popen, PIPE
5 from __future__ import print_function
6
7 # Import this way to produce a more useful error message.
8 try:
9     import yaml
10 except ImportError:
11     print('Unable to import YAML module: please install PyYAML', file=sys.stderr)
12     sys.exit(1)
13
14
15 # Things an image file's name can end with.
16 IMAGE_FILE_SUFFIX = {
17     '.gif',
18     '.jpg',
19     '.png',
20     '.svg'
21 }
22
23 # Files that shouldn't be present.
24 UNWANTED_FILES = [
25     '.nojekyll'
26 ]
27
28 # Marker to show that an expected value hasn't been provided.
29 # (Can't use 'None' because that might be a legitimate value.)
30 REPORTER_NOT_SET = []
31
32 class Reporter(object):
33     """Collect and report errors."""
34
35     def __init__(self):
36         """Constructor."""
37
38         super(Reporter, self).__init__()
39         self.messages = []
40
41
42     def check_field(self, filename, name, values, key, expected=REPORTER_NOT_SET):
43         """Check that a dictionary has an expected value."""
44
45         if key not in values:
46             self.add(filename, '{0} does not contain {1}', name, key)
47         elif expected is REPORTER_NOT_SET:
48             pass
49         elif type(expected) in (tuple, set, list):
50             if values[key] not in expected:
51                 self.add(filename, '{0} {1} value {2} is not in {3}', name, key, values[key], expected)
52         elif values[key] != expected:
53             self.add(filename, '{0} {1} is {2} not {3}', name, key, values[key], expected)
54
55
56     def check(self, condition, location, fmt, *args):
57         """Append error if condition not met."""
58
59         if not condition:
60             self.add(location, fmt, *args)
61
62
63     def add(self, location, fmt, *args):
64         """Append error unilaterally."""
65
66         self.messages.append((location, fmt.format(*args)))
67
68
69     def report(self, stream=sys.stdout):
70         """Report all messages in order."""
71
72         if not self.messages:
73             return
74
75         def pretty(item):
76             location, message = item
77             if isinstance(location, type(None)):
78                 return message
79             elif isinstance(location, str):
80                 return location + ': ' + message
81             elif isinstance(location, tuple):
82                 return '{0}:{1}: '.format(*location) + message
83             else:
84                 assert False, 'Unknown item "{0}"'.format(item)
85
86         def key(item):
87             location, message = item
88             if isinstance(location, type(None)):
89                 return ('', -1, message)
90             elif isinstance(location, str):
91                 return (location, -1, message)
92             elif isinstance(location, tuple):
93                 return (location[0], location[1], message)
94             else:
95                 assert False, 'Unknown item "{0}"'.format(item)
96
97         for m in sorted(self.messages, key=key):
98             print(pretty(m), file=stream)
99
100
101 def read_markdown(parser, path):
102     """
103     Get YAML and AST for Markdown file, returning
104     {'metadata':yaml, 'metadata_len':N, 'text':text, 'lines':[(i, line, len)], 'doc':doc}.
105     """
106
107     # Split and extract YAML (if present).
108     with open(path, 'r') as reader:
109         body = reader.read()
110     metadata_raw, metadata_yaml, body = split_metadata(path, body)
111
112     # Split into lines.
113     metadata_len = 0 if metadata_raw is None else metadata_raw.count('\n')
114     lines = [(metadata_len+i+1, line, len(line)) for (i, line) in enumerate(body.split('\n'))]
115
116     # Parse Markdown.
117     cmd = 'ruby {0}'.format(parser)
118     p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, close_fds=True, universal_newlines=True)
119     stdout_data, stderr_data = p.communicate(body)
120     doc = json.loads(stdout_data)
121
122     return {
123         'metadata': metadata_yaml,
124         'metadata_len': metadata_len,
125         'text': body,
126         'lines': lines,
127         'doc': doc
128     }
129
130
131 def split_metadata(path, text):
132     """
133     Get raw (text) metadata, metadata as YAML, and rest of body.
134     If no metadata, return (None, None, body).
135     """
136
137     metadata_raw = None
138     metadata_yaml = None
139     metadata_len = None
140
141     pieces = text.split('---', 2)
142     if len(pieces) == 3:
143         metadata_raw = pieces[1]
144         text = pieces[2]
145         try:
146             metadata_yaml = yaml.load(metadata_raw)
147         except yaml.YAMLError as e:
148             print('Unable to parse YAML header in {0}:\n{1}'.format(path, e), file=sys.stderr)
149             sys.exit(1)
150
151     return metadata_raw, metadata_yaml, text
152
153
154 def load_yaml(filename):
155     """
156     Wrapper around YAML loading so that 'import yaml' is only needed
157     in one file.
158     """
159
160     try:
161         with open(filename, 'r') as reader:
162             return yaml.load(reader)
163     except (yaml.YAMLError, FileNotFoundError) as e:
164         print('Unable to load YAML file {0}:\n{1}'.format(filename, e), file=sys.stderr)
165         sys.exit(1)
166
167
168 def check_unwanted_files(dir_path, reporter):
169     """
170     Check that unwanted files are not present.
171     """
172
173     for filename in UNWANTED_FILES:
174         path = os.path.join(dir_path, filename)
175         reporter.check(not os.path.exists(path),
176                        path,
177                        "Unwanted file found")
178
179
180 def require(condition, message):
181     """Fail if condition not met."""
182
183     if not condition:
184         print(message, file=sys.stderr)
185         sys.exit(1)