6af0a3317061d8663dafad9de41aec2156b7e597
[rnaseq-cwl-training.git] / bin / util.py
1 import sys
2 import json
3 from subprocess import Popen, PIPE
4
5 try:
6     import yaml
7 except ImportError:
8     print('Unable to import YAML module: please install PyYAML', file=sys.stderr)
9     sys.exit(1)
10
11 class Reporter(object):
12     """Collect and report errors."""
13
14     def __init__(self):
15         """Constructor."""
16
17         super(Reporter, self).__init__()
18         self.messages = []
19
20
21     def check_field(self, filename, name, values, key, expected):
22         """Check that a dictionary has an expected value."""
23
24         if key not in values:
25             self.add(filename, '{0} does not contain {1}', name, key)
26         elif values[key] != expected:
27             self.add(filename, '{0} {1} is {2} not {3}', name, key, values[key], expected)
28
29
30     def check(self, condition, location, fmt, *args):
31         """Append error if condition not met."""
32
33         if not condition:
34             self.add(location, fmt, *args)
35
36
37     def add(self, location, fmt, *args):
38         """Append error unilaterally."""
39
40         if isinstance(location, type(None)):
41             coords = ''
42         elif isinstance(location, str):
43             coords = '{0}: '.format(location)
44         elif isinstance(location, tuple):
45             filename, line_number = location
46             coords = '{0}:{1}: '.format(*location)
47         else:
48             assert False, 'Unknown location "{0}"/{1}'.format(location, type(location))
49
50         self.messages.append(coords + fmt.format(*args))
51
52
53     def report(self, stream=sys.stdout):
54         """Report all messages."""
55
56         if not self.messages:
57             return
58         for m in sorted(self.messages):
59             print(m, file=stream)
60
61
62 def read_markdown(parser, path):
63     """
64     Get YAML and AST for Markdown file, returning
65     {'metadata':yaml, 'metadata_len':N, 'text':text, 'lines':[(i, line, len)], 'doc':doc}.
66     """
67
68     # Split and extract YAML (if present).
69     with open(path, 'r') as reader:
70         body = reader.read()
71     metadata_raw, metadata_yaml, body = split_metadata(path, body)
72
73     # Split into lines.
74     metadata_len = 0 if metadata_raw is None else metadata_raw.count('\n')
75     lines = [(metadata_len+i+1, line, len(line)) for (i, line) in enumerate(body.split('\n'))]
76
77     # Parse Markdown.
78     cmd = 'ruby {0}'.format(parser)
79     p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, close_fds=True, universal_newlines=True)
80     stdout_data, stderr_data = p.communicate(body)
81     doc = json.loads(stdout_data)
82
83     return {
84         'metadata': metadata_yaml,
85         'metadata_len': metadata_len,
86         'text': body,
87         'lines': lines,
88         'doc': doc
89     }
90
91
92 def split_metadata(path, text):
93     """
94     Get raw (text) metadata, metadata as YAML, and rest of body.
95     If no metadata, return (None, None, body).
96     """
97
98     metadata_raw = None
99     metadata_yaml = None
100     metadata_len = None
101
102     pieces = text.split('---', 2)
103     if len(pieces) == 3:
104         metadata_raw = pieces[1]
105         text = pieces[2]
106         try:
107             metadata_yaml = yaml.load(metadata_raw)
108         except yaml.YAMLError as e:
109             print('Unable to parse YAML header in {0}:\n{1}'.format(path, e), file=sys.stderr)
110             sys.exit(1)
111
112     return metadata_raw, metadata_yaml, text
113
114
115 def load_yaml(filename):
116     """
117     Wrapper around YAML loading so that 'import yaml' and error
118     handling is only needed in one place.
119     """
120
121     with open(filename, 'r') as reader:
122         return yaml.load(reader)