# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

import arvados
import collections
import crunchstat_summary.dygraphs
import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re
import sys
import threading
import _strptime

from arvados.api import OrderedJsonModel
from crunchstat_summary import logger

# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95
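# For example, rather than recommending a full 8192 MiB we recommend
# 0.95 * 8 GiB = 7782 MiB, which still fits on a node sold as "8 GiB".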
MB = 2**20

# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads.  https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')


WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart


class Task(object):
    def __init__(self):
        self.starttime = None
        self.finishtime = None
        self.series = collections.defaultdict(list)


class Summarizer(object):
    def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
        self._logdata = logdata

        self.uuid = uuid
        self.label = label
        self.starttime = None
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        self.seq_to_uuid = {}
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable.  If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.debug("%s: logdata %s", self.label, logdata)

    def run(self):
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        with self._logdata as logdata:
            self._run(logdata)

    def _run(self, logdata):
        self.detected_crunch1 = False
        for line in logdata:
            if not self.detected_crunch1 and '-8i9sb-' in line:
                self.detected_crunch1 = True

            if self.detected_crunch1:
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
                if m:
                    seq = int(m.group('seq'))
                    uuid = m.group('task_uuid')
                    self.seq_to_uuid[seq] = uuid
                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
                    continue

                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
                if m:
                    task_id = self.seq_to_uuid[int(m.group('seq'))]
                    elapsed = int(m.group('elapsed'))
                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                    if elapsed > self.stats_max['time']['elapsed']:
                        self.stats_max['time']['elapsed'] = elapsed
                    continue

                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
                if m:
                    uuid = m.group('uuid')
                    if self._skip_child_jobs:
                        logger.warning('%s: omitting stats from child job %s'
                                       ' because --skip-child-jobs flag is on',
                                       self.label, uuid)
                        continue
                    logger.debug('%s: follow %s', self.label, uuid)
                    child_summarizer = ProcessSummarizer(uuid)
                    child_summarizer.stats_max = self.stats_max
                    child_summarizer.task_stats = self.task_stats
                    child_summarizer.tasks = self.tasks
                    child_summarizer.starttime = self.starttime
                    child_summarizer.run()
                    logger.debug('%s: done %s', self.label, uuid)
                    continue

                # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                if not m:
                    continue
            else:
                # crunch2
                # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
                m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                if not m:
                    continue

            if self.label is None:
                try:
                    self.label = m.group('job_uuid')
                except IndexError:
                    self.label = 'container'
            if m.group('category').endswith(':'):
                # "stderr crunchstat: notice: ..."
                continue
            elif m.group('category') in ('error', 'caught'):
                continue
            elif m.group('category') in ('read', 'open', 'cgroup', 'CID', 'Running'):
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (old logs are less careful with unprefixed error messages)
                continue

            if self.detected_crunch1:
                task_id = self.seq_to_uuid[int(m.group('seq'))]
            else:
                task_id = 'container'
            task = self.tasks[task_id]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = m.group('timestamp')
            if timestamp[10:11] == '_':
                timestamp = datetime.datetime.strptime(
                    timestamp, '%Y-%m-%d_%H:%M:%S')
            elif timestamp[10:11] == 'T':
                timestamp = datetime.datetime.strptime(
                    timestamp[:19], '%Y-%m-%dT%H:%M:%S')
            else:
                raise ValueError("Cannot parse timestamp {!r}".format(
                    timestamp))

            if task.starttime is None:
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            if task.starttime is None or timestamp < task.starttime:
                task.starttime = timestamp
            if task.finishtime is None or timestamp > task.finishtime:
                task.finishtime = timestamp

            if self.starttime is None or timestamp < self.starttime:
                self.starttime = timestamp
            if self.finishtime is None or timestamp > self.finishtime:
                self.finishtime = timestamp

            if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
                elapsed = (task.finishtime - task.starttime).seconds
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed

            this_interval_s = None
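            # The 'current' group carries cumulative totals since the task
            # started; the 'interval' group carries deltas for the last
            # sampling interval, which are converted to per-second '__rate'
            # stats below.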
            for group in ['current', 'interval']:
                if not m.group(group):
                    continue
                category = m.group('category')
                words = m.group(group).split(' ')
                stats = {}
                try:
                    for val, stat in zip(words[::2], words[1::2]):
                        if '.' in val:
                            stats[stat] = float(val)
                        else:
                            stats[stat] = int(val)
                except ValueError as e:
                    # If the line doesn't start with 'crunchstat:' we
                    # might have mistaken an error message for a
                    # structured crunchstat line.
                    if m.group("crunchstat") is None or m.group("category") == "crunchstat":
                        logger.warning("%s: log contains message\n  %s", self.label, line)
                    else:
                        logger.warning(
                            '%s: Error parsing value %r (stat %r, category %r): %r',
                            self.label, val, stat, category, e)
                        logger.warning('%s', line)
                    continue
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                if group == 'interval':
                    if 'seconds' in stats:
                        this_interval_s = stats.get('seconds', 0)
                        del stats['seconds']
                        if this_interval_s <= 0:
                            logger.error(
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                    else:
                        logger.error('BUG? interval stat missing duration')
                for stat, val in stats.items():
                    if group == 'interval' and this_interval_s:
                        stat = stat + '__rate'
                        val = val / this_interval_s
                        if stat in ['user+sys__rate', 'tx+rx__rate']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                    else:
                        if stat in ['rss']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.items():
            for category, stat_last in task_stat.items():
                for stat, val in stat_last.items():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                        continue
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

    def long_label(self):
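        # Returns e.g. "mylabel (uuid) -- elapsed time 1d2h34m56s"; the day,
        # hour and minute parts are omitted when the elapsed time is shorter.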
        label = self.label
        if hasattr(self, 'process') and self.process['uuid'] not in label:
            label = '{} ({})'.format(label, self.process['uuid'])
        if self.finishtime:
            label += ' -- elapsed time '
            s = (self.finishtime - self.starttime).total_seconds()
            if s > 86400:
                label += '{}d'.format(int(s/86400))
            if s > 3600:
                label += '{}h'.format(int(s/3600) % 24)
            if s > 60:
                label += '{}m'.format(int(s/60) % 60)
            label += '{}s'.format(int(s) % 60)
        return label

    def text_report(self):
        if not self.tasks:
            return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"

    def html_report(self):
        return WEBCHART_CLASS(self.label, [self]).html()

    def _text_report_gen(self):
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.items()):
            for stat, val in sorted(stat_max.items()):
                if stat.endswith('__rate'):
                    continue
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        for args in (
                ('Number of tasks: {}',
                 len(self.tasks),
                 None),
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                 None),
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                 lambda x: x * 100),
                ('Overall CPU usage: {}%',
                 float(self.job_tot['cpu']['user+sys']) /
                 self.job_tot['time']['elapsed']
                 if self.job_tot['time']['elapsed'] > 0 else 0,
                 lambda x: x * 100),
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                 lambda x: x / 1e9),
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'] +
                 self.stats_max['net:keep0']['tx+rx'],
                 lambda x: x / 1e9),
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'] +
                 self.stats_max['net:keep0']['tx+rx__rate'],
                 lambda x: x / 1e6),
                ('Keep cache miss rate {}%',
                 (float(self.job_tot['keepcache']['miss']) /
                 float(self.job_tot['keepcalls']['get']))
                 if self.job_tot['keepcalls']['get'] > 0 else 0,
                 lambda x: x * 100.0),
                ('Keep cache utilization {}%',
                 (float(self.job_tot['blkio:0:0']['read']) /
                 float(self.job_tot['net:keep0']['rx']))
                 if self.job_tot['net:keep0']['rx'] > 0 else 0,
                 lambda x: x * 100.0)):
            format_string, val, transform = args
            if val == float('-Inf'):
                continue
            if transform:
                val = transform(val)
            yield "# "+format_string.format(self._format(val))

    def _recommend_gen(self):
        # TODO recommend fixing job granularity if elapsed time is too short
        return itertools.chain(
            self._recommend_cpu(),
            self._recommend_ram(),
            self._recommend_keep_cache())

    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""

        constraint_key = self._map_runtime_constraint('vcpus')
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
            logger.warning('%s: no CPU usage data', self.label)
            return
        # TODO: Don't base the recommendation on an isolated peak alone;
        # take average CPU usage (or % of time spent at max) into account as well.
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get(constraint_key)
        if asked_cores is None:
            asked_cores = 1
        # TODO: This should be more nuanced in cases where max >> avg
        if used_cores < asked_cores:
            yield (
                '#!! {} max CPU usage was {}% -- '
                'try reducing runtime_constraints to "{}":{}'
            ).format(
                self.label,
                math.ceil(cpu_max_rate*100),
                constraint_key,
                int(used_cores))

    # FIXME: This needs to be updated to account for current nodemanager algorithms
    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available.  If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available.  However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        constraint.

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary.  We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """

        constraint_key = self._map_runtime_constraint('ram')
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
            return
        used_mib = math.ceil(float(used_bytes) / MB)
        asked_mib = self.existing_constraints.get(constraint_key)

        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
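        # e.g. a task that peaked at 7500 MiB RSS: 7500/0.95/1024 ~ 7.71
        # "nearlygibs", so we recommend ceil(7.71) = 8 of them, i.e.
        # 8 * 0.95 * 1024 = 7782 MiB (converted to the constraint's unit).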
        if used_mib > 0 and (asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
            yield (
                '#!! {} max RSS was {} MiB -- '
                'try reducing runtime_constraints to "{}":{}'
            ).format(
                self.label,
                int(used_mib),
                constraint_key,
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))

    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%"""
        constraint_key = self._map_runtime_constraint('keep_cache_ram')
        if self.job_tot['net:keep0']['rx'] == 0:
            return
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        # FIXME: the default on this get won't work correctly
        asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
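        # e.g. with the 256 MiB default and 50% utilization, we suggest
        # doubling to 512 (in whatever unit the constraint uses).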

        if utilization < 0.8:
            yield (
                '#!! {} Keep cache utilization was {:.2f}% -- '
                'try doubling runtime_constraints to "{}":{} (or more)'
            ).format(
                self.label,
                utilization * 100.0,
                constraint_key,
                math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))


    def _format(self, val):
        """Return a string representation of a stat.

        {:.2f} for floats, default format for everything else."""
        if isinstance(val, float):
            return '{:.2f}'.format(val)
        else:
            return '{}'.format(val)

    def _runtime_constraint_mem_unit(self):
        if hasattr(self, 'runtime_constraint_mem_unit'):
            return self.runtime_constraint_mem_unit
        elif self.detected_crunch1:
            return JobSummarizer.runtime_constraint_mem_unit
        else:
            return ContainerSummarizer.runtime_constraint_mem_unit

    def _map_runtime_constraint(self, key):
        if hasattr(self, 'map_runtime_constraint'):
            return self.map_runtime_constraint[key]
        elif self.detected_crunch1:
            return JobSummarizer.map_runtime_constraint[key]
        else:
            return key


class CollectionSummarizer(Summarizer):
    def __init__(self, collection_id, **kwargs):
        super(CollectionSummarizer, self).__init__(
            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
        self.label = collection_id


def NewSummarizer(process_or_uuid, **kwargs):
    """Construct with the appropriate subclass for this uuid/object."""
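    # The UUID infix determines the object type: -dz642- container,
    # -xvhdp- container request, -8i9sb- job, -d1hrv- pipeline instance,
    # -4zz18- collection of log files.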

    if isinstance(process_or_uuid, dict):
        process = process_or_uuid
        uuid = process['uuid']
    else:
        uuid = process_or_uuid
        process = None
        arv = arvados.api('v1', model=OrderedJsonModel())

    if '-dz642-' in uuid:
        if process is None:
            process = arv.containers().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-xvhdp-' in uuid:
        if process is None:
            process = arv.container_requests().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-8i9sb-' in uuid:
        if process is None:
            process = arv.jobs().get(uuid=uuid).execute()
        klass = JobTreeSummarizer
    elif '-d1hrv-' in uuid:
        if process is None:
            process = arv.pipeline_instances().get(uuid=uuid).execute()
        klass = PipelineSummarizer
    elif '-4zz18-' in uuid:
        return CollectionSummarizer(collection_id=uuid)
    else:
        raise ValueError("Unrecognized uuid %s" % uuid)
    return klass(process, uuid=uuid, **kwargs)
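
# Minimal usage sketch (the crunchstat-summary command-line tool in
# crunchstat_summary.command is the usual entry point; the UUID below is
# hypothetical):
#
#     s = NewSummarizer('zzzzz-xvhdp-012345678901234')
#     s.run()
#     print(s.text_report())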


class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, container, or container request."""

    def __init__(self, process, label=None, **kwargs):
        rdr = None
        self.process = process
        if label is None:
            label = self.process.get('name', self.process['uuid'])
        if self.process.get('log'):
            try:
                rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.process['log'], e)
        if rdr is None:
            rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
            label = label + ' (partial)'
        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
        self.existing_constraints = self.process.get('runtime_constraints', {})


class JobSummarizer(ProcessSummarizer):
    runtime_constraint_mem_unit = MB
    map_runtime_constraint = {
        'keep_cache_ram': 'keep_cache_mb_per_task',
        'ram': 'min_ram_mb_per_node',
        'vcpus': 'min_cores_per_node',
    }


class ContainerSummarizer(ProcessSummarizer):
    runtime_constraint_mem_unit = 1


class MultiSummarizer(object):
    def __init__(self, children={}, label=None, threads=1, **kwargs):
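        # Child summarizers each fetch and parse logs in their own thread;
        # the semaphore caps how many run concurrently.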
        self.throttle = threading.Semaphore(threads)
        self.children = children
        self.label = label

    def run_and_release(self, target, *args, **kwargs):
        try:
            return target(*args, **kwargs)
        finally:
            self.throttle.release()

    def run(self):
        threads = []
        for child in self.children.values():
            self.throttle.acquire()
            t = threading.Thread(target=self.run_and_release, args=(child.run, ))
            t.daemon = True
            t.start()
            threads.append(t)
        for t in threads:
            t.join()

    def text_report(self):
        txt = ''
        d = self._descendants()
        for child in d.values():
            if len(d) > 1:
                txt += '### Summary for {} ({})\n'.format(
                    child.label, child.process['uuid'])
            txt += child.text_report()
            txt += '\n'
        return txt

    def _descendants(self):
        """Dict of self and all descendants.

        Nodes with nothing of their own to report (like
        MultiSummarizers) are omitted.
        """
        d = collections.OrderedDict()
        for key, child in self.children.items():
            if isinstance(child, Summarizer):
                d[key] = child
            if isinstance(child, MultiSummarizer):
                d.update(child._descendants())
        return d

    def html_report(self):
        return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()


class JobTreeSummarizer(MultiSummarizer):
    """Summarizes a job and all children listed in its components field."""
    def __init__(self, job, label=None, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())
        label = label or job.get('name', job['uuid'])
        children = collections.OrderedDict()
        children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
        if job.get('components', None):
            preloaded = {}
            for j in arv.jobs().index(
                    limit=len(job['components']),
                    filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
                preloaded[j['uuid']] = j
            for cname in sorted(job['components'].keys()):
                child_uuid = job['components'][cname]
                j = (preloaded.get(child_uuid) or
                     arv.jobs().get(uuid=child_uuid).execute())
                children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)

        super(JobTreeSummarizer, self).__init__(
            children=children,
            label=label,
            **kwargs)


class PipelineSummarizer(MultiSummarizer):
    def __init__(self, instance, **kwargs):
        children = collections.OrderedDict()
        for cname, component in instance['components'].items():
            if 'job' not in component:
                logger.warning(
                    "%s: skipping component with no job assigned", cname)
            else:
                logger.info(
                    "%s: job %s", cname, component['job']['uuid'])
                summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
                summarizer.label = '{} {}'.format(
                    cname, component['job']['uuid'])
                children[cname] = summarizer
        super(PipelineSummarizer, self).__init__(
            children=children,
            label=instance['uuid'],
            **kwargs)


class ContainerTreeSummarizer(MultiSummarizer):
    def __init__(self, root, skip_child_jobs=False, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())

        label = kwargs.pop('label', None) or root.get('name') or root['uuid']
        root['name'] = label

        children = collections.OrderedDict()
        todo = collections.deque((root, ))
        while len(todo) > 0:
            current = todo.popleft()
            label = current['name']
            sort_key = current['created_at']
            if current['uuid'].find('-xvhdp-') > 0:
                current = arv.containers().get(uuid=current['container_uuid']).execute()

            summer = ContainerSummarizer(current, label=label, **kwargs)
            summer.sort_key = sort_key
            children[current['uuid']] = summer

            page_filters = []
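            # Page through this container's child container requests, using
            # the last-seen uuid as the cursor for the next page.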
            while True:
                child_crs = arv.container_requests().index(
                    order=['uuid asc'],
                    filters=page_filters+[
                        ['requesting_container_uuid', '=', current['uuid']]],
                ).execute()
                if not child_crs['items']:
                    break
                elif skip_child_jobs:
                    logger.warning('%s: omitting stats from %d child containers'
                                   ' because --skip-child-jobs flag is on',
                                   label, child_crs['items_available'])
                    break
                page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
                for cr in child_crs['items']:
                    if cr['container_uuid']:
                        logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
                        cr['name'] = cr.get('name') or cr['uuid']
                        todo.append(cr)
        sorted_children = collections.OrderedDict()
        for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
            sorted_children[uuid] = children[uuid]
        super(ContainerTreeSummarizer, self).__init__(
            children=sorted_children,
            label=root['name'],
            **kwargs)