Merge branch '8319-bcbio-cwl' closes #8319
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 from __future__ import print_function
2
3 import arvados
4 import collections
5 import crunchstat_summary.chartjs
6 import crunchstat_summary.reader
7 import datetime
8 import functools
9 import itertools
10 import math
11 import re
12 import sys
13 import threading
14
15 from arvados.api import OrderedJsonModel
16 from crunchstat_summary import logger
17
18 # Recommend memory constraints that are this multiple of an integral
19 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
20 # that have amounts like 7.5 GiB according to the kernel.)
21 AVAILABLE_RAM_RATIO = 0.95
22
23
24 class Task(object):
25     def __init__(self):
26         self.starttime = None
27         self.series = collections.defaultdict(list)
28
29
30 class Summarizer(object):
31     def __init__(self, logdata, label=None, skip_child_jobs=False):
32         self._logdata = logdata
33
34         self.label = label
35         self.starttime = None
36         self.finishtime = None
37         self._skip_child_jobs = skip_child_jobs
38
39         # stats_max: {category: {stat: val}}
40         self.stats_max = collections.defaultdict(
41             functools.partial(collections.defaultdict, lambda: 0))
42         # task_stats: {task_id: {category: {stat: val}}}
43         self.task_stats = collections.defaultdict(
44             functools.partial(collections.defaultdict, dict))
45
46         self.seq_to_uuid = {}
47         self.tasks = collections.defaultdict(Task)
48
49         # We won't bother recommending new runtime constraints if the
50         # constraints given when running the job are known to us and
51         # are already suitable.  If applicable, the subclass
52         # constructor will overwrite this with something useful.
53         self.existing_constraints = {}
54
55         logger.debug("%s: logdata %s", self.label, logdata)
56
57     def run(self):
58         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
59         for line in self._logdata:
60             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
61             if m:
62                 seq = int(m.group('seq'))
63                 uuid = m.group('task_uuid')
64                 self.seq_to_uuid[seq] = uuid
65                 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
66                 continue
67
68             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
69             if m:
70                 task_id = self.seq_to_uuid[int(m.group('seq'))]
71                 elapsed = int(m.group('elapsed'))
72                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
73                 if elapsed > self.stats_max['time']['elapsed']:
74                     self.stats_max['time']['elapsed'] = elapsed
75                 continue
76
77             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
78             if m:
79                 uuid = m.group('uuid')
80                 if self._skip_child_jobs:
81                     logger.warning('%s: omitting stats from child job %s'
82                                    ' because --skip-child-jobs flag is on',
83                                    self.label, uuid)
84                     continue
85                 logger.debug('%s: follow %s', self.label, uuid)
86                 child_summarizer = JobSummarizer(uuid)
87                 child_summarizer.stats_max = self.stats_max
88                 child_summarizer.task_stats = self.task_stats
89                 child_summarizer.tasks = self.tasks
90                 child_summarizer.starttime = self.starttime
91                 child_summarizer.run()
92                 logger.debug('%s: done %s', self.label, uuid)
93                 continue
94
95             m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
96             if not m:
97                 continue
98
99             if self.label is None:
100                 self.label = m.group('job_uuid')
101                 logger.debug('%s: using job uuid as label', self.label)
102             if m.group('category').endswith(':'):
103                 # "stderr crunchstat: notice: ..."
104                 continue
105             elif m.group('category') in ('error', 'caught'):
106                 continue
107             elif m.group('category') == 'read':
108                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
109                 # (crunchstat formatting fixed, but old logs still say this)
110                 continue
111             task_id = self.seq_to_uuid[int(m.group('seq'))]
112             task = self.tasks[task_id]
113
114             # Use the first and last crunchstat timestamps as
115             # approximations of starttime and finishtime.
116             timestamp = datetime.datetime.strptime(
117                 m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
118             if not task.starttime:
119                 task.starttime = timestamp
120                 logger.debug('%s: task %s starttime %s',
121                              self.label, task_id, timestamp)
122             task.finishtime = timestamp
123
124             if not self.starttime:
125                 self.starttime = timestamp
126             self.finishtime = timestamp
127
128             this_interval_s = None
129             for group in ['current', 'interval']:
130                 if not m.group(group):
131                     continue
132                 category = m.group('category')
133                 words = m.group(group).split(' ')
134                 stats = {}
135                 for val, stat in zip(words[::2], words[1::2]):
136                     try:
137                         if '.' in val:
138                             stats[stat] = float(val)
139                         else:
140                             stats[stat] = int(val)
141                     except ValueError as e:
142                         raise ValueError(
143                             'Error parsing {} stat in "{}": {!r}'.format(
144                                 stat, line, e))
145                 if 'user' in stats or 'sys' in stats:
146                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
147                 if 'tx' in stats or 'rx' in stats:
148                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
149                 for stat, val in stats.iteritems():
150                     if group == 'interval':
151                         if stat == 'seconds':
152                             this_interval_s = val
153                             continue
154                         elif not (this_interval_s > 0):
155                             logger.error(
156                                 "BUG? interval stat given with duration {!r}".
157                                 format(this_interval_s))
158                             continue
159                         else:
160                             stat = stat + '__rate'
161                             val = val / this_interval_s
162                             if stat in ['user+sys__rate', 'tx+rx__rate']:
163                                 task.series[category, stat].append(
164                                     (timestamp - self.starttime, val))
165                     else:
166                         if stat in ['rss']:
167                             task.series[category, stat].append(
168                                 (timestamp - self.starttime, val))
169                         self.task_stats[task_id][category][stat] = val
170                     if val > self.stats_max[category][stat]:
171                         self.stats_max[category][stat] = val
172         logger.debug('%s: done parsing', self.label)
173
174         self.job_tot = collections.defaultdict(
175             functools.partial(collections.defaultdict, int))
176         for task_id, task_stat in self.task_stats.iteritems():
177             for category, stat_last in task_stat.iteritems():
178                 for stat, val in stat_last.iteritems():
179                     if stat in ['cpus', 'cache', 'swap', 'rss']:
180                         # meaningless stats like 16 cpu cores x 5 tasks = 80
181                         continue
182                     self.job_tot[category][stat] += val
183         logger.debug('%s: done totals', self.label)
184
185     def long_label(self):
186         label = self.label
187         if self.finishtime:
188             label += ' -- elapsed time '
189             s = (self.finishtime - self.starttime).total_seconds()
190             if s > 86400:
191                 label += '{}d'.format(int(s/86400))
192             if s > 3600:
193                 label += '{}h'.format(int(s/3600) % 24)
194             if s > 60:
195                 label += '{}m'.format(int(s/60) % 60)
196             label += '{}s'.format(int(s) % 60)
197         return label
198
199     def text_report(self):
200         if not self.tasks:
201             return "(no report generated)\n"
202         return "\n".join(itertools.chain(
203             self._text_report_gen(),
204             self._recommend_gen())) + "\n"
205
206     def html_report(self):
207         return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
208
209     def _text_report_gen(self):
210         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
211         for category, stat_max in sorted(self.stats_max.iteritems()):
212             for stat, val in sorted(stat_max.iteritems()):
213                 if stat.endswith('__rate'):
214                     continue
215                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
216                 val = self._format(val)
217                 tot = self._format(self.job_tot[category].get(stat, '-'))
218                 yield "\t".join([category, stat, str(val), max_rate, tot])
219         for args in (
220                 ('Number of tasks: {}',
221                  len(self.tasks),
222                  None),
223                 ('Max CPU time spent by a single task: {}s',
224                  self.stats_max['cpu']['user+sys'],
225                  None),
226                 ('Max CPU usage in a single interval: {}%',
227                  self.stats_max['cpu']['user+sys__rate'],
228                  lambda x: x * 100),
229                 ('Overall CPU usage: {}%',
230                  self.job_tot['cpu']['user+sys'] /
231                  self.job_tot['time']['elapsed']
232                  if self.job_tot['time']['elapsed'] > 0 else 0,
233                  lambda x: x * 100),
234                 ('Max memory used by a single task: {}GB',
235                  self.stats_max['mem']['rss'],
236                  lambda x: x / 1e9),
237                 ('Max network traffic in a single task: {}GB',
238                  self.stats_max['net:eth0']['tx+rx'] +
239                  self.stats_max['net:keep0']['tx+rx'],
240                  lambda x: x / 1e9),
241                 ('Max network speed in a single interval: {}MB/s',
242                  self.stats_max['net:eth0']['tx+rx__rate'] +
243                  self.stats_max['net:keep0']['tx+rx__rate'],
244                  lambda x: x / 1e6),
245                 ('Keep cache miss rate {}%',
246                  (float(self.job_tot['keepcache']['miss']) /
247                  float(self.job_tot['keepcalls']['get']))
248                  if self.job_tot['keepcalls']['get'] > 0 else 0,
249                  lambda x: x * 100.0),
250                 ('Keep cache utilization {}%',
251                  (float(self.job_tot['blkio:0:0']['read']) /
252                  float(self.job_tot['net:keep0']['rx']))
253                  if self.job_tot['net:keep0']['rx'] > 0 else 0,
254                  lambda x: x * 100.0)):
255             format_string, val, transform = args
256             if val == float('-Inf'):
257                 continue
258             if transform:
259                 val = transform(val)
260             yield "# "+format_string.format(self._format(val))
261
262     def _recommend_gen(self):
263         return itertools.chain(
264             self._recommend_cpu(),
265             self._recommend_ram(),
266             self._recommend_keep_cache())
267
268     def _recommend_cpu(self):
269         """Recommend asking for 4 cores if max CPU usage was 333%"""
270
271         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
272         if cpu_max_rate == float('-Inf'):
273             logger.warning('%s: no CPU usage data', self.label)
274             return
275         used_cores = max(1, int(math.ceil(cpu_max_rate)))
276         asked_cores = self.existing_constraints.get('min_cores_per_node')
277         if asked_cores is None or used_cores < asked_cores:
278             yield (
279                 '#!! {} max CPU usage was {}% -- '
280                 'try runtime_constraints "min_cores_per_node":{}'
281             ).format(
282                 self.label,
283                 int(math.ceil(cpu_max_rate*100)),
284                 int(used_cores))
285
286     def _recommend_ram(self):
287         """Recommend an economical RAM constraint for this job.
288
289         Nodes that are advertised as "8 gibibytes" actually have what
290         we might call "8 nearlygibs" of memory available for jobs.
291         Here, we calculate a whole number of nearlygibs that would
292         have sufficed to run the job, then recommend requesting a node
293         with that number of nearlygibs (expressed as mebibytes).
294
295         Requesting a node with "nearly 8 gibibytes" is our best hope
296         of getting a node that actually has nearly 8 gibibytes
297         available.  If the node manager is smart enough to account for
298         the discrepancy itself when choosing/creating a node, we'll
299         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
300         advertised size of the next-size-smaller node (say, 6 GiB)
301         will be too low to satisfy our request, so we will effectively
302         get rounded up to 8 GiB.
303
304         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
305         we will generally get a node that is advertised as "8 GiB" and
306         has at least 7500 MiB available.  However, asking for 8192 MiB
307         would either result in an unnecessarily expensive 12 GiB node
308         (if node manager knows about the discrepancy), or an 8 GiB
309         node which has less than 8192 MiB available and is therefore
310         considered by crunch-dispatch to be too small to meet our
311         constraint.
312
313         When node manager learns how to predict the available memory
314         for each node type such that crunch-dispatch always agrees
315         that a node is big enough to run the job it was brought up
316         for, all this will be unnecessary.  We'll just ask for exactly
317         the memory we want -- even if that happens to be 8192 MiB.
318         """
319
320         used_bytes = self.stats_max['mem']['rss']
321         if used_bytes == float('-Inf'):
322             logger.warning('%s: no memory usage data', self.label)
323             return
324         used_mib = math.ceil(float(used_bytes) / 1048576)
325         asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
326
327         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
328         if asked_mib is None or (
329                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
330             yield (
331                 '#!! {} max RSS was {} MiB -- '
332                 'try runtime_constraints "min_ram_mb_per_node":{}'
333             ).format(
334                 self.label,
335                 int(used_mib),
336                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
337
338     def _recommend_keep_cache(self):
339         """Recommend increasing keep cache if utilization < 80%"""
340         if self.job_tot['net:keep0']['rx'] == 0:
341             return
342         utilization = (float(self.job_tot['blkio:0:0']['read']) /
343                        float(self.job_tot['net:keep0']['rx']))
344         asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
345
346         if utilization < 0.8:
347             yield (
348                 '#!! {} Keep cache utilization was {:.2f}% -- '
349                 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
350             ).format(
351                 self.label,
352                 utilization * 100.0,
353                 asked_mib*2)
354
355
356     def _format(self, val):
357         """Return a string representation of a stat.
358
359         {:.2f} for floats, default format for everything else."""
360         if isinstance(val, float):
361             return '{:.2f}'.format(val)
362         else:
363             return '{}'.format(val)
364
365
366 class CollectionSummarizer(Summarizer):
367     def __init__(self, collection_id, **kwargs):
368         super(CollectionSummarizer, self).__init__(
369             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
370         self.label = collection_id
371
372
373 class JobSummarizer(Summarizer):
374     def __init__(self, job, **kwargs):
375         arv = arvados.api('v1')
376         if isinstance(job, basestring):
377             self.job = arv.jobs().get(uuid=job).execute()
378         else:
379             self.job = job
380         rdr = None
381         if self.job.get('log'):
382             try:
383                 rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
384             except arvados.errors.NotFoundError as e:
385                 logger.warning("Trying event logs after failing to read "
386                                "log collection %s: %s", self.job['log'], e)
387             else:
388                 label = self.job['uuid']
389         if rdr is None:
390             rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
391             label = self.job['uuid'] + ' (partial)'
392         super(JobSummarizer, self).__init__(rdr, **kwargs)
393         self.label = label
394         self.existing_constraints = self.job.get('runtime_constraints', {})
395
396
397 class PipelineSummarizer(object):
398     def __init__(self, pipeline_instance_uuid, **kwargs):
399         arv = arvados.api('v1', model=OrderedJsonModel())
400         instance = arv.pipeline_instances().get(
401             uuid=pipeline_instance_uuid).execute()
402         self.summarizers = collections.OrderedDict()
403         for cname, component in instance['components'].iteritems():
404             if 'job' not in component:
405                 logger.warning(
406                     "%s: skipping component with no job assigned", cname)
407             else:
408                 logger.info(
409                     "%s: job %s", cname, component['job']['uuid'])
410                 summarizer = JobSummarizer(component['job'], **kwargs)
411                 summarizer.label = '{} {}'.format(
412                     cname, component['job']['uuid'])
413                 self.summarizers[cname] = summarizer
414         self.label = pipeline_instance_uuid
415
416     def run(self):
417         threads = []
418         for summarizer in self.summarizers.itervalues():
419             t = threading.Thread(target=summarizer.run)
420             t.daemon = True
421             t.start()
422             threads.append(t)
423         for t in threads:
424             t.join()
425
426     def text_report(self):
427         txt = ''
428         for cname, summarizer in self.summarizers.iteritems():
429             txt += '### Summary for {} ({})\n'.format(
430                 cname, summarizer.job['uuid'])
431             txt += summarizer.text_report()
432             txt += '\n'
433         return txt
434
435     def html_report(self):
436         return crunchstat_summary.chartjs.ChartJS(
437             self.label, self.summarizers.itervalues()).html()