49b67ffa1f4c4cf4fccd33bec0e74c4b4e58ffef
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 from __future__ import print_function
2
3 import arvados
4 import collections
5 import functools
6 import itertools
7 import logging
8 import math
9 import re
10 import sys
11
12 logger = logging.getLogger(__name__)
13 logger.addHandler(logging.NullHandler())
14
15 class Summarizer(object):
16     existing_constraints = {}
17
18     def __init__(self, logdata, label='job'):
19         self._logdata = logdata
20         self.label = label
21
22     def run(self):
23         # stats_max: {category: {stat: val}}
24         self.stats_max = collections.defaultdict(
25             functools.partial(collections.defaultdict,
26                               lambda: float('-Inf')))
27         # task_stats: {task_id: {category: {stat: val}}}
28         self.task_stats = collections.defaultdict(
29             functools.partial(collections.defaultdict, dict))
30         for line in self._logdata:
31             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
32             if m:
33                 task_id = m.group('seq')
34                 elapsed = int(m.group('elapsed'))
35                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
36                 if elapsed > self.stats_max['time']['elapsed']:
37                     self.stats_max['time']['elapsed'] = elapsed
38                 continue
39             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
40             if not m:
41                 continue
42             if m.group('category').endswith(':'):
43                 # "notice:" etc.
44                 continue
45             elif m.group('category') == 'error':
46                 continue
47             task_id = m.group('seq')
48             this_interval_s = None
49             for group in ['current', 'interval']:
50                 if not m.group(group):
51                     continue
52                 category = m.group('category')
53                 words = m.group(group).split(' ')
54                 stats = {}
55                 for val, stat in zip(words[::2], words[1::2]):
56                     try:
57                         if '.' in val:
58                             stats[stat] = float(val)
59                         else:
60                             stats[stat] = int(val)
61                     except ValueError as e:
62                         raise ValueError(
63                             'Error parsing {} stat in "{}": {!r}'.format(
64                                 stat, line, e))
65                 if 'user' in stats or 'sys' in stats:
66                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
67                 if 'tx' in stats or 'rx' in stats:
68                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
69                 for stat, val in stats.iteritems():
70                     if group == 'interval':
71                         if stat == 'seconds':
72                             this_interval_s = val
73                             continue
74                         elif not (this_interval_s > 0):
75                             logger.error(
76                                 "BUG? interval stat given with duration {!r}".
77                                 format(this_interval_s))
78                             continue
79                         else:
80                             stat = stat + '__rate'
81                             val = val / this_interval_s
82                     else:
83                         self.task_stats[task_id][category][stat] = val
84                     if val > self.stats_max[category][stat]:
85                         self.stats_max[category][stat] = val
86         self.job_tot = collections.defaultdict(
87             functools.partial(collections.defaultdict, int))
88         for task_id, task_stat in self.task_stats.iteritems():
89             for category, stat_last in task_stat.iteritems():
90                 for stat, val in stat_last.iteritems():
91                     if stat in ['cpus', 'cache', 'swap', 'rss']:
92                         # meaningless stats like 16 cpu cores x 5 tasks = 80
93                         continue
94                     self.job_tot[category][stat] += val
95
96     def report(self):
97         return "\n".join(itertools.chain(
98             self._report_gen(),
99             self._recommend_gen())) + "\n"
100
101     def _report_gen(self):
102         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
103         for category, stat_max in sorted(self.stats_max.iteritems()):
104             for stat, val in sorted(stat_max.iteritems()):
105                 if stat.endswith('__rate'):
106                     continue
107                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
108                 val = self._format(val)
109                 tot = self._format(self.job_tot[category].get(stat, '-'))
110                 yield "\t".join([category, stat, str(val), max_rate, tot])
111         for args in (
112                 ('Max CPU time spent by a single task: {}s',
113                  self.stats_max['cpu']['user+sys'],
114                  None),
115                 ('Max CPU usage in a single interval: {}%',
116                  self.stats_max['cpu']['user+sys__rate'],
117                  lambda x: x * 100),
118                 ('Overall CPU usage: {}%',
119                  self.job_tot['cpu']['user+sys'] /
120                  self.job_tot['time']['elapsed'],
121                  lambda x: x * 100),
122                 ('Max memory used by a single task: {}GB',
123                  self.stats_max['mem']['rss'],
124                  lambda x: x / 1e9),
125                 ('Max network traffic in a single task: {}GB',
126                  self.stats_max['net:eth0']['tx+rx'],
127                  lambda x: x / 1e9),
128                 ('Max network speed in a single interval: {}MB/s',
129                  self.stats_max['net:eth0']['tx+rx__rate'],
130                  lambda x: x / 1e6)):
131             format_string, val, transform = args
132             if val == float('-Inf'):
133                 continue
134             if transform:
135                 val = transform(val)
136             yield "# "+format_string.format(self._format(val))
137
138     def _recommend_gen(self):
139         return itertools.chain(
140             self._recommend_cpu(),
141             self._recommend_ram())
142
143     def _recommend_cpu(self):
144         """Recommend asking for 4 cores if max CPU usage was 333%"""
145
146         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
147         if cpu_max_rate == float('-Inf'):
148             logger.warning('%s: no CPU usage data', self.label)
149             return
150         used_cores = int(math.ceil(cpu_max_rate))
151         asked_cores =  self.existing_constraints.get('min_cores_per_node')
152         if asked_cores is None or used_cores < asked_cores:
153             yield (
154                 '#!! {} max CPU usage was {}% -- '
155                 'try runtime_constraints "min_cores_per_node":{}'
156             ).format(
157                 self.label,
158                 int(math.ceil(cpu_max_rate*100)),
159                 int(used_cores))
160
161     def _recommend_ram(self):
162         """Recommend asking for 2048 MiB RAM if max rss was 1248 MiB"""
163
164         used_ram = self.stats_max['mem']['rss']
165         if used_ram == float('-Inf'):
166             logger.warning('%s: no memory usage data', self.label)
167             return
168         used_ram = math.ceil(float(used_ram) / (1<<20))
169         asked_ram = self.existing_constraints.get('min_ram_mb_per_node')
170         if asked_ram is None or math.ceil(used_ram/(1<<10)) < asked_ram/(1<<10):
171             yield (
172                 '#!! {} never used more than {} MiB RAM -- '
173                 'try runtime_constraints "min_ram_mb_per_node":{}'
174             ).format(
175                 self.label,
176                 int(used_ram),
177                 int(math.ceil(used_ram/(1<<10))*(1<<10)))
178
179     def _format(self, val):
180         """Return a string representation of a stat.
181
182         {:.2f} for floats, default format for everything else."""
183         if isinstance(val, float):
184             return '{:.2f}'.format(val)
185         else:
186             return '{}'.format(val)
187
188
189 class CollectionSummarizer(Summarizer):
190     def __init__(self, collection_id):
191         collection = arvados.collection.CollectionReader(collection_id)
192         filenames = [filename for filename in collection]
193         if len(filenames) != 1:
194             raise ValueError(
195                 "collection {} has {} files; need exactly one".format(
196                     collection_id, len(filenames)))
197         super(CollectionSummarizer, self).__init__(
198             collection.open(filenames[0]))
199
200
201 class JobSummarizer(CollectionSummarizer):
202     def __init__(self, job):
203         arv = arvados.api('v1')
204         if isinstance(job, str):
205             self.job = arv.jobs().get(uuid=job).execute()
206         else:
207             self.job = job
208         self.label = self.job['uuid']
209         self.existing_constraints = self.job.get('runtime_constraints', {})
210         if not self.job['log']:
211             raise ValueError(
212                 "job {} has no log; live summary not implemented".format(
213                     self.job['uuid']))
214         super(JobSummarizer, self).__init__(self.job['log'])
215
216
217 class PipelineSummarizer():
218     def __init__(self, pipeline_instance_uuid):
219         arv = arvados.api('v1')
220         instance = arv.pipeline_instances().get(
221             uuid=pipeline_instance_uuid).execute()
222         self.summarizers = collections.OrderedDict()
223         for cname, component in instance['components'].iteritems():
224             if 'job' not in component:
225                 logger.warning(
226                     "%s: skipping component with no job assigned", cname)
227             elif component['job'].get('log') is None:
228                 logger.warning(
229                     "%s: skipping job %s with no log available",
230                     cname, component['job'].get('uuid'))
231             else:
232                 logger.debug(
233                     "%s: reading log from %s", cname, component['job']['log'])
234                 summarizer = JobSummarizer(component['job'])
235                 summarizer.label = cname
236                 self.summarizers[cname] = summarizer
237
238     def run(self):
239         for summarizer in self.summarizers.itervalues():
240             summarizer.run()
241
242     def report(self):
243         txt = ''
244         for cname, summarizer in self.summarizers.iteritems():
245             txt += '### Summary for {} ({})\n'.format(
246                 cname, summarizer.job['uuid'])
247             txt += summarizer.report()
248             txt += '\n'
249         return txt