# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

from __future__ import print_function

import arvados
import collections
import crunchstat_summary.dygraphs
import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re
import sys
import threading
import _strptime

from arvados.api import OrderedJsonModel
from crunchstat_summary import logger

# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95
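# For example, _recommend_ram() below treats an "8 GiB" node as having
# roughly 8 * 0.95 GiB usable, so it suggests constraints such as
# 8 * 0.95 * 1024 = 7782 MiB rather than a full 8192 MiB.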


# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads.  https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')


WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart


class Task(object):
    def __init__(self):
        self.starttime = None
        self.series = collections.defaultdict(list)
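        # series holds plot data: {(category, stat): [(elapsed, value), ...]},
        # where elapsed is a datetime.timedelta since the job's first sample.
        # It is populated in Summarizer.run() and rendered by WEBCHART_CLASS.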


class Summarizer(object):
    def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
        self._logdata = logdata

        self.uuid = uuid
        self.label = label
        self.starttime = None
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        self.seq_to_uuid = {}
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable.  If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.debug("%s: logdata %s", self.label, logdata)

    def run(self):
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        detected_crunch1 = False
        for line in self._logdata:
            if not detected_crunch1 and '-8i9sb-' in line:
                detected_crunch1 = True

            if detected_crunch1:
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
                if m:
                    seq = int(m.group('seq'))
                    uuid = m.group('task_uuid')
                    self.seq_to_uuid[seq] = uuid
                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
                    continue

                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
                if m:
                    task_id = self.seq_to_uuid[int(m.group('seq'))]
                    elapsed = int(m.group('elapsed'))
                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                    if elapsed > self.stats_max['time']['elapsed']:
                        self.stats_max['time']['elapsed'] = elapsed
                    continue

                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
                if m:
                    uuid = m.group('uuid')
                    if self._skip_child_jobs:
                        logger.warning('%s: omitting stats from child job %s'
                                       ' because --skip-child-jobs flag is on',
                                       self.label, uuid)
                        continue
                    logger.debug('%s: follow %s', self.label, uuid)
                    child_summarizer = ProcessSummarizer(uuid)
                    child_summarizer.stats_max = self.stats_max
                    child_summarizer.task_stats = self.task_stats
                    child_summarizer.tasks = self.tasks
                    child_summarizer.starttime = self.starttime
                    child_summarizer.run()
                    logger.debug('%s: done %s', self.label, uuid)
                    continue

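                # A crunch1 crunchstat line looks something like this
                # (one line, wrapped here; hypothetical values, shaped to
                # match the regexp below):
                #   2016-01-07_01:02:03 zzzzz-8i9sb-012345678901234 12345 0 stderr crunchstat:
                #   cpu 69.1200 user 3.9600 sys 4 cpus -- interval 10.0000 seconds 1.2300 user 0.0400 sys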
                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                if not m:
                    continue
            else:
                # crunch2
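                # A crunch2 log line has no crunchstat prefix: just a
                # timestamp, category, and stats, e.g. (hypothetical values):
                #   2017-03-06T19:32:10.123456789Z mem 2752512 cache 0 swap 1232896 rss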
                m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                if not m:
                    continue

            if self.label is None:
                try:
                    self.label = m.group('job_uuid')
                except IndexError:
                    self.label = 'container'
            if m.group('category').endswith(':'):
                # "stderr crunchstat: notice: ..."
                continue
            elif m.group('category') in ('error', 'caught'):
                continue
            elif m.group('category') in ['read', 'open', 'cgroup', 'CID']:
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (old logs are less careful with unprefixed error messages)
                continue

            if detected_crunch1:
                task_id = self.seq_to_uuid[int(m.group('seq'))]
            else:
                task_id = 'container'
            task = self.tasks[task_id]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = m.group('timestamp')
            if timestamp[10:11] == '_':
                timestamp = datetime.datetime.strptime(
                    timestamp, '%Y-%m-%d_%H:%M:%S')
            elif timestamp[10:11] == 'T':
                timestamp = datetime.datetime.strptime(
                    timestamp[:19], '%Y-%m-%dT%H:%M:%S')
            else:
                raise ValueError("Cannot parse timestamp {!r}".format(
                    timestamp))

            if not task.starttime:
                task.starttime = timestamp
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            task.finishtime = timestamp

            if not self.starttime:
                self.starttime = timestamp
            self.finishtime = timestamp

            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                    continue
                category = m.group('category')
                words = m.group(group).split(' ')
                stats = {}
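                # e.g. a 'current' value of "69.1200 user 3.9600 sys 4 cpus"
                # becomes {'user': 69.12, 'sys': 3.96, 'cpus': 4}.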
                try:
                    for val, stat in zip(words[::2], words[1::2]):
                        if '.' in val:
                            stats[stat] = float(val)
                        else:
                            stats[stat] = int(val)
                except ValueError as e:
                    logger.warning('Error parsing {} stat: {!r}'.format(
                        stat, e))
                    continue
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                for stat, val in stats.iteritems():
                    if group == 'interval':
                        if stat == 'seconds':
                            this_interval_s = val
                            continue
                        elif not (this_interval_s > 0):
                            logger.error(
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                            continue
                        else:
                            stat = stat + '__rate'
                            val = val / this_interval_s
                            if stat in ['user+sys__rate', 'tx+rx__rate']:
                                task.series[category, stat].append(
                                    (timestamp - self.starttime, val))
                    else:
                        if stat in ['rss']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.iteritems():
            for category, stat_last in task_stat.iteritems():
                for stat, val in stat_last.iteritems():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                        continue
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

    def long_label(self):
        label = self.label
        if self.finishtime:
            label += ' -- elapsed time '
            s = (self.finishtime - self.starttime).total_seconds()
            if s > 86400:
                label += '{}d'.format(int(s/86400))
            if s > 3600:
                label += '{}h'.format(int(s/3600) % 24)
            if s > 60:
                label += '{}m'.format(int(s/60) % 60)
            label += '{}s'.format(int(s) % 60)
        return label

    def text_report(self):
        if not self.tasks:
            return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"

    def html_report(self):
        return WEBCHART_CLASS(self.label, [self]).html()

    def _text_report_gen(self):
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
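        # One tab-separated row per (category, metric): the largest value seen
        # in any single task, the largest per-second rate seen in any
        # interval, and the total summed over the whole job ('-' if N/A).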
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                if stat.endswith('__rate'):
                    continue
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        for args in (
                ('Number of tasks: {}',
                 len(self.tasks),
                 None),
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                 None),
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                 lambda x: x * 100),
                ('Overall CPU usage: {}%',
                 self.job_tot['cpu']['user+sys'] /
                 self.job_tot['time']['elapsed']
                 if self.job_tot['time']['elapsed'] > 0 else 0,
                 lambda x: x * 100),
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                 lambda x: x / 1e9),
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'] +
                 self.stats_max['net:keep0']['tx+rx'],
                 lambda x: x / 1e9),
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'] +
                 self.stats_max['net:keep0']['tx+rx__rate'],
                 lambda x: x / 1e6),
                ('Keep cache miss rate {}%',
                 (float(self.job_tot['keepcache']['miss']) /
                 float(self.job_tot['keepcalls']['get']))
                 if self.job_tot['keepcalls']['get'] > 0 else 0,
                 lambda x: x * 100.0),
                ('Keep cache utilization {}%',
                 (float(self.job_tot['blkio:0:0']['read']) /
                 float(self.job_tot['net:keep0']['rx']))
                 if self.job_tot['net:keep0']['rx'] > 0 else 0,
                 lambda x: x * 100.0)):
            format_string, val, transform = args
            if val == float('-Inf'):
                continue
            if transform:
                val = transform(val)
            yield "# "+format_string.format(self._format(val))

    def _recommend_gen(self):
        return itertools.chain(
            self._recommend_cpu(),
            self._recommend_ram(),
            self._recommend_keep_cache())

    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""

        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf'):
            logger.warning('%s: no CPU usage data', self.label)
            return
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get('min_cores_per_node')
        if asked_cores is None or used_cores < asked_cores:
            yield (
                '#!! {} max CPU usage was {}% -- '
                'try runtime_constraints "min_cores_per_node":{}'
            ).format(
                self.label,
                int(math.ceil(cpu_max_rate*100)),
                int(used_cores))

    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available.  If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available.  However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        constraint.

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary.  We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """

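        # Worked example (illustrative numbers): a max RSS of 7000 MiB gives
        # ceil(7000 / 0.95 / 1024) = 8 nearlygibs, so the recommendation is
        # 8 * 0.95 * 1024 = 7782 MiB, which fits on an "8 GiB" node.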
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
            return
        used_mib = math.ceil(float(used_bytes) / 1048576)
        asked_mib = self.existing_constraints.get('min_ram_mb_per_node')

        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        if asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
            yield (
                '#!! {} max RSS was {} MiB -- '
                'try runtime_constraints "min_ram_mb_per_node":{}'
            ).format(
                self.label,
                int(used_mib),
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))

    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%"""
        if self.job_tot['net:keep0']['rx'] == 0:
            return
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)

        if utilization < 0.8:
            yield (
                '#!! {} Keep cache utilization was {:.2f}% -- '
                'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
            ).format(
                self.label,
                utilization * 100.0,
                asked_mib*2)


    def _format(self, val):
        """Return a string representation of a stat.

        {:.2f} for floats, default format for everything else."""
        if isinstance(val, float):
            return '{:.2f}'.format(val)
        else:
            return '{}'.format(val)


class CollectionSummarizer(Summarizer):
    def __init__(self, collection_id, **kwargs):
        super(CollectionSummarizer, self).__init__(
            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
        self.label = collection_id


def NewSummarizer(process, **kwargs):
    """Construct with the appropriate subclass for this uuid/object."""

    if not isinstance(process, dict):
        uuid = process
        process = None
        arv = arvados.api('v1', model=OrderedJsonModel())

    if '-dz642-' in uuid:
        if process is None:
            process = arv.containers().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-xvhdp-' in uuid:
        if process is None:
            process = arv.container_requests().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-8i9sb-' in uuid:
        if process is None:
            process = arv.jobs().get(uuid=uuid).execute()
        klass = JobSummarizer
    elif '-d1hrv-' in uuid:
        if process is None:
            process = arv.pipeline_instances().get(uuid=uuid).execute()
        klass = PipelineSummarizer
    elif '-4zz18-' in uuid:
        return CollectionSummarizer(collection_id=uuid)
    else:
        raise ValueError("Unrecognized uuid %s" % uuid)
    return klass(process, uuid=uuid, **kwargs)
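# A minimal usage sketch (hypothetical UUID), mirroring how the
# crunchstat-summary command drives this module:
#
#   summarizer = NewSummarizer('zzzzz-xvhdp-012345678901234')
#   summarizer.run()
#   print(summarizer.text_report())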


class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, container, or container request."""

    def __init__(self, process, label=None, **kwargs):
        rdr = None
        self.process = process
        if label is None:
            label = self.process.get('name', self.process['uuid'])
        if self.process.get('log'):
            try:
                rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.process['log'], e)
        if rdr is None:
            rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
            label = label + ' (partial)'
        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
        self.existing_constraints = self.process.get('runtime_constraints', {})


class JobSummarizer(ProcessSummarizer):
    pass


class ContainerSummarizer(ProcessSummarizer):
    pass


class MultiSummarizer(object):
    def __init__(self, children={}, label=None, threads=1, **kwargs):
        self.throttle = threading.Semaphore(threads)
        self.children = children
        self.label = label

    def run_and_release(self, target, *args, **kwargs):
        try:
            return target(*args, **kwargs)
        finally:
            self.throttle.release()

    def run(self):
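        # Start one thread per child summarizer, but let at most `threads`
        # of them run at once: the semaphore acquired here is released by
        # run_and_release() when the child finishes.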
        threads = []
        for child in self.children.itervalues():
            self.throttle.acquire()
            t = threading.Thread(target=self.run_and_release, args=(child.run, ))
            t.daemon = True
            t.start()
            threads.append(t)
        for t in threads:
            t.join()

    def text_report(self):
        txt = ''
        for cname, child in self.children.iteritems():
            if len(self.children) > 1:
                txt += '### Summary for {} ({})\n'.format(
                    cname, child.process['uuid'])
            txt += child.text_report()
            txt += '\n'
        return txt

    def html_report(self):
        return WEBCHART_CLASS(self.label, self.children.itervalues()).html()


class PipelineSummarizer(MultiSummarizer):
    def __init__(self, instance, **kwargs):
        children = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            if 'job' not in component:
                logger.warning(
                    "%s: skipping component with no job assigned", cname)
            else:
                logger.info(
                    "%s: job %s", cname, component['job']['uuid'])
                summarizer = JobSummarizer(component['job'], **kwargs)
                summarizer.label = '{} {}'.format(
                    cname, component['job']['uuid'])
                children[cname] = summarizer
        super(PipelineSummarizer, self).__init__(
            children=children,
            label=instance['uuid'],
            **kwargs)


class ContainerTreeSummarizer(MultiSummarizer):
    def __init__(self, root, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())

        label = kwargs.pop('label', None) or root.get('name') or root['uuid']
        root['name'] = label

        children = collections.OrderedDict()
        todo = collections.deque((root, ))
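        # Breadth-first walk of the container tree: summarize each container,
        # then queue the container requests it issued (fetched in pages of
        # 'uuid asc' order below).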
        while len(todo) > 0:
            current = todo.popleft()
            label = current['name']
            if current['uuid'].find('-xvhdp-') > 0:
                current = arv.containers().get(uuid=current['container_uuid']).execute()
            children[current['uuid']] = ContainerSummarizer(
                current, label=label, **kwargs)
            page_filters = []
            while True:
                items = arv.container_requests().index(
                    order=['uuid asc'],
                    filters=page_filters+[
                        ['requesting_container_uuid', '=', current['uuid']]],
                ).execute()['items']
                if not items:
                    break
                page_filters = [['uuid', '>', items[-1]['uuid']]]
                for cr in items:
                    if cr['container_uuid']:
                        logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
                        cr['name'] = cr.get('name') or cr['uuid']
                        todo.append(cr)
        super(ContainerTreeSummarizer, self).__init__(
            children=children,
            label=root['name'],
            **kwargs)