# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

import arvados
import collections
import crunchstat_summary.dygraphs
import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re
import sys
import threading
import _strptime

from arvados.api import OrderedJsonModel
from crunchstat_summary import logger

# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95
MB = 2**20
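# Despite the name, MB here is one mebibyte (2**20 bytes); it is used below
# to convert between bytes and MiB-denominated runtime constraints.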

# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads.  https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')


WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart


class Task(object):
    def __init__(self):
        self.starttime = None
        self.finishtime = None
        self.series = collections.defaultdict(list)
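        # series maps (category, stat) -> [(elapsed-since-start, value), ...];
        # it is populated by Summarizer._run() and used to draw the web charts.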


class Summarizer(object):
    def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
        self._logdata = logdata

        self.uuid = uuid
        self.label = label
        self.starttime = None
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        self.seq_to_uuid = {}
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable.  If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.debug("%s: logdata %s", self.label, logdata)

    def run(self):
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        with self._logdata as logdata:
            self._run(logdata)

    def _run(self, logdata):
        self.detected_crunch1 = False
        for line in logdata:
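            # '-8i9sb-' is the UUID infix of crunch1 jobs, so seeing it in
            # the log means these are crunch1 job logs, which use a
            # different line format than crunch2 container logs.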
            if not self.detected_crunch1 and '-8i9sb-' in line:
                self.detected_crunch1 = True

            if self.detected_crunch1:
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
                if m:
                    seq = int(m.group('seq'))
                    uuid = m.group('task_uuid')
                    self.seq_to_uuid[seq] = uuid
                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
                    continue

                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
                if m:
                    task_id = self.seq_to_uuid[int(m.group('seq'))]
                    elapsed = int(m.group('elapsed'))
                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                    if elapsed > self.stats_max['time']['elapsed']:
                        self.stats_max['time']['elapsed'] = elapsed
                    continue

                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
                if m:
                    uuid = m.group('uuid')
                    if self._skip_child_jobs:
                        logger.warning('%s: omitting stats from child job %s'
                                       ' because --skip-child-jobs flag is on',
                                       self.label, uuid)
                        continue
                    logger.debug('%s: follow %s', self.label, uuid)
                    child_summarizer = ProcessSummarizer(uuid)
                    child_summarizer.stats_max = self.stats_max
                    child_summarizer.task_stats = self.task_stats
                    child_summarizer.tasks = self.tasks
                    child_summarizer.starttime = self.starttime
                    child_summarizer.run()
                    logger.debug('%s: done %s', self.label, uuid)
                    continue

                # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                if not m:
                    continue
            else:
                # crunch2
                # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
                m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                if not m:
                    continue

            if self.label is None:
                try:
                    self.label = m.group('job_uuid')
                except IndexError:
                    self.label = 'label #1'
            category = m.group('category')
            if category.endswith(':'):
                # "stderr crunchstat: notice: ..."
                continue
            elif category in ('error', 'caught'):
                continue
            elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (old logs are less careful with unprefixed error messages)
                continue

            if self.detected_crunch1:
                task_id = self.seq_to_uuid[int(m.group('seq'))]
            else:
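                # crunch2 containers have no subtasks, so all stats are
                # attributed to a single pseudo-task.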
                task_id = 'container'
            task = self.tasks[task_id]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = m.group('timestamp')
            if timestamp[10:11] == '_':
                timestamp = datetime.datetime.strptime(
                    timestamp, '%Y-%m-%d_%H:%M:%S')
            elif timestamp[10:11] == 'T':
                timestamp = datetime.datetime.strptime(
                    timestamp[:19], '%Y-%m-%dT%H:%M:%S')
            else:
                raise ValueError("Cannot parse timestamp {!r}".format(
                    timestamp))

            if task.starttime is None:
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            if task.starttime is None or timestamp < task.starttime:
                task.starttime = timestamp
            if task.finishtime is None or timestamp > task.finishtime:
                task.finishtime = timestamp

            if self.starttime is None or timestamp < self.starttime:
                self.starttime = timestamp
            if self.finishtime is None or timestamp > self.finishtime:
                self.finishtime = timestamp

            if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
                # total_seconds() (rather than .seconds) keeps the elapsed
                # time correct for containers that run longer than a day.
                elapsed = int((task.finishtime - task.starttime).total_seconds())
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed

            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                    continue
                category = m.group('category')
                words = m.group(group).split(' ')
                stats = {}
                try:
                    for val, stat in zip(words[::2], words[1::2]):
                        if '.' in val:
                            stats[stat] = float(val)
                        else:
                            stats[stat] = int(val)
                except ValueError as e:
                    # If the line doesn't start with 'crunchstat:' we
                    # might have mistaken an error message for a
                    # structured crunchstat line.
                    if m.group("crunchstat") is None or m.group("category") == "crunchstat":
                        logger.warning("%s: log contains message\n  %s", self.label, line)
                    else:
                        logger.warning(
                            '%s: Error parsing value %r (stat %r, category %r): %r',
                            self.label, val, stat, category, e)
                        logger.warning('%s', line)
                    continue
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                if group == 'interval':
                    if 'seconds' in stats:
                        this_interval_s = stats.get('seconds', 0)
                        del stats['seconds']
                        if this_interval_s <= 0:
                            logger.error(
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                    else:
                        logger.error('BUG? interval stat missing duration')
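                # 'current' samples are cumulative totals: keep the latest
                # value per task and track the running maximum.  'interval'
                # samples are converted to per-second rates (stat name plus
                # '__rate') so selected stats can be plotted over time.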
                for stat, val in stats.items():
                    if group == 'interval' and this_interval_s:
                        stat = stat + '__rate'
                        val = val / this_interval_s
                        if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                    else:
                        if stat in ['rss', 'used', 'total']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.items():
            for category, stat_last in task_stat.items():
                for stat, val in stat_last.items():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # summing these across tasks is meaningless
                        # (e.g. 16 cpu cores x 5 tasks = 80)
                        continue
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

    def long_label(self):
        label = self.label
        if hasattr(self, 'process') and self.process['uuid'] not in label:
            label = '{} ({})'.format(label, self.process['uuid'])
        if self.finishtime:
            label += ' -- elapsed time '
            s = (self.finishtime - self.starttime).total_seconds()
            if s > 86400:
                label += '{}d'.format(int(s/86400))
            if s > 3600:
                label += '{}h'.format(int(s/3600) % 24)
            if s > 60:
                label += '{}m'.format(int(s/60) % 60)
            label += '{}s'.format(int(s) % 60)
        return label

    def text_report(self):
        if not self.tasks:
            return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"

    def html_report(self):
        return WEBCHART_CLASS(self.label, [self]).html()

    def _text_report_gen(self):
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.items()):
            for stat, val in sorted(stat_max.items()):
                if stat.endswith('__rate'):
                    continue
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        for args in (
                ('Number of tasks: {}',
                 len(self.tasks),
                 None),
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                 None),
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                 lambda x: x * 100),
                ('Overall CPU usage: {}%',
                 float(self.job_tot['cpu']['user+sys']) /
                 self.job_tot['time']['elapsed']
                 if self.job_tot['time']['elapsed'] > 0 else 0,
                 lambda x: x * 100),
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                 lambda x: x / 1e9),
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'] +
                 self.stats_max['net:keep0']['tx+rx'],
                 lambda x: x / 1e9),
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'] +
                 self.stats_max['net:keep0']['tx+rx__rate'],
                 lambda x: x / 1e6),
                ('Keep cache miss rate {}%',
                 (float(self.job_tot['keepcache']['miss']) /
                 float(self.job_tot['keepcalls']['get']))
                 if self.job_tot['keepcalls']['get'] > 0 else 0,
                 lambda x: x * 100.0),
                ('Keep cache utilization {}%',
                 (float(self.job_tot['blkio:0:0']['read']) /
                 float(self.job_tot['net:keep0']['rx']))
                 if self.job_tot['net:keep0']['rx'] > 0 else 0,
                 lambda x: x * 100.0),
                ('Temp disk utilization {}%',
                 (float(self.job_tot['statfs']['used']) /
                 float(self.job_tot['statfs']['total']))
                 if self.job_tot['statfs']['total'] > 0 else 0,
                 lambda x: x * 100.0),
                ):
            format_string, val, transform = args
            if val == float('-Inf'):
                continue
            if transform:
                val = transform(val)
            yield "# "+format_string.format(self._format(val))

    def _recommend_gen(self):
        # TODO recommend fixing job granularity if elapsed time is too short
        return itertools.chain(
            self._recommend_cpu(),
            self._recommend_ram(),
            self._recommend_keep_cache())

    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""

        constraint_key = self._map_runtime_constraint('vcpus')
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
            logger.warning('%s: no CPU usage data', self.label)
            return
        # TODO Don't necessarily want to recommend on isolated max peak
        # take average CPU usage into account as well or % time at max
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get(constraint_key)
        if asked_cores is None:
            asked_cores = 1
        # TODO: This should be more nuanced in cases where max >> avg
        if used_cores < asked_cores:
            yield (
                '#!! {} max CPU usage was {}% -- '
                'try reducing runtime_constraints to "{}":{}'
            ).format(
                self.label,
                math.ceil(cpu_max_rate*100),
                constraint_key,
                int(used_cores))

    # FIXME: This needs to be updated to account for current nodemanager algorithms
    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available.  If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available.  However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        constraint.

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary.  We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """

        constraint_key = self._map_runtime_constraint('ram')
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
            return
        used_mib = math.ceil(float(used_bytes) / MB)
        asked_mib = self.existing_constraints.get(constraint_key)

        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
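        # Worked example (derived from the formula below): used_mib=7500 gives
        # nearlygibs(7500) ~= 7.71, which rounds up to 8 "nearlygibs"; the
        # recommendation is then 8 * 0.95 * 1024 = 7782 MiB for jobs, or the
        # equivalent number of bytes for containers.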
        if used_mib > 0 and (asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
            yield (
                '#!! {} max RSS was {} MiB -- '
                'try reducing runtime_constraints to "{}":{}'
            ).format(
                self.label,
                int(used_mib),
                constraint_key,
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))

    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%"""
        constraint_key = self._map_runtime_constraint('keep_cache_ram')
        if self.job_tot['net:keep0']['rx'] == 0:
            return
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
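        # A utilization well below 1.0 means considerably more data arrived
        # from Keep than the job read back, which suggests cache blocks were
        # evicted and re-fetched; hence the advice below to enlarge the cache.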
        # FIXME: the default on this get won't work correctly
        asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()

        if utilization < 0.8:
            yield (
                '#!! {} Keep cache utilization was {:.2f}% -- '
                'try doubling runtime_constraints to "{}":{} (or more)'
            ).format(
                self.label,
                utilization * 100.0,
                constraint_key,
                math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))


    def _format(self, val):
        """Return a string representation of a stat.

        {:.2f} for floats, default format for everything else."""
        if isinstance(val, float):
            return '{:.2f}'.format(val)
        else:
            return '{}'.format(val)

    def _runtime_constraint_mem_unit(self):
        if hasattr(self, 'runtime_constraint_mem_unit'):
            return self.runtime_constraint_mem_unit
        elif self.detected_crunch1:
            return JobSummarizer.runtime_constraint_mem_unit
        else:
            return ContainerSummarizer.runtime_constraint_mem_unit

    def _map_runtime_constraint(self, key):
        if hasattr(self, 'map_runtime_constraint'):
            return self.map_runtime_constraint[key]
        elif self.detected_crunch1:
            return JobSummarizer.map_runtime_constraint[key]
        else:
            return key


class CollectionSummarizer(Summarizer):
    def __init__(self, collection_id, **kwargs):
        super(CollectionSummarizer, self).__init__(
            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
        self.label = collection_id


def NewSummarizer(process_or_uuid, **kwargs):
    """Construct with the appropriate subclass for this uuid/object."""

    if isinstance(process_or_uuid, dict):
        process = process_or_uuid
        uuid = process['uuid']
    else:
        uuid = process_or_uuid
        process = None
        arv = arvados.api('v1', model=OrderedJsonModel())

    if '-dz642-' in uuid:
        if process is None:
            process = arv.containers().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-xvhdp-' in uuid:
        if process is None:
            process = arv.container_requests().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-8i9sb-' in uuid:
        if process is None:
            process = arv.jobs().get(uuid=uuid).execute()
        klass = JobTreeSummarizer
    elif '-d1hrv-' in uuid:
        if process is None:
            process = arv.pipeline_instances().get(uuid=uuid).execute()
        klass = PipelineSummarizer
    elif '-4zz18-' in uuid:
        return CollectionSummarizer(collection_id=uuid)
    else:
        raise ValueError("Unrecognized uuid %s" % uuid)
    return klass(process, uuid=uuid, **kwargs)
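
# Typical use, as a rough sketch (the command-line driver lives in
# crunchstat_summary.command):
#     summer = NewSummarizer('zzzzz-xvhdp-012345678901234')
#     summer.run()
#     print(summer.text_report())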


class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, container, or container request."""

    def __init__(self, process, label=None, **kwargs):
        rdr = None
        self.process = process
        if label is None:
            label = self.process.get('name', self.process['uuid'])
        if self.process.get('log'):
            try:
                rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.process['log'], e)
        if rdr is None:
            rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
            label = label + ' (partial)'
        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
        self.existing_constraints = self.process.get('runtime_constraints', {})


class JobSummarizer(ProcessSummarizer):
    runtime_constraint_mem_unit = MB
    map_runtime_constraint = {
        'keep_cache_ram': 'keep_cache_mb_per_task',
        'ram': 'min_ram_mb_per_node',
        'vcpus': 'min_cores_per_node',
    }


class ContainerSummarizer(ProcessSummarizer):
    runtime_constraint_mem_unit = 1


class MultiSummarizer(object):
    def __init__(self, children={}, label=None, threads=1, **kwargs):
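        # throttle caps how many children parse their logs concurrently
        # (acquired in run(), released in run_and_release()).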
        self.throttle = threading.Semaphore(threads)
        self.children = children
        self.label = label

    def run_and_release(self, target, *args, **kwargs):
        try:
            return target(*args, **kwargs)
        finally:
            self.throttle.release()

    def run(self):
        threads = []
        for child in self.children.values():
            self.throttle.acquire()
            t = threading.Thread(target=self.run_and_release, args=(child.run, ))
            t.daemon = True
            t.start()
            threads.append(t)
        for t in threads:
            t.join()

    def text_report(self):
        txt = ''
        d = self._descendants()
        for child in d.values():
            if len(d) > 1:
                txt += '### Summary for {} ({})\n'.format(
                    child.label, child.process['uuid'])
            txt += child.text_report()
            txt += '\n'
        return txt

    def _descendants(self):
        """Dict of self and all descendants.

        Nodes with nothing of their own to report (like
        MultiSummarizers) are omitted.
        """
        d = collections.OrderedDict()
        for key, child in self.children.items():
            if isinstance(child, Summarizer):
                d[key] = child
            if isinstance(child, MultiSummarizer):
                d.update(child._descendants())
        return d

    def html_report(self):
        return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()


class JobTreeSummarizer(MultiSummarizer):
    """Summarizes a job and all children listed in its components field."""
    def __init__(self, job, label=None, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())
        label = label or job.get('name', job['uuid'])
        children = collections.OrderedDict()
        children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
        if job.get('components', None):
            preloaded = {}
            for j in arv.jobs().index(
                    limit=len(job['components']),
                    filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
                preloaded[j['uuid']] = j
            for cname in sorted(job['components'].keys()):
                child_uuid = job['components'][cname]
                j = (preloaded.get(child_uuid) or
                     arv.jobs().get(uuid=child_uuid).execute())
                children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)

        super(JobTreeSummarizer, self).__init__(
            children=children,
            label=label,
            **kwargs)


class PipelineSummarizer(MultiSummarizer):
    def __init__(self, instance, **kwargs):
        children = collections.OrderedDict()
        for cname, component in instance['components'].items():
            if 'job' not in component:
                logger.warning(
                    "%s: skipping component with no job assigned", cname)
            else:
                logger.info(
                    "%s: job %s", cname, component['job']['uuid'])
                summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
                summarizer.label = '{} {}'.format(
                    cname, component['job']['uuid'])
                children[cname] = summarizer
        super(PipelineSummarizer, self).__init__(
            children=children,
            label=instance['uuid'],
            **kwargs)


class ContainerTreeSummarizer(MultiSummarizer):
    def __init__(self, root, skip_child_jobs=False, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())

        label = kwargs.pop('label', None) or root.get('name') or root['uuid']
        root['name'] = label

        children = collections.OrderedDict()
        todo = collections.deque((root, ))
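        # Breadth-first walk over the container tree: each container's child
        # container requests are fetched in uuid-ordered pages and queued
        # for summarizing.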
        while len(todo) > 0:
            current = todo.popleft()
            label = current['name']
            sort_key = current['created_at']
            if current['uuid'].find('-xvhdp-') > 0:
                current = arv.containers().get(uuid=current['container_uuid']).execute()

            summer = ContainerSummarizer(current, label=label, **kwargs)
            summer.sort_key = sort_key
            children[current['uuid']] = summer

            page_filters = []
            while True:
                child_crs = arv.container_requests().index(
                    order=['uuid asc'],
                    filters=page_filters+[
                        ['requesting_container_uuid', '=', current['uuid']]],
                ).execute()
                if not child_crs['items']:
                    break
                elif skip_child_jobs:
                    logger.warning('%s: omitting stats from %d child containers'
                                   ' because --skip-child-jobs flag is on',
                                   label, child_crs['items_available'])
                    break
                page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
                for cr in child_crs['items']:
                    if cr['container_uuid']:
                        logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
                        cr['name'] = cr.get('name') or cr['uuid']
                        todo.append(cr)
        sorted_children = collections.OrderedDict()
        for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
            sorted_children[uuid] = children[uuid]
        super(ContainerTreeSummarizer, self).__init__(
            children=sorted_children,
            label=root['name'],
            **kwargs)