# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

from __future__ import print_function

import arvados
import collections
import crunchstat_summary.dygraphs
import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re
import sys
import threading
import _strptime

from arvados.api import OrderedJsonModel
from crunchstat_summary import logger

# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95
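# For example, a node advertised as 8 GiB may expose only ~7.5 GiB to
# jobs, so recommendations are rounded to whole multiples of 0.95 GiB
# ("nearlygibs"; see Summarizer._recommend_ram below).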


# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads.  https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')


WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart


class Task(object):
    def __init__(self):
        self.starttime = None
        self.finishtime = None
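        # series: {(category, stat): [(elapsed timedelta, value), ...]},
        # the raw samples plotted by the dygraphs web charts.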
        self.series = collections.defaultdict(list)


class Summarizer(object):
    def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
        self._logdata = logdata

        self.uuid = uuid
        self.label = label
        self.starttime = None
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

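        # seq_to_uuid: crunch1 task sequence number -> task uuid,
        # learned from "job_task" log lines as they are parsed.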
        self.seq_to_uuid = {}
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable.  If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.debug("%s: logdata %s", self.label, logdata)

    def run(self):
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        with self._logdata as logdata:
            self._run(logdata)

    def _run(self, logdata):
        self.detected_crunch1 = False
        for line in logdata:
            if not self.detected_crunch1 and '-8i9sb-' in line:
                self.detected_crunch1 = True

            if self.detected_crunch1:
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
                if m:
                    seq = int(m.group('seq'))
                    uuid = m.group('task_uuid')
                    self.seq_to_uuid[seq] = uuid
                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
                    continue

                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
                if m:
                    task_id = self.seq_to_uuid[int(m.group('seq'))]
                    elapsed = int(m.group('elapsed'))
                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                    if elapsed > self.stats_max['time']['elapsed']:
                        self.stats_max['time']['elapsed'] = elapsed
                    continue

                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
                if m:
                    uuid = m.group('uuid')
                    if self._skip_child_jobs:
                        logger.warning('%s: omitting stats from child job %s'
                                       ' because --skip-child-jobs flag is on',
                                       self.label, uuid)
                        continue
                    logger.debug('%s: follow %s', self.label, uuid)
                    # ProcessSummarizer needs the job record itself; a
                    # bare uuid would crash in its constructor, so fetch
                    # the record the same way NewSummarizer does.
                    arv = arvados.api('v1', model=OrderedJsonModel())
                    child_summarizer = ProcessSummarizer(
                        arv.jobs().get(uuid=uuid).execute())
                    child_summarizer.stats_max = self.stats_max
                    child_summarizer.task_stats = self.task_stats
                    child_summarizer.tasks = self.tasks
                    child_summarizer.starttime = self.starttime
                    child_summarizer.run()
                    logger.debug('%s: done %s', self.label, uuid)
                    continue

                # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                if not m:
                    continue
            else:
                # crunch2
                # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
                m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                if not m:
                    continue

            if self.label is None:
                try:
                    self.label = m.group('job_uuid')
                except IndexError:
                    self.label = 'container'
            if m.group('category').endswith(':'):
                # "stderr crunchstat: notice: ..."
                continue
            elif m.group('category') in ('error', 'caught'):
                continue
            elif m.group('category') in ('read', 'open', 'cgroup', 'CID', 'Running'):
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (old logs are less careful with unprefixed error messages)
                continue

            if self.detected_crunch1:
                task_id = self.seq_to_uuid[int(m.group('seq'))]
            else:
                task_id = 'container'
            task = self.tasks[task_id]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = m.group('timestamp')
            if timestamp[10:11] == '_':
                timestamp = datetime.datetime.strptime(
                    timestamp, '%Y-%m-%d_%H:%M:%S')
            elif timestamp[10:11] == 'T':
                timestamp = datetime.datetime.strptime(
                    timestamp[:19], '%Y-%m-%dT%H:%M:%S')
            else:
                raise ValueError("Cannot parse timestamp {!r}".format(
                    timestamp))

            if task.starttime is None:
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            if task.starttime is None or timestamp < task.starttime:
                task.starttime = timestamp
            if task.finishtime is None or timestamp > task.finishtime:
                task.finishtime = timestamp

            if self.starttime is None or timestamp < self.starttime:
                self.starttime = timestamp
            if self.finishtime is None or timestamp > self.finishtime:
                self.finishtime = timestamp

            if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
                elapsed = int((task.finishtime - task.starttime).total_seconds())
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed

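            # In a crunchstat line, 'current' values are cumulative
            # totals since the task started, while 'interval' values
            # cover only the last sampling interval; the latter are
            # converted to per-second '__rate' stats below.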
            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                    continue
                category = m.group('category')
                words = m.group(group).split(' ')
                stats = {}
                try:
                    for val, stat in zip(words[::2], words[1::2]):
                        if '.' in val:
                            stats[stat] = float(val)
                        else:
                            stats[stat] = int(val)
                except ValueError as e:
                    # If the line doesn't start with 'crunchstat:' we
                    # might have mistaken an error message for a
                    # structured crunchstat line.
                    if m.group("crunchstat") is None or m.group("category") == "crunchstat":
                        logger.warning("%s: log contains message\n  %s", self.label, line)
                    else:
                        logger.warning(
                            '%s: Error parsing value %r (stat %r, category %r): %r',
                            self.label, val, stat, category, e)
                        logger.warning('%s', line)
                    continue
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                for stat, val in stats.iteritems():
                    if group == 'interval':
                        if stat == 'seconds':
                            this_interval_s = val
                            continue
                        elif not (this_interval_s > 0):
                            logger.error(
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                            continue
                        else:
                            stat = stat + '__rate'
                            val = val / this_interval_s
                            if stat in ['user+sys__rate', 'tx+rx__rate']:
                                task.series[category, stat].append(
                                    (timestamp - self.starttime, val))
                    else:
                        if stat in ['rss']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.iteritems():
            for category, stat_last in task_stat.iteritems():
                for stat, val in stat_last.iteritems():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # Don't sum gauge-like stats across tasks:
                        # e.g. 16 CPU cores x 5 tasks = 80 is meaningless.
                        continue
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

    def long_label(self):
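        """Return self.label with the process uuid and elapsed time
        appended, e.g. "mylabel (uuid) -- elapsed time 1d2h3m4s".
        """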
        label = self.label
        if hasattr(self, 'process') and self.process['uuid'] not in label:
            label = '{} ({})'.format(label, self.process['uuid'])
        if self.finishtime:
            label += ' -- elapsed time '
            s = (self.finishtime - self.starttime).total_seconds()
            if s > 86400:
                label += '{}d'.format(int(s/86400))
            if s > 3600:
                label += '{}h'.format(int(s/3600) % 24)
            if s > 60:
                label += '{}m'.format(int(s/60) % 60)
            label += '{}s'.format(int(s) % 60)
        return label

    def text_report(self):
        if not self.tasks:
            return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"

    def html_report(self):
        return WEBCHART_CLASS(self.label, [self]).html()

    def _text_report_gen(self):
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                if stat.endswith('__rate'):
                    continue
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        for args in (
                ('Number of tasks: {}',
                 len(self.tasks),
                 None),
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                 None),
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                 lambda x: x * 100),
                ('Overall CPU usage: {}%',
                 self.job_tot['cpu']['user+sys'] /
                 self.job_tot['time']['elapsed']
                 if self.job_tot['time']['elapsed'] > 0 else 0,
                 lambda x: x * 100),
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                 lambda x: x / 1e9),
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'] +
                 self.stats_max['net:keep0']['tx+rx'],
                 lambda x: x / 1e9),
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'] +
                 self.stats_max['net:keep0']['tx+rx__rate'],
                 lambda x: x / 1e6),
                ('Keep cache miss rate {}%',
                 (float(self.job_tot['keepcache']['miss']) /
                 float(self.job_tot['keepcalls']['get']))
                 if self.job_tot['keepcalls']['get'] > 0 else 0,
                 lambda x: x * 100.0),
                ('Keep cache utilization {}%',
                 (float(self.job_tot['blkio:0:0']['read']) /
                 float(self.job_tot['net:keep0']['rx']))
                 if self.job_tot['net:keep0']['rx'] > 0 else 0,
                 lambda x: x * 100.0)):
            format_string, val, transform = args
            if val == float('-Inf'):
                continue
            if transform:
                val = transform(val)
            yield "# "+format_string.format(self._format(val))

    def _recommend_gen(self):
        return itertools.chain(
            self._recommend_cpu(),
            self._recommend_ram(),
            self._recommend_keep_cache())

    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""

        constraint_key = self._map_runtime_constraint('vcpus')
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf'):
            logger.warning('%s: no CPU usage data', self.label)
            return
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get(constraint_key)
        if asked_cores is None or used_cores < asked_cores:
            yield (
                '#!! {} max CPU usage was {}% -- '
                'try runtime_constraints "{}":{}'
            ).format(
                self.label,
                int(math.ceil(cpu_max_rate*100)),
                constraint_key,
                int(used_cores))

    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available.  If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available.  However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        constraint.

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary.  We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """

        constraint_key = self._map_runtime_constraint('ram')
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
            return
        used_mib = math.ceil(float(used_bytes) / 1048576)
        asked_mib = self.existing_constraints.get(constraint_key)

        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
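        # Worked example: used_mib=7000 gives nearlygibs(7000) ~= 7.20,
        # which rounds up to 8 nearlygibs, so we recommend
        # int(8 * 0.95 * 1024) = 7782 MiB rather than a full 8192 MiB.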
        if asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
            yield (
                '#!! {} max RSS was {} MiB -- '
                'try runtime_constraints "{}":{}'
            ).format(
                self.label,
                int(used_mib),
                constraint_key,
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(2**20)/self._runtime_constraint_mem_unit()))

    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%"""
        constraint_key = self._map_runtime_constraint('keep_cache_ram')
        if self.job_tot['net:keep0']['rx'] == 0:
            return
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        asked_mib = self.existing_constraints.get(constraint_key, 256)
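        # Utilization compares bytes the job actually read with bytes
        # fetched over the network from Keep; a low ratio suggests
        # cached blocks were evicted and re-fetched, so we recommend
        # doubling the current cache size (default 256 MiB).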

        if utilization < 0.8:
            yield (
                '#!! {} Keep cache utilization was {:.2f}% -- '
                'try runtime_constraints "{}":{} (or more)'
            ).format(
                self.label,
                utilization * 100.0,
                constraint_key,
                asked_mib*2*(2**20)/self._runtime_constraint_mem_unit())

    def _format(self, val):
        """Return a string representation of a stat.

        {:.2f} for floats, default format for everything else."""
        if isinstance(val, float):
            return '{:.2f}'.format(val)
        else:
            return '{}'.format(val)

    def _runtime_constraint_mem_unit(self):
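        # Crunch1 jobs express memory constraints in MiB, crunch2
        # containers in bytes; see the *Summarizer subclasses below.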
        if hasattr(self, 'runtime_constraint_mem_unit'):
            return self.runtime_constraint_mem_unit
        elif self.detected_crunch1:
            return JobSummarizer.runtime_constraint_mem_unit
        else:
            return ContainerSummarizer.runtime_constraint_mem_unit

    def _map_runtime_constraint(self, key):
        if hasattr(self, 'map_runtime_constraint'):
            return self.map_runtime_constraint[key]
        elif self.detected_crunch1:
            return JobSummarizer.map_runtime_constraint[key]
        else:
            return key


class CollectionSummarizer(Summarizer):
    def __init__(self, collection_id, **kwargs):
        super(CollectionSummarizer, self).__init__(
            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
        self.label = collection_id


def NewSummarizer(process_or_uuid, **kwargs):
    """Construct with the appropriate subclass for this uuid/object."""
463     if isinstance(process_or_uuid, dict):
464         process = process_or_uuid
465         uuid = process['uuid']
466     else:
467         uuid = process_or_uuid
468         process = None
469         arv = arvados.api('v1', model=OrderedJsonModel())
470
471     if '-dz642-' in uuid:
472         if process is None:
473             process = arv.containers().get(uuid=uuid).execute()
474         klass = ContainerTreeSummarizer
475     elif '-xvhdp-' in uuid:
476         if process is None:
477             process = arv.container_requests().get(uuid=uuid).execute()
478         klass = ContainerTreeSummarizer
479     elif '-8i9sb-' in uuid:
480         if process is None:
481             process = arv.jobs().get(uuid=uuid).execute()
482         klass = JobTreeSummarizer
483     elif '-d1hrv-' in uuid:
484         if process is None:
485             process = arv.pipeline_instances().get(uuid=uuid).execute()
486         klass = PipelineSummarizer
487     elif '-4zz18-' in uuid:
488         return CollectionSummarizer(collection_id=uuid)
489     else:
        raise ValueError("Unrecognized uuid %s" % uuid)
    return klass(process, uuid=uuid, **kwargs)


class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, container, or container request."""

    def __init__(self, process, label=None, **kwargs):
        rdr = None
        self.process = process
        if label is None:
            label = self.process.get('name', self.process['uuid'])
        if self.process.get('log'):
            try:
                rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.process['log'], e)
        if rdr is None:
            rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
            label = label + ' (partial)'
        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
        self.existing_constraints = self.process.get('runtime_constraints', {})


class JobSummarizer(ProcessSummarizer):
    runtime_constraint_mem_unit = 1048576
    map_runtime_constraint = {
        'keep_cache_ram': 'keep_cache_mb_per_task',
        'ram': 'min_ram_mb_per_node',
        'vcpus': 'min_cores_per_node',
    }


class ContainerSummarizer(ProcessSummarizer):
    runtime_constraint_mem_unit = 1


class MultiSummarizer(object):
    def __init__(self, children={}, label=None, threads=1, **kwargs):
        self.throttle = threading.Semaphore(threads)
        self.children = children
        self.label = label

    def run_and_release(self, target, *args, **kwargs):
        try:
            return target(*args, **kwargs)
        finally:
            self.throttle.release()

    def run(self):
        threads = []
        for child in self.children.itervalues():
            self.throttle.acquire()
            t = threading.Thread(target=self.run_and_release, args=(child.run, ))
            t.daemon = True
            t.start()
            threads.append(t)
        for t in threads:
            t.join()

    def text_report(self):
        txt = ''
        d = self._descendants()
        for child in d.itervalues():
            if len(d) > 1:
                txt += '### Summary for {} ({})\n'.format(
                    child.label, child.process['uuid'])
            txt += child.text_report()
            txt += '\n'
        return txt

    def _descendants(self):
        """Dict of self and all descendants.

        Nodes with nothing of their own to report (like
        MultiSummarizers) are omitted.
        """
        d = collections.OrderedDict()
        for key, child in self.children.iteritems():
            if isinstance(child, Summarizer):
                d[key] = child
            if isinstance(child, MultiSummarizer):
                d.update(child._descendants())
        return d

    def html_report(self):
        return WEBCHART_CLASS(self.label, self._descendants().itervalues()).html()


class JobTreeSummarizer(MultiSummarizer):
    """Summarizes a job and all children listed in its components field."""
    def __init__(self, job, label=None, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())
        label = label or job.get('name', job['uuid'])
        children = collections.OrderedDict()
        children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
        if job.get('components', None):
            preloaded = {}
            for j in arv.jobs().index(
                    limit=len(job['components']),
                    filters=[['uuid','in',job['components'].values()]]).execute()['items']:
                preloaded[j['uuid']] = j
            for cname in sorted(job['components'].keys()):
                child_uuid = job['components'][cname]
                j = (preloaded.get(child_uuid) or
                     arv.jobs().get(uuid=child_uuid).execute())
                children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)

        super(JobTreeSummarizer, self).__init__(
            children=children,
            label=label,
            **kwargs)


class PipelineSummarizer(MultiSummarizer):
    def __init__(self, instance, **kwargs):
        children = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            if 'job' not in component:
                logger.warning(
                    "%s: skipping component with no job assigned", cname)
            else:
                logger.info(
                    "%s: job %s", cname, component['job']['uuid'])
                summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
                summarizer.label = '{} {}'.format(
                    cname, component['job']['uuid'])
                children[cname] = summarizer
        super(PipelineSummarizer, self).__init__(
            children=children,
            label=instance['uuid'],
            **kwargs)


class ContainerTreeSummarizer(MultiSummarizer):
    def __init__(self, root, skip_child_jobs=False, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())

        label = kwargs.pop('label', None) or root.get('name') or root['uuid']
        root['name'] = label

        children = collections.OrderedDict()
        todo = collections.deque((root, ))
        while len(todo) > 0:
            current = todo.popleft()
            label = current['name']
            sort_key = current['created_at']
            if current['uuid'].find('-xvhdp-') > 0:
                current = arv.containers().get(uuid=current['container_uuid']).execute()

            summer = ContainerSummarizer(current, label=label, **kwargs)
            summer.sort_key = sort_key
            children[current['uuid']] = summer

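            # Page through this container's child container requests
            # with a keyset-style cursor: each page asks for uuids
            # greater than the last one seen, in uuid order.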
            page_filters = []
            while True:
                child_crs = arv.container_requests().index(
                    order=['uuid asc'],
                    filters=page_filters+[
                        ['requesting_container_uuid', '=', current['uuid']]],
                ).execute()
                if not child_crs['items']:
                    break
                elif skip_child_jobs:
                    logger.warning('%s: omitting stats from %d child containers'
                                   ' because --skip-child-jobs flag is on',
                                   label, child_crs['items_available'])
                    break
                page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
                for cr in child_crs['items']:
                    if cr['container_uuid']:
                        logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
                        cr['name'] = cr.get('name') or cr['uuid']
                        todo.append(cr)
        sorted_children = collections.OrderedDict()
        for uuid in sorted(children.keys(), key=lambda uuid: children[uuid].sort_key):
            sorted_children[uuid] = children[uuid]
        super(ContainerTreeSummarizer, self).__init__(
            children=sorted_children,
            label=root['name'],
            **kwargs)
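

# Typical use, sketched (the real command-line entry point lives in
# crunchstat_summary.command, not here; the uuid below is hypothetical):
#
#     s = NewSummarizer('zzzzz-xvhdp-012345678901234')
#     s.run()
#     print(s.text_report())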