15486: Fix crunchstat-summary to work with Arvados v1.4+
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import arvados
6 import collections
7 import crunchstat_summary.dygraphs
8 import crunchstat_summary.reader
9 import datetime
10 import functools
11 import itertools
12 import math
13 import re
14 import sys
15 import threading
16 import _strptime
17
18 from arvados.api import OrderedJsonModel
19 from crunchstat_summary import logger
20
21 # Recommend memory constraints that are this multiple of an integral
22 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
23 # that have amounts like 7.5 GiB according to the kernel.)
24 AVAILABLE_RAM_RATIO = 0.95
25 MB=2**20
26
27 # Workaround datetime.datetime.strptime() thread-safety bug by calling
28 # it once before starting threads.  https://bugs.python.org/issue7980
29 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
30
31
32 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
33
34
35 class Task(object):
36     def __init__(self):
37         self.starttime = None
38         self.finishtime = None
39         self.series = collections.defaultdict(list)
40
41
42 class Summarizer(object):
43     def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
44         self._logdata = logdata
45
46         self.uuid = uuid
47         self.label = label
48         self.starttime = None
49         self.finishtime = None
50         self._skip_child_jobs = skip_child_jobs
51
52         # stats_max: {category: {stat: val}}
53         self.stats_max = collections.defaultdict(
54             functools.partial(collections.defaultdict, lambda: 0))
55         # task_stats: {task_id: {category: {stat: val}}}
56         self.task_stats = collections.defaultdict(
57             functools.partial(collections.defaultdict, dict))
58
59         self.seq_to_uuid = {}
60         self.tasks = collections.defaultdict(Task)
61
62         # We won't bother recommending new runtime constraints if the
63         # constraints given when running the job are known to us and
64         # are already suitable.  If applicable, the subclass
65         # constructor will overwrite this with something useful.
66         self.existing_constraints = {}
67
68         logger.debug("%s: logdata %s", self.label, logdata)
69
70     def run(self):
71         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
72         with self._logdata as logdata:
73             self._run(logdata)
74
75     def _run(self, logdata):
76         self.detected_crunch1 = False
77         for line in logdata:
78             if not self.detected_crunch1 and '-8i9sb-' in line:
79                 self.detected_crunch1 = True
80
81             if self.detected_crunch1:
82                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
83                 if m:
84                     seq = int(m.group('seq'))
85                     uuid = m.group('task_uuid')
86                     self.seq_to_uuid[seq] = uuid
87                     logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
88                     continue
89
90                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
91                 if m:
92                     task_id = self.seq_to_uuid[int(m.group('seq'))]
93                     elapsed = int(m.group('elapsed'))
94                     self.task_stats[task_id]['time'] = {'elapsed': elapsed}
95                     if elapsed > self.stats_max['time']['elapsed']:
96                         self.stats_max['time']['elapsed'] = elapsed
97                     continue
98
99                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
100                 if m:
101                     uuid = m.group('uuid')
102                     if self._skip_child_jobs:
103                         logger.warning('%s: omitting stats from child job %s'
104                                        ' because --skip-child-jobs flag is on',
105                                        self.label, uuid)
106                         continue
107                     logger.debug('%s: follow %s', self.label, uuid)
108                     child_summarizer = ProcessSummarizer(uuid)
109                     child_summarizer.stats_max = self.stats_max
110                     child_summarizer.task_stats = self.task_stats
111                     child_summarizer.tasks = self.tasks
112                     child_summarizer.starttime = self.starttime
113                     child_summarizer.run()
114                     logger.debug('%s: done %s', self.label, uuid)
115                     continue
116
117                 # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
118                 m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
119                 if not m:
120                     continue
121             else:
122                 # crunch2
123                 # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
124                 m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
125                 if not m:
126                     continue
127
128             if self.label is None:
129                 try:
130                     self.label = m.group('job_uuid')
131                 except IndexError:
132                     self.label = 'label #1'
133             category = m.group('category')
134             if category.endswith(':'):
135                 # "stderr crunchstat: notice: ..."
136                 continue
137             elif category in ('error', 'caught'):
138                 continue
139             elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
140                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
141                 # (old logs are less careful with unprefixed error messages)
142                 continue
143
144             if self.detected_crunch1:
145                 task_id = self.seq_to_uuid[int(m.group('seq'))]
146             else:
147                 task_id = 'container'
148             task = self.tasks[task_id]
149
150             # Use the first and last crunchstat timestamps as
151             # approximations of starttime and finishtime.
152             timestamp = m.group('timestamp')
153             if timestamp[10:11] == '_':
154                 timestamp = datetime.datetime.strptime(
155                     timestamp, '%Y-%m-%d_%H:%M:%S')
156             elif timestamp[10:11] == 'T':
157                 timestamp = datetime.datetime.strptime(
158                     timestamp[:19], '%Y-%m-%dT%H:%M:%S')
159             else:
160                 raise ValueError("Cannot parse timestamp {!r}".format(
161                     timestamp))
162
163             if task.starttime is None:
164                 logger.debug('%s: task %s starttime %s',
165                              self.label, task_id, timestamp)
166             if task.starttime is None or timestamp < task.starttime:
167                 task.starttime = timestamp
168             if task.finishtime is None or timestamp > task.finishtime:
169                 task.finishtime = timestamp
170
171             if self.starttime is None or timestamp < self.starttime:
172                 self.starttime = timestamp
173             if self.finishtime is None or timestamp > self.finishtime:
174                 self.finishtime = timestamp
175
176             if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
177                 elapsed = (task.finishtime - task.starttime).seconds
178                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
179                 if elapsed > self.stats_max['time']['elapsed']:
180                     self.stats_max['time']['elapsed'] = elapsed
181
182             this_interval_s = None
183             for group in ['current', 'interval']:
184                 if not m.group(group):
185                     continue
186                 category = m.group('category')
187                 words = m.group(group).split(' ')
188                 stats = {}
189                 try:
190                     for val, stat in zip(words[::2], words[1::2]):
191                         if '.' in val:
192                             stats[stat] = float(val)
193                         else:
194                             stats[stat] = int(val)
195                 except ValueError as e:
196                     # If the line doesn't start with 'crunchstat:' we
197                     # might have mistaken an error message for a
198                     # structured crunchstat line.
199                     if m.group("crunchstat") is None or m.group("category") == "crunchstat":
200                         logger.warning("%s: log contains message\n  %s", self.label, line)
201                     else:
202                         logger.warning(
203                             '%s: Error parsing value %r (stat %r, category %r): %r',
204                             self.label, val, stat, category, e)
205                         logger.warning('%s', line)
206                     continue
207                 if 'user' in stats or 'sys' in stats:
208                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
209                 if 'tx' in stats or 'rx' in stats:
210                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
211                 if group == 'interval':
212                     if 'seconds' in stats:
213                         this_interval_s = stats.get('seconds',0)
214                         del stats['seconds']
215                         if this_interval_s <= 0:
216                             logger.error(
217                                 "BUG? interval stat given with duration {!r}".
218                                 format(this_interval_s))
219                     else:
220                         logger.error('BUG? interval stat missing duration')
221                 for stat, val in stats.items():
222                     if group == 'interval' and this_interval_s:
223                             stat = stat + '__rate'
224                             val = val / this_interval_s
225                             if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
226                                 task.series[category, stat].append(
227                                     (timestamp - self.starttime, val))
228                     else:
229                         if stat in ['rss','used','total']:
230                             task.series[category, stat].append(
231                                 (timestamp - self.starttime, val))
232                         self.task_stats[task_id][category][stat] = val
233                     if val > self.stats_max[category][stat]:
234                         self.stats_max[category][stat] = val
235         logger.debug('%s: done parsing', self.label)
236
237         self.job_tot = collections.defaultdict(
238             functools.partial(collections.defaultdict, int))
239         for task_id, task_stat in self.task_stats.items():
240             for category, stat_last in task_stat.items():
241                 for stat, val in stat_last.items():
242                     if stat in ['cpus', 'cache', 'swap', 'rss']:
243                         # meaningless stats like 16 cpu cores x 5 tasks = 80
244                         continue
245                     self.job_tot[category][stat] += val
246         logger.debug('%s: done totals', self.label)
247
248     def long_label(self):
249         label = self.label
250         if hasattr(self, 'process') and self.process['uuid'] not in label:
251             label = '{} ({})'.format(label, self.process['uuid'])
252         if self.finishtime:
253             label += ' -- elapsed time '
254             s = (self.finishtime - self.starttime).total_seconds()
255             if s > 86400:
256                 label += '{}d'.format(int(s/86400))
257             if s > 3600:
258                 label += '{}h'.format(int(s/3600) % 24)
259             if s > 60:
260                 label += '{}m'.format(int(s/60) % 60)
261             label += '{}s'.format(int(s) % 60)
262         return label
263
264     def text_report(self):
265         if not self.tasks:
266             return "(no report generated)\n"
267         return "\n".join(itertools.chain(
268             self._text_report_gen(),
269             self._recommend_gen())) + "\n"
270
271     def html_report(self):
272         return WEBCHART_CLASS(self.label, [self]).html()
273
274     def _text_report_gen(self):
275         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
276         for category, stat_max in sorted(self.stats_max.items()):
277             for stat, val in sorted(stat_max.items()):
278                 if stat.endswith('__rate'):
279                     continue
280                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
281                 val = self._format(val)
282                 tot = self._format(self.job_tot[category].get(stat, '-'))
283                 yield "\t".join([category, stat, str(val), max_rate, tot])
284         for args in (
285                 ('Number of tasks: {}',
286                  len(self.tasks),
287                  None),
288                 ('Max CPU time spent by a single task: {}s',
289                  self.stats_max['cpu']['user+sys'],
290                  None),
291                 ('Max CPU usage in a single interval: {}%',
292                  self.stats_max['cpu']['user+sys__rate'],
293                  lambda x: x * 100),
294                 ('Overall CPU usage: {}%',
295                  float(self.job_tot['cpu']['user+sys']) /
296                  self.job_tot['time']['elapsed']
297                  if self.job_tot['time']['elapsed'] > 0 else 0,
298                  lambda x: x * 100),
299                 ('Max memory used by a single task: {}GB',
300                  self.stats_max['mem']['rss'],
301                  lambda x: x / 1e9),
302                 ('Max network traffic in a single task: {}GB',
303                  self.stats_max['net:eth0']['tx+rx'] +
304                  self.stats_max['net:keep0']['tx+rx'],
305                  lambda x: x / 1e9),
306                 ('Max network speed in a single interval: {}MB/s',
307                  self.stats_max['net:eth0']['tx+rx__rate'] +
308                  self.stats_max['net:keep0']['tx+rx__rate'],
309                  lambda x: x / 1e6),
310                 ('Keep cache miss rate {}%',
311                  (float(self.job_tot['keepcache']['miss']) /
312                  float(self.job_tot['keepcalls']['get']))
313                  if self.job_tot['keepcalls']['get'] > 0 else 0,
314                  lambda x: x * 100.0),
315                 ('Keep cache utilization {}%',
316                  (float(self.job_tot['blkio:0:0']['read']) /
317                  float(self.job_tot['net:keep0']['rx']))
318                  if self.job_tot['net:keep0']['rx'] > 0 else 0,
319                  lambda x: x * 100.0),
320                ('Temp disk utilization {}%',
321                  (float(self.job_tot['statfs']['used']) /
322                  float(self.job_tot['statfs']['total']))
323                  if self.job_tot['statfs']['total'] > 0 else 0,
324                  lambda x: x * 100.0),
325                 ):
326             format_string, val, transform = args
327             if val == float('-Inf'):
328                 continue
329             if transform:
330                 val = transform(val)
331             yield "# "+format_string.format(self._format(val))
332
333     def _recommend_gen(self):
334         # TODO recommend fixing job granularity if elapsed time is too short
335         return itertools.chain(
336             self._recommend_cpu(),
337             self._recommend_ram(),
338             self._recommend_keep_cache(),
339             self._recommend_temp_disk(),
340             )
341
342     def _recommend_cpu(self):
343         """Recommend asking for 4 cores if max CPU usage was 333%"""
344
345         constraint_key = self._map_runtime_constraint('vcpus')
346         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
347         if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
348             logger.warning('%s: no CPU usage data', self.label)
349             return
350         # TODO Don't necessarily want to recommend on isolated max peak
351         # take average CPU usage into account as well or % time at max
352         used_cores = max(1, int(math.ceil(cpu_max_rate)))
353         asked_cores = self.existing_constraints.get(constraint_key)
354         if asked_cores is None:
355             asked_cores = 1
356         # TODO: This should be more nuanced in cases where max >> avg
357         if used_cores < asked_cores:
358             yield (
359                 '#!! {} max CPU usage was {}% -- '
360                 'try reducing runtime_constraints to "{}":{}'
361             ).format(
362                 self.label,
363                 math.ceil(cpu_max_rate*100),
364                 constraint_key,
365                 int(used_cores))
366
367     # FIXME: This needs to be updated to account for current nodemanager algorithms
368     def _recommend_ram(self):
369         """Recommend an economical RAM constraint for this job.
370
371         Nodes that are advertised as "8 gibibytes" actually have what
372         we might call "8 nearlygibs" of memory available for jobs.
373         Here, we calculate a whole number of nearlygibs that would
374         have sufficed to run the job, then recommend requesting a node
375         with that number of nearlygibs (expressed as mebibytes).
376
377         Requesting a node with "nearly 8 gibibytes" is our best hope
378         of getting a node that actually has nearly 8 gibibytes
379         available.  If the node manager is smart enough to account for
380         the discrepancy itself when choosing/creating a node, we'll
381         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
382         advertised size of the next-size-smaller node (say, 6 GiB)
383         will be too low to satisfy our request, so we will effectively
384         get rounded up to 8 GiB.
385
386         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
387         we will generally get a node that is advertised as "8 GiB" and
388         has at least 7500 MiB available.  However, asking for 8192 MiB
389         would either result in an unnecessarily expensive 12 GiB node
390         (if node manager knows about the discrepancy), or an 8 GiB
391         node which has less than 8192 MiB available and is therefore
392         considered by crunch-dispatch to be too small to meet our
393         constraint.
394
395         When node manager learns how to predict the available memory
396         for each node type such that crunch-dispatch always agrees
397         that a node is big enough to run the job it was brought up
398         for, all this will be unnecessary.  We'll just ask for exactly
399         the memory we want -- even if that happens to be 8192 MiB.
400         """
401
402         constraint_key = self._map_runtime_constraint('ram')
403         used_bytes = self.stats_max['mem']['rss']
404         if used_bytes == float('-Inf'):
405             logger.warning('%s: no memory usage data', self.label)
406             return
407         used_mib = math.ceil(float(used_bytes) / MB)
408         asked_mib = self.existing_constraints.get(constraint_key)
409
410         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
411         if used_mib > 0 and (asked_mib is None or (
412                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
413             yield (
414                 '#!! {} max RSS was {} MiB -- '
415                 'try reducing runtime_constraints to "{}":{}'
416             ).format(
417                 self.label,
418                 int(used_mib),
419                 constraint_key,
420                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))
421
422     def _recommend_keep_cache(self):
423         """Recommend increasing keep cache if utilization < 80%"""
424         constraint_key = self._map_runtime_constraint('keep_cache_ram')
425         if self.job_tot['net:keep0']['rx'] == 0:
426             return
427         utilization = (float(self.job_tot['blkio:0:0']['read']) /
428                        float(self.job_tot['net:keep0']['rx']))
429         # FIXME: the default on this get won't work correctly
430         asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
431
432         if utilization < 0.8:
433             yield (
434                 '#!! {} Keep cache utilization was {:.2f}% -- '
435                 'try doubling runtime_constraints to "{}":{} (or more)'
436             ).format(
437                 self.label,
438                 utilization * 100.0,
439                 constraint_key,
440                 math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
441
442
443     def _recommend_temp_disk(self):
444         """Recommend decreasing temp disk if utilization < 50%"""
445         total = float(self.job_tot['statfs']['total'])
446         utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0
447
448         if utilization < 50.8 and total > 0:
449             yield (
450                 '#!! {} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
451                 'consider reducing "tmpdirMin" and/or "outdirMin"'
452             ).format(
453                 self.label,
454                 utilization * 100.0,
455                 total / MB)
456
457
458     def _format(self, val):
459         """Return a string representation of a stat.
460
461         {:.2f} for floats, default format for everything else."""
462         if isinstance(val, float):
463             return '{:.2f}'.format(val)
464         else:
465             return '{}'.format(val)
466
467     def _runtime_constraint_mem_unit(self):
468         if hasattr(self, 'runtime_constraint_mem_unit'):
469             return self.runtime_constraint_mem_unit
470         elif self.detected_crunch1:
471             return JobSummarizer.runtime_constraint_mem_unit
472         else:
473             return ContainerRequestSummarizer.runtime_constraint_mem_unit
474
475     def _map_runtime_constraint(self, key):
476         if hasattr(self, 'map_runtime_constraint'):
477             return self.map_runtime_constraint[key]
478         elif self.detected_crunch1:
479             return JobSummarizer.map_runtime_constraint[key]
480         else:
481             return key
482
483
484 class CollectionSummarizer(Summarizer):
485     def __init__(self, collection_id, **kwargs):
486         super(CollectionSummarizer, self).__init__(
487             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
488         self.label = collection_id
489
490
491 def NewSummarizer(process_or_uuid, **kwargs):
492     """Construct with the appropriate subclass for this uuid/object."""
493
494     if isinstance(process_or_uuid, dict):
495         process = process_or_uuid
496         uuid = process['uuid']
497     else:
498         uuid = process_or_uuid
499         process = None
500         arv = arvados.api('v1', model=OrderedJsonModel())
501
502     if '-dz642-' in uuid:
503         if process is None:
504             # Get the associated CR. Doesn't matter which since they all have the same logs
505             crs = arv.container_requests().list(filters=[['container_uuid','=',uuid]],limit=1).execute()['items']
506             if len(crs) > 0:
507                 process = crs[0]
508         klass = ContainerRequestTreeSummarizer
509     elif '-xvhdp-' in uuid:
510         if process is None:
511             process = arv.container_requests().get(uuid=uuid).execute()
512         klass = ContainerRequestTreeSummarizer
513     elif '-8i9sb-' in uuid:
514         if process is None:
515             process = arv.jobs().get(uuid=uuid).execute()
516         klass = JobTreeSummarizer
517     elif '-d1hrv-' in uuid:
518         if process is None:
519             process = arv.pipeline_instances().get(uuid=uuid).execute()
520         klass = PipelineSummarizer
521     elif '-4zz18-' in uuid:
522         return CollectionSummarizer(collection_id=uuid)
523     else:
524         raise ArgumentError("Unrecognized uuid %s", uuid)
525     return klass(process, uuid=uuid, **kwargs)
526
527
528 class ProcessSummarizer(Summarizer):
529     """Process is a job, pipeline, container, or container request."""
530
531     def __init__(self, process, label=None, **kwargs):
532         rdr = None
533         self.process = process
534         if label is None:
535             label = self.process.get('name', self.process['uuid'])
536         # Pre-Arvados v1.4 everything is in 'log'
537         # For 1.4+ containers have no logs and container_requests have them in 'log_uuid', not 'log'
538         log_collection = self.process.get('log')
539         if not log_collection:
540             log_collection = self.process.get('log_uuid')
541         if log_collection:
542             try:
543                 rdr = crunchstat_summary.reader.CollectionReader(log_collection)
544             except arvados.errors.NotFoundError as e:
545                 logger.warning("Trying event logs after failing to read "
546                                "log collection %s: %s", self.process['log'], e)
547         if rdr is None:
548             rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
549             label = label + ' (partial)'
550         super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
551         self.existing_constraints = self.process.get('runtime_constraints', {})
552
553
554 class JobSummarizer(ProcessSummarizer):
555     runtime_constraint_mem_unit = MB
556     map_runtime_constraint = {
557         'keep_cache_ram': 'keep_cache_mb_per_task',
558         'ram': 'min_ram_mb_per_node',
559         'vcpus': 'min_cores_per_node',
560     }
561
562
563 class ContainerRequestSummarizer(ProcessSummarizer):
564     runtime_constraint_mem_unit = 1
565
566
567 class MultiSummarizer(object):
568     def __init__(self, children={}, label=None, threads=1, **kwargs):
569         self.throttle = threading.Semaphore(threads)
570         self.children = children
571         self.label = label
572
573     def run_and_release(self, target, *args, **kwargs):
574         try:
575             return target(*args, **kwargs)
576         finally:
577             self.throttle.release()
578
579     def run(self):
580         threads = []
581         for child in self.children.values():
582             self.throttle.acquire()
583             t = threading.Thread(target=self.run_and_release, args=(child.run, ))
584             t.daemon = True
585             t.start()
586             threads.append(t)
587         for t in threads:
588             t.join()
589
590     def text_report(self):
591         txt = ''
592         d = self._descendants()
593         for child in d.values():
594             if len(d) > 1:
595                 txt += '### Summary for {} ({})\n'.format(
596                     child.label, child.process['uuid'])
597             txt += child.text_report()
598             txt += '\n'
599         return txt
600
601     def _descendants(self):
602         """Dict of self and all descendants.
603
604         Nodes with nothing of their own to report (like
605         MultiSummarizers) are omitted.
606         """
607         d = collections.OrderedDict()
608         for key, child in self.children.items():
609             if isinstance(child, Summarizer):
610                 d[key] = child
611             if isinstance(child, MultiSummarizer):
612                 d.update(child._descendants())
613         return d
614
615     def html_report(self):
616         return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()
617
618
619 class JobTreeSummarizer(MultiSummarizer):
620     """Summarizes a job and all children listed in its components field."""
621     def __init__(self, job, label=None, **kwargs):
622         arv = arvados.api('v1', model=OrderedJsonModel())
623         label = label or job.get('name', job['uuid'])
624         children = collections.OrderedDict()
625         children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
626         if job.get('components', None):
627             preloaded = {}
628             for j in arv.jobs().index(
629                     limit=len(job['components']),
630                     filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
631                 preloaded[j['uuid']] = j
632             for cname in sorted(job['components'].keys()):
633                 child_uuid = job['components'][cname]
634                 j = (preloaded.get(child_uuid) or
635                      arv.jobs().get(uuid=child_uuid).execute())
636                 children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
637
638         super(JobTreeSummarizer, self).__init__(
639             children=children,
640             label=label,
641             **kwargs)
642
643
644 class PipelineSummarizer(MultiSummarizer):
645     def __init__(self, instance, **kwargs):
646         children = collections.OrderedDict()
647         for cname, component in instance['components'].items():
648             if 'job' not in component:
649                 logger.warning(
650                     "%s: skipping component with no job assigned", cname)
651             else:
652                 logger.info(
653                     "%s: job %s", cname, component['job']['uuid'])
654                 summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
655                 summarizer.label = '{} {}'.format(
656                     cname, component['job']['uuid'])
657                 children[cname] = summarizer
658         super(PipelineSummarizer, self).__init__(
659             children=children,
660             label=instance['uuid'],
661             **kwargs)
662
663
664 class ContainerRequestTreeSummarizer(MultiSummarizer):
665     def __init__(self, root, skip_child_jobs=False, **kwargs):
666         arv = arvados.api('v1', model=OrderedJsonModel())
667
668         label = kwargs.pop('label', None) or root.get('name') or root['uuid']
669         root['name'] = label
670
671         children = collections.OrderedDict()
672         todo = collections.deque((root, ))
673         while len(todo) > 0:
674             current = todo.popleft()
675             label = current['name']
676             sort_key = current['created_at']
677
678             summer = ContainerRequestSummarizer(current, label=label, **kwargs)
679             summer.sort_key = sort_key
680             children[current['uuid']] = summer
681
682             page_filters = []
683             while True:
684                 child_crs = arv.container_requests().index(
685                     order=['uuid asc'],
686                     filters=page_filters+[
687                         ['requesting_container_uuid', '=', current['container_uuid']]],
688                 ).execute()
689                 if not child_crs['items']:
690                     break
691                 elif skip_child_jobs:
692                     logger.warning('%s: omitting stats from %d child containers'
693                                    ' because --skip-child-jobs flag is on',
694                                    label, child_crs['items_available'])
695                     break
696                 page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
697                 for cr in child_crs['items']:
698                     if cr['container_uuid']:
699                         logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
700                         cr['name'] = cr.get('name') or cr['uuid']
701                         todo.append(cr)
702         sorted_children = collections.OrderedDict()
703         for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
704             sorted_children[uuid] = children[uuid]
705         super(ContainerRequestTreeSummarizer, self).__init__(
706             children=sorted_children,
707             label=root['name'],
708             **kwargs)