11309: 9001: Fix catch-all-exceptions antipattern.
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import print_function
6
7 import arvados
8 import collections
9 import crunchstat_summary.dygraphs
10 import crunchstat_summary.reader
11 import datetime
12 import functools
13 import itertools
14 import math
15 import re
16 import sys
17 import threading
18 import _strptime
19
20 from arvados.api import OrderedJsonModel
21 from crunchstat_summary import logger
22
23 # Recommend memory constraints that are this multiple of an integral
24 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
25 # that have amounts like 7.5 GiB according to the kernel.)
26 AVAILABLE_RAM_RATIO = 0.95
27
28
29 # Workaround datetime.datetime.strptime() thread-safety bug by calling
30 # it once before starting threads.  https://bugs.python.org/issue7980
31 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
32
33
34 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
35
36
37 class Task(object):
38     def __init__(self):
39         self.starttime = None
40         self.series = collections.defaultdict(list)
41
42
43 class Summarizer(object):
44     def __init__(self, logdata, label=None, skip_child_jobs=False):
45         self._logdata = logdata
46
47         self.label = label
48         self.starttime = None
49         self.finishtime = None
50         self._skip_child_jobs = skip_child_jobs
51
52         # stats_max: {category: {stat: val}}
53         self.stats_max = collections.defaultdict(
54             functools.partial(collections.defaultdict, lambda: 0))
55         # task_stats: {task_id: {category: {stat: val}}}
56         self.task_stats = collections.defaultdict(
57             functools.partial(collections.defaultdict, dict))
58
59         self.seq_to_uuid = {}
60         self.tasks = collections.defaultdict(Task)
61
62         # We won't bother recommending new runtime constraints if the
63         # constraints given when running the job are known to us and
64         # are already suitable.  If applicable, the subclass
65         # constructor will overwrite this with something useful.
66         self.existing_constraints = {}
67
68         logger.debug("%s: logdata %s", self.label, logdata)
69
70     def run(self):
71         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
72         for line in self._logdata:
73             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
74             if m:
75                 seq = int(m.group('seq'))
76                 uuid = m.group('task_uuid')
77                 self.seq_to_uuid[seq] = uuid
78                 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
79                 continue
80
81             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
82             if m:
83                 task_id = self.seq_to_uuid[int(m.group('seq'))]
84                 elapsed = int(m.group('elapsed'))
85                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
86                 if elapsed > self.stats_max['time']['elapsed']:
87                     self.stats_max['time']['elapsed'] = elapsed
88                 continue
89
90             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
91             if m:
92                 uuid = m.group('uuid')
93                 if self._skip_child_jobs:
94                     logger.warning('%s: omitting stats from child job %s'
95                                    ' because --skip-child-jobs flag is on',
96                                    self.label, uuid)
97                     continue
98                 logger.debug('%s: follow %s', self.label, uuid)
99                 child_summarizer = ProcessSummarizer(uuid)
100                 child_summarizer.stats_max = self.stats_max
101                 child_summarizer.task_stats = self.task_stats
102                 child_summarizer.tasks = self.tasks
103                 child_summarizer.starttime = self.starttime
104                 child_summarizer.run()
105                 logger.debug('%s: done %s', self.label, uuid)
106                 continue
107
108             m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
109             if m:
110                 # crunch1 job
111                 task_id = self.seq_to_uuid[int(m.group('seq'))]
112             else:
113                 m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
114                 if m:
115                     # crunch2 container (seq/task don't apply)
116                     task_id = 'container'
117                 else:
118                     # not a crunchstat log
119                     continue
120             task = self.tasks[task_id]
121
122             if self.label is None:
123                 try:
124                     self.label = m.group('job_uuid')
125                 except IndexError:
126                     self.label = 'container'
127             if m.group('category').endswith(':'):
128                 # "stderr crunchstat: notice: ..."
129                 continue
130             elif m.group('category') in ('error', 'caught'):
131                 continue
132             elif m.group('category') == 'read':
133                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
134                 # (crunchstat formatting fixed, but old logs still say this)
135                 continue
136
137             # Use the first and last crunchstat timestamps as
138             # approximations of starttime and finishtime.
139             timestamp = m.group('timestamp')
140             if timestamp[10:11] == '_':
141                 timestamp = datetime.datetime.strptime(
142                     timestamp, '%Y-%m-%d_%H:%M:%S')
143             elif timestamp[10:11] == 'T':
144                 timestamp = datetime.datetime.strptime(
145                     timestamp[:19], '%Y-%m-%dT%H:%M:%S')
146             else:
147                 raise ValueError("Cannot parse timestamp {!r}".format(
148                     timestamp))
149
150             if not task.starttime:
151                 task.starttime = timestamp
152                 logger.debug('%s: task %s starttime %s',
153                              self.label, task_id, timestamp)
154             task.finishtime = timestamp
155
156             if not self.starttime:
157                 self.starttime = timestamp
158             self.finishtime = timestamp
159
160             this_interval_s = None
161             for group in ['current', 'interval']:
162                 if not m.group(group):
163                     continue
164                 category = m.group('category')
165                 words = m.group(group).split(' ')
166                 stats = {}
167                 try:
168                     for val, stat in zip(words[::2], words[1::2]):
169                         if '.' in val:
170                             stats[stat] = float(val)
171                         else:
172                             stats[stat] = int(val)
173                 except ValueError as e:
174                     logger.warning('Error parsing {} stat: {!r}'.format(
175                         stat, e))
176                     continue
177                 if 'user' in stats or 'sys' in stats:
178                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
179                 if 'tx' in stats or 'rx' in stats:
180                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
181                 for stat, val in stats.iteritems():
182                     if group == 'interval':
183                         if stat == 'seconds':
184                             this_interval_s = val
185                             continue
186                         elif not (this_interval_s > 0):
187                             logger.error(
188                                 "BUG? interval stat given with duration {!r}".
189                                 format(this_interval_s))
190                             continue
191                         else:
192                             stat = stat + '__rate'
193                             val = val / this_interval_s
194                             if stat in ['user+sys__rate', 'tx+rx__rate']:
195                                 task.series[category, stat].append(
196                                     (timestamp - self.starttime, val))
197                     else:
198                         if stat in ['rss']:
199                             task.series[category, stat].append(
200                                 (timestamp - self.starttime, val))
201                         self.task_stats[task_id][category][stat] = val
202                     if val > self.stats_max[category][stat]:
203                         self.stats_max[category][stat] = val
204         logger.debug('%s: done parsing', self.label)
205
206         self.job_tot = collections.defaultdict(
207             functools.partial(collections.defaultdict, int))
208         for task_id, task_stat in self.task_stats.iteritems():
209             for category, stat_last in task_stat.iteritems():
210                 for stat, val in stat_last.iteritems():
211                     if stat in ['cpus', 'cache', 'swap', 'rss']:
212                         # meaningless stats like 16 cpu cores x 5 tasks = 80
213                         continue
214                     self.job_tot[category][stat] += val
215         logger.debug('%s: done totals', self.label)
216
217     def long_label(self):
218         label = self.label
219         if self.finishtime:
220             label += ' -- elapsed time '
221             s = (self.finishtime - self.starttime).total_seconds()
222             if s > 86400:
223                 label += '{}d'.format(int(s/86400))
224             if s > 3600:
225                 label += '{}h'.format(int(s/3600) % 24)
226             if s > 60:
227                 label += '{}m'.format(int(s/60) % 60)
228             label += '{}s'.format(int(s) % 60)
229         return label
230
231     def text_report(self):
232         if not self.tasks:
233             return "(no report generated)\n"
234         return "\n".join(itertools.chain(
235             self._text_report_gen(),
236             self._recommend_gen())) + "\n"
237
238     def html_report(self):
239         return WEBCHART_CLASS(self.label, [self]).html()
240
241     def _text_report_gen(self):
242         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
243         for category, stat_max in sorted(self.stats_max.iteritems()):
244             for stat, val in sorted(stat_max.iteritems()):
245                 if stat.endswith('__rate'):
246                     continue
247                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
248                 val = self._format(val)
249                 tot = self._format(self.job_tot[category].get(stat, '-'))
250                 yield "\t".join([category, stat, str(val), max_rate, tot])
251         for args in (
252                 ('Number of tasks: {}',
253                  len(self.tasks),
254                  None),
255                 ('Max CPU time spent by a single task: {}s',
256                  self.stats_max['cpu']['user+sys'],
257                  None),
258                 ('Max CPU usage in a single interval: {}%',
259                  self.stats_max['cpu']['user+sys__rate'],
260                  lambda x: x * 100),
261                 ('Overall CPU usage: {}%',
262                  self.job_tot['cpu']['user+sys'] /
263                  self.job_tot['time']['elapsed']
264                  if self.job_tot['time']['elapsed'] > 0 else 0,
265                  lambda x: x * 100),
266                 ('Max memory used by a single task: {}GB',
267                  self.stats_max['mem']['rss'],
268                  lambda x: x / 1e9),
269                 ('Max network traffic in a single task: {}GB',
270                  self.stats_max['net:eth0']['tx+rx'] +
271                  self.stats_max['net:keep0']['tx+rx'],
272                  lambda x: x / 1e9),
273                 ('Max network speed in a single interval: {}MB/s',
274                  self.stats_max['net:eth0']['tx+rx__rate'] +
275                  self.stats_max['net:keep0']['tx+rx__rate'],
276                  lambda x: x / 1e6),
277                 ('Keep cache miss rate {}%',
278                  (float(self.job_tot['keepcache']['miss']) /
279                  float(self.job_tot['keepcalls']['get']))
280                  if self.job_tot['keepcalls']['get'] > 0 else 0,
281                  lambda x: x * 100.0),
282                 ('Keep cache utilization {}%',
283                  (float(self.job_tot['blkio:0:0']['read']) /
284                  float(self.job_tot['net:keep0']['rx']))
285                  if self.job_tot['net:keep0']['rx'] > 0 else 0,
286                  lambda x: x * 100.0)):
287             format_string, val, transform = args
288             if val == float('-Inf'):
289                 continue
290             if transform:
291                 val = transform(val)
292             yield "# "+format_string.format(self._format(val))
293
294     def _recommend_gen(self):
295         return itertools.chain(
296             self._recommend_cpu(),
297             self._recommend_ram(),
298             self._recommend_keep_cache())
299
300     def _recommend_cpu(self):
301         """Recommend asking for 4 cores if max CPU usage was 333%"""
302
303         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
304         if cpu_max_rate == float('-Inf'):
305             logger.warning('%s: no CPU usage data', self.label)
306             return
307         used_cores = max(1, int(math.ceil(cpu_max_rate)))
308         asked_cores = self.existing_constraints.get('min_cores_per_node')
309         if asked_cores is None or used_cores < asked_cores:
310             yield (
311                 '#!! {} max CPU usage was {}% -- '
312                 'try runtime_constraints "min_cores_per_node":{}'
313             ).format(
314                 self.label,
315                 int(math.ceil(cpu_max_rate*100)),
316                 int(used_cores))
317
318     def _recommend_ram(self):
319         """Recommend an economical RAM constraint for this job.
320
321         Nodes that are advertised as "8 gibibytes" actually have what
322         we might call "8 nearlygibs" of memory available for jobs.
323         Here, we calculate a whole number of nearlygibs that would
324         have sufficed to run the job, then recommend requesting a node
325         with that number of nearlygibs (expressed as mebibytes).
326
327         Requesting a node with "nearly 8 gibibytes" is our best hope
328         of getting a node that actually has nearly 8 gibibytes
329         available.  If the node manager is smart enough to account for
330         the discrepancy itself when choosing/creating a node, we'll
331         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
332         advertised size of the next-size-smaller node (say, 6 GiB)
333         will be too low to satisfy our request, so we will effectively
334         get rounded up to 8 GiB.
335
336         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
337         we will generally get a node that is advertised as "8 GiB" and
338         has at least 7500 MiB available.  However, asking for 8192 MiB
339         would either result in an unnecessarily expensive 12 GiB node
340         (if node manager knows about the discrepancy), or an 8 GiB
341         node which has less than 8192 MiB available and is therefore
342         considered by crunch-dispatch to be too small to meet our
343         constraint.
344
345         When node manager learns how to predict the available memory
346         for each node type such that crunch-dispatch always agrees
347         that a node is big enough to run the job it was brought up
348         for, all this will be unnecessary.  We'll just ask for exactly
349         the memory we want -- even if that happens to be 8192 MiB.
350         """
351
352         used_bytes = self.stats_max['mem']['rss']
353         if used_bytes == float('-Inf'):
354             logger.warning('%s: no memory usage data', self.label)
355             return
356         used_mib = math.ceil(float(used_bytes) / 1048576)
357         asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
358
359         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
360         if asked_mib is None or (
361                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
362             yield (
363                 '#!! {} max RSS was {} MiB -- '
364                 'try runtime_constraints "min_ram_mb_per_node":{}'
365             ).format(
366                 self.label,
367                 int(used_mib),
368                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
369
370     def _recommend_keep_cache(self):
371         """Recommend increasing keep cache if utilization < 80%"""
372         if self.job_tot['net:keep0']['rx'] == 0:
373             return
374         utilization = (float(self.job_tot['blkio:0:0']['read']) /
375                        float(self.job_tot['net:keep0']['rx']))
376         asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
377
378         if utilization < 0.8:
379             yield (
380                 '#!! {} Keep cache utilization was {:.2f}% -- '
381                 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
382             ).format(
383                 self.label,
384                 utilization * 100.0,
385                 asked_mib*2)
386
387
388     def _format(self, val):
389         """Return a string representation of a stat.
390
391         {:.2f} for floats, default format for everything else."""
392         if isinstance(val, float):
393             return '{:.2f}'.format(val)
394         else:
395             return '{}'.format(val)
396
397
398 class CollectionSummarizer(Summarizer):
399     def __init__(self, collection_id, **kwargs):
400         super(CollectionSummarizer, self).__init__(
401             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
402         self.label = collection_id
403
404
405 def NewSummarizer(process, **kwargs):
406     """Construct with the appropriate subclass for this uuid/object."""
407
408     if not isinstance(process, dict):
409         uuid = process
410         process = None
411         arv = arvados.api('v1', model=OrderedJsonModel())
412
413     if re.search('-dz642-', uuid):
414         if process is None:
415             process = arv.containers().get(uuid=uuid).execute()
416         return ContainerSummarizer(process, **kwargs)
417     elif re.search('-xvhdp-', uuid):
418         if process is None:
419             ctrReq = arv.container_requests().get(uuid=uuid).execute()
420             ctrUUID = ctrReq['container_uuid']
421             process = arv.containers().get(uuid=ctrUUID).execute()
422         return ContainerSummarizer(process, **kwargs)
423     elif re.search('-8i9sb-', uuid):
424         if process is None:
425             process = arv.jobs().get(uuid=uuid).execute()
426         return JobSummarizer(process, **kwargs)
427     elif re.search('-d1hrv-', uuid):
428         if process is None:
429             process = arv.pipeline_instances().get(uuid=uuid).execute()
430         return PipelineSummarizer(process, **kwargs)
431     else:
432         raise ArgumentError("Unrecognized uuid %s", uuid)
433
434
435 class ProcessSummarizer(Summarizer):
436     """Process is a job, pipeline, container, or container request."""
437
438     def __init__(self, process, **kwargs):
439         rdr = None
440         self.process = process
441         if self.process.get('log'):
442             try:
443                 rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
444             except arvados.errors.NotFoundError as e:
445                 logger.warning("Trying event logs after failing to read "
446                                "log collection %s: %s", self.process['log'], e)
447             else:
448                 label = self.process['uuid']
449         if rdr is None:
450             rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
451             label = self.process['uuid'] + ' (partial)'
452         super(ProcessSummarizer, self).__init__(rdr, **kwargs)
453         self.label = label
454         self.existing_constraints = self.process.get('runtime_constraints', {})
455
456
457 class JobSummarizer(ProcessSummarizer):
458     pass
459
460
461 class ContainerSummarizer(ProcessSummarizer):
462     pass
463
464
465 class PipelineSummarizer(object):
466     def __init__(self, instance, **kwargs):
467         self.summarizers = collections.OrderedDict()
468         for cname, component in instance['components'].iteritems():
469             if 'job' not in component:
470                 logger.warning(
471                     "%s: skipping component with no job assigned", cname)
472             else:
473                 logger.info(
474                     "%s: job %s", cname, component['job']['uuid'])
475                 summarizer = JobSummarizer(component['job'], **kwargs)
476                 summarizer.label = '{} {}'.format(
477                     cname, component['job']['uuid'])
478                 self.summarizers[cname] = summarizer
479         self.label = instance['uuid']
480
481     def run(self):
482         threads = []
483         for summarizer in self.summarizers.itervalues():
484             t = threading.Thread(target=summarizer.run)
485             t.daemon = True
486             t.start()
487             threads.append(t)
488         for t in threads:
489             t.join()
490
491     def text_report(self):
492         txt = ''
493         for cname, summarizer in self.summarizers.iteritems():
494             txt += '### Summary for {} ({})\n'.format(
495                 cname, summarizer.process['uuid'])
496             txt += summarizer.text_report()
497             txt += '\n'
498         return txt
499
500     def html_report(self):
501         return WEBCHART_CLASS(self.label, self.summarizers.itervalues()).html()