e1a03a21084240a4f1af7e741fee58eec3ac116d
[rnaseq-cwl-training.git] / bin / util.py
1
2 import sys
3 import os
4 import json
5 from subprocess import Popen, PIPE
6
7 # Import this way to produce a more useful error message.
8 try:
9     import yaml
10 except ImportError:
11     print('Unable to import YAML module: please install PyYAML', file=sys.stderr)
12     sys.exit(1)
13
14
15 # Things an image file's name can end with.
16 IMAGE_FILE_SUFFIX = {
17     '.gif',
18     '.jpg',
19     '.png',
20     '.svg'
21 }
22
23 # Files that shouldn't be present.
24 UNWANTED_FILES = [
25     '.nojekyll'
26 ]
27
28 # Marker to show that an expected value hasn't been provided.
29 # (Can't use 'None' because that might be a legitimate value.)
30 REPORTER_NOT_SET = []
31
32
33 class Reporter:
34     """Collect and report errors."""
35
36     def __init__(self):
37         """Constructor."""
38         self.messages = []
39
40     def check_field(self, filename, name, values, key, expected=REPORTER_NOT_SET):
41         """Check that a dictionary has an expected value."""
42
43         if key not in values:
44             self.add(filename, '{0} does not contain {1}', name, key)
45         elif expected is REPORTER_NOT_SET:
46             pass
47         elif type(expected) in (tuple, set, list):
48             if values[key] not in expected:
49                 self.add(
50                     filename, '{0} {1} value {2} is not in {3}', name, key, values[key], expected)
51         elif values[key] != expected:
52             self.add(filename, '{0} {1} is {2} not {3}',
53                      name, key, values[key], expected)
54
55     def check(self, condition, location, fmt, *args):
56         """Append error if condition not met."""
57
58         if not condition:
59             self.add(location, fmt, *args)
60
61     def add(self, location, fmt, *args):
62         """Append error unilaterally."""
63
64         self.messages.append((location, fmt.format(*args)))
65
66     @staticmethod
67     def pretty(item):
68         location, message = item
69         if isinstance(location, type(None)):
70             return message
71         elif isinstance(location, str):
72             return location + ': ' + message
73         elif isinstance(location, tuple):
74             return '{0}:{1}: '.format(*location) + message
75
76         print('Unknown item "{0}"'.format(item), file=sys.stderr)
77         return NotImplemented
78
79     @staticmethod
80     def key(item):
81         location, message = item
82         if isinstance(location, type(None)):
83             return ('', -1, message)
84         elif isinstance(location, str):
85             return (location, -1, message)
86         elif isinstance(location, tuple):
87             return (location[0], location[1], message)
88
89         print('Unknown item "{0}"'.format(item), file=sys.stderr)
90         return NotImplemented
91
92     def report(self, stream=sys.stdout):
93         """Report all messages in order."""
94
95         if not self.messages:
96             return
97
98         for m in sorted(self.messages, key=self.key):
99             print(self.pretty(m), file=stream)
100
101
102 def read_markdown(parser, path):
103     """
104     Get YAML and AST for Markdown file, returning
105     {'metadata':yaml, 'metadata_len':N, 'text':text, 'lines':[(i, line, len)], 'doc':doc}.
106     """
107
108     # Split and extract YAML (if present).
109     with open(path, 'r') as reader:
110         body = reader.read()
111     metadata_raw, metadata_yaml, body = split_metadata(path, body)
112
113     # Split into lines.
114     metadata_len = 0 if metadata_raw is None else metadata_raw.count('\n')
115     lines = [(metadata_len+i+1, line, len(line))
116              for (i, line) in enumerate(body.split('\n'))]
117
118     # Parse Markdown.
119     cmd = 'ruby {0}'.format(parser)
120     p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE,
121               close_fds=True, universal_newlines=True)
122     stdout_data, stderr_data = p.communicate(body)
123     doc = json.loads(stdout_data)
124
125     return {
126         'metadata': metadata_yaml,
127         'metadata_len': metadata_len,
128         'text': body,
129         'lines': lines,
130         'doc': doc
131     }
132
133
134 def split_metadata(path, text):
135     """
136     Get raw (text) metadata, metadata as YAML, and rest of body.
137     If no metadata, return (None, None, body).
138     """
139
140     metadata_raw = None
141     metadata_yaml = None
142
143     pieces = text.split('---', 2)
144     if len(pieces) == 3:
145         metadata_raw = pieces[1]
146         text = pieces[2]
147         try:
148             metadata_yaml = yaml.load(metadata_raw)
149         except yaml.YAMLError as e:
150             print('Unable to parse YAML header in {0}:\n{1}'.format(
151                 path, e), file=sys.stderr)
152             sys.exit(1)
153
154     return metadata_raw, metadata_yaml, text
155
156
157 def load_yaml(filename):
158     """
159     Wrapper around YAML loading so that 'import yaml' is only needed
160     in one file.
161     """
162
163     try:
164         with open(filename, 'r') as reader:
165             return yaml.load(reader)
166     except (yaml.YAMLError, IOError) as e:
167         print('Unable to load YAML file {0}:\n{1}'.format(
168             filename, e), file=sys.stderr)
169         sys.exit(1)
170
171
172 def check_unwanted_files(dir_path, reporter):
173     """
174     Check that unwanted files are not present.
175     """
176
177     for filename in UNWANTED_FILES:
178         path = os.path.join(dir_path, filename)
179         reporter.check(not os.path.exists(path),
180                        path,
181                        "Unwanted file found")
182
183
184 def require(condition, message):
185     """Fail if condition not met."""
186
187     if not condition:
188         print(message, file=sys.stderr)
189         sys.exit(1)