15554 - Fix crunchstat-summary for running containers
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import arvados
6 import collections
7 import arvados.util
8 import crunchstat_summary.dygraphs
9 import crunchstat_summary.reader
10 import datetime
11 import functools
12 import itertools
13 import math
14 import re
15 import sys
16 import threading
17 import _strptime
18
19 from arvados.api import OrderedJsonModel
20 from crunchstat_summary import logger
21
22 # Recommend memory constraints that are this multiple of an integral
23 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
24 # that have amounts like 7.5 GiB according to the kernel.)
25 AVAILABLE_RAM_RATIO = 0.95
26 MB=2**20
27
28 # Workaround datetime.datetime.strptime() thread-safety bug by calling
29 # it once before starting threads.  https://bugs.python.org/issue7980
30 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
31
32
33 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
34
35
36 class Task(object):
37     def __init__(self):
38         self.starttime = None
39         self.finishtime = None
40         self.series = collections.defaultdict(list)
41
42
43 class Summarizer(object):
44     def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
45         self._logdata = logdata
46
47         self.uuid = uuid
48         self.label = label
49         self.starttime = None
50         self.finishtime = None
51         self._skip_child_jobs = skip_child_jobs
52
53         # stats_max: {category: {stat: val}}
54         self.stats_max = collections.defaultdict(
55             functools.partial(collections.defaultdict, lambda: 0))
56         # task_stats: {task_id: {category: {stat: val}}}
57         self.task_stats = collections.defaultdict(
58             functools.partial(collections.defaultdict, dict))
59
60         self.seq_to_uuid = {}
61         self.tasks = collections.defaultdict(Task)
62
63         # We won't bother recommending new runtime constraints if the
64         # constraints given when running the job are known to us and
65         # are already suitable.  If applicable, the subclass
66         # constructor will overwrite this with something useful.
67         self.existing_constraints = {}
68
69         logger.debug("%s: logdata %s", self.label, logdata)
70
71     def run(self):
72         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
73         with self._logdata as logdata:
74             self._run(logdata)
75
76     def _run(self, logdata):
77         self.detected_crunch1 = False
78         for line in logdata:
79             if not self.detected_crunch1 and '-8i9sb-' in line:
80                 self.detected_crunch1 = True
81
82             if self.detected_crunch1:
83                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
84                 if m:
85                     seq = int(m.group('seq'))
86                     uuid = m.group('task_uuid')
87                     self.seq_to_uuid[seq] = uuid
88                     logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
89                     continue
90
91                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
92                 if m:
93                     task_id = self.seq_to_uuid[int(m.group('seq'))]
94                     elapsed = int(m.group('elapsed'))
95                     self.task_stats[task_id]['time'] = {'elapsed': elapsed}
96                     if elapsed > self.stats_max['time']['elapsed']:
97                         self.stats_max['time']['elapsed'] = elapsed
98                     continue
99
100                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
101                 if m:
102                     uuid = m.group('uuid')
103                     if self._skip_child_jobs:
104                         logger.warning('%s: omitting stats from child job %s'
105                                        ' because --skip-child-jobs flag is on',
106                                        self.label, uuid)
107                         continue
108                     logger.debug('%s: follow %s', self.label, uuid)
109                     child_summarizer = ProcessSummarizer(uuid)
110                     child_summarizer.stats_max = self.stats_max
111                     child_summarizer.task_stats = self.task_stats
112                     child_summarizer.tasks = self.tasks
113                     child_summarizer.starttime = self.starttime
114                     child_summarizer.run()
115                     logger.debug('%s: done %s', self.label, uuid)
116                     continue
117
118                 # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
119                 m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
120                 if not m:
121                     continue
122             else:
123                 # crunch2
124                 # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
125                 m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
126                 if not m:
127                     continue
128
129             if self.label is None:
130                 try:
131                     self.label = m.group('job_uuid')
132                 except IndexError:
133                     self.label = 'label #1'
134             category = m.group('category')
135             if category.endswith(':'):
136                 # "stderr crunchstat: notice: ..."
137                 continue
138             elif category in ('error', 'caught'):
139                 continue
140             elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
141                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
142                 # (old logs are less careful with unprefixed error messages)
143                 continue
144
145             if self.detected_crunch1:
146                 task_id = self.seq_to_uuid[int(m.group('seq'))]
147             else:
148                 task_id = 'container'
149             task = self.tasks[task_id]
150
151             # Use the first and last crunchstat timestamps as
152             # approximations of starttime and finishtime.
153             timestamp = m.group('timestamp')
154             if timestamp[10:11] == '_':
155                 timestamp = datetime.datetime.strptime(
156                     timestamp, '%Y-%m-%d_%H:%M:%S')
157             elif timestamp[10:11] == 'T':
158                 timestamp = datetime.datetime.strptime(
159                     timestamp[:19], '%Y-%m-%dT%H:%M:%S')
160             else:
161                 raise ValueError("Cannot parse timestamp {!r}".format(
162                     timestamp))
163
164             if task.starttime is None:
165                 logger.debug('%s: task %s starttime %s',
166                              self.label, task_id, timestamp)
167             if task.starttime is None or timestamp < task.starttime:
168                 task.starttime = timestamp
169             if task.finishtime is None or timestamp > task.finishtime:
170                 task.finishtime = timestamp
171
172             if self.starttime is None or timestamp < self.starttime:
173                 self.starttime = timestamp
174             if self.finishtime is None or timestamp > self.finishtime:
175                 self.finishtime = timestamp
176
177             if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
178                 elapsed = (task.finishtime - task.starttime).seconds
179                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
180                 if elapsed > self.stats_max['time']['elapsed']:
181                     self.stats_max['time']['elapsed'] = elapsed
182
183             this_interval_s = None
184             for group in ['current', 'interval']:
185                 if not m.group(group):
186                     continue
187                 category = m.group('category')
188                 words = m.group(group).split(' ')
189                 stats = {}
190                 try:
191                     for val, stat in zip(words[::2], words[1::2]):
192                         if '.' in val:
193                             stats[stat] = float(val)
194                         else:
195                             stats[stat] = int(val)
196                 except ValueError as e:
197                     # If the line doesn't start with 'crunchstat:' we
198                     # might have mistaken an error message for a
199                     # structured crunchstat line.
200                     if m.group("crunchstat") is None or m.group("category") == "crunchstat":
201                         logger.warning("%s: log contains message\n  %s", self.label, line)
202                     else:
203                         logger.warning(
204                             '%s: Error parsing value %r (stat %r, category %r): %r',
205                             self.label, val, stat, category, e)
206                         logger.warning('%s', line)
207                     continue
208                 if 'user' in stats or 'sys' in stats:
209                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
210                 if 'tx' in stats or 'rx' in stats:
211                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
212                 if group == 'interval':
213                     if 'seconds' in stats:
214                         this_interval_s = stats.get('seconds',0)
215                         del stats['seconds']
216                         if this_interval_s <= 0:
217                             logger.error(
218                                 "BUG? interval stat given with duration {!r}".
219                                 format(this_interval_s))
220                     else:
221                         logger.error('BUG? interval stat missing duration')
222                 for stat, val in stats.items():
223                     if group == 'interval' and this_interval_s:
224                             stat = stat + '__rate'
225                             val = val / this_interval_s
226                             if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
227                                 task.series[category, stat].append(
228                                     (timestamp - self.starttime, val))
229                     else:
230                         if stat in ['rss','used','total']:
231                             task.series[category, stat].append(
232                                 (timestamp - self.starttime, val))
233                         self.task_stats[task_id][category][stat] = val
234                     if val > self.stats_max[category][stat]:
235                         self.stats_max[category][stat] = val
236         logger.debug('%s: done parsing', self.label)
237
238         self.job_tot = collections.defaultdict(
239             functools.partial(collections.defaultdict, int))
240         for task_id, task_stat in self.task_stats.items():
241             for category, stat_last in task_stat.items():
242                 for stat, val in stat_last.items():
243                     if stat in ['cpus', 'cache', 'swap', 'rss']:
244                         # meaningless stats like 16 cpu cores x 5 tasks = 80
245                         continue
246                     self.job_tot[category][stat] += val
247         logger.debug('%s: done totals', self.label)
248
249     def long_label(self):
250         label = self.label
251         if hasattr(self, 'process') and self.process['uuid'] not in label:
252             label = '{} ({})'.format(label, self.process['uuid'])
253         if self.finishtime:
254             label += ' -- elapsed time '
255             s = (self.finishtime - self.starttime).total_seconds()
256             if s > 86400:
257                 label += '{}d'.format(int(s/86400))
258             if s > 3600:
259                 label += '{}h'.format(int(s/3600) % 24)
260             if s > 60:
261                 label += '{}m'.format(int(s/60) % 60)
262             label += '{}s'.format(int(s) % 60)
263         return label
264
265     def text_report(self):
266         if not self.tasks:
267             return "(no report generated)\n"
268         return "\n".join(itertools.chain(
269             self._text_report_gen(),
270             self._recommend_gen())) + "\n"
271
272     def html_report(self):
273         return WEBCHART_CLASS(self.label, [self]).html()
274
275     def _text_report_gen(self):
276         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
277         for category, stat_max in sorted(self.stats_max.items()):
278             for stat, val in sorted(stat_max.items()):
279                 if stat.endswith('__rate'):
280                     continue
281                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
282                 val = self._format(val)
283                 tot = self._format(self.job_tot[category].get(stat, '-'))
284                 yield "\t".join([category, stat, str(val), max_rate, tot])
285         for args in (
286                 ('Number of tasks: {}',
287                  len(self.tasks),
288                  None),
289                 ('Max CPU time spent by a single task: {}s',
290                  self.stats_max['cpu']['user+sys'],
291                  None),
292                 ('Max CPU usage in a single interval: {}%',
293                  self.stats_max['cpu']['user+sys__rate'],
294                  lambda x: x * 100),
295                 ('Overall CPU usage: {}%',
296                  float(self.job_tot['cpu']['user+sys']) /
297                  self.job_tot['time']['elapsed']
298                  if self.job_tot['time']['elapsed'] > 0 else 0,
299                  lambda x: x * 100),
300                 ('Max memory used by a single task: {}GB',
301                  self.stats_max['mem']['rss'],
302                  lambda x: x / 1e9),
303                 ('Max network traffic in a single task: {}GB',
304                  self.stats_max['net:eth0']['tx+rx'] +
305                  self.stats_max['net:keep0']['tx+rx'],
306                  lambda x: x / 1e9),
307                 ('Max network speed in a single interval: {}MB/s',
308                  self.stats_max['net:eth0']['tx+rx__rate'] +
309                  self.stats_max['net:keep0']['tx+rx__rate'],
310                  lambda x: x / 1e6),
311                 ('Keep cache miss rate {}%',
312                  (float(self.job_tot['keepcache']['miss']) /
313                  float(self.job_tot['keepcalls']['get']))
314                  if self.job_tot['keepcalls']['get'] > 0 else 0,
315                  lambda x: x * 100.0),
316                 ('Keep cache utilization {}%',
317                  (float(self.job_tot['blkio:0:0']['read']) /
318                  float(self.job_tot['net:keep0']['rx']))
319                  if self.job_tot['net:keep0']['rx'] > 0 else 0,
320                  lambda x: x * 100.0),
321                ('Temp disk utilization {}%',
322                  (float(self.job_tot['statfs']['used']) /
323                  float(self.job_tot['statfs']['total']))
324                  if self.job_tot['statfs']['total'] > 0 else 0,
325                  lambda x: x * 100.0),
326                 ):
327             format_string, val, transform = args
328             if val == float('-Inf'):
329                 continue
330             if transform:
331                 val = transform(val)
332             yield "# "+format_string.format(self._format(val))
333
334     def _recommend_gen(self):
335         # TODO recommend fixing job granularity if elapsed time is too short
336         return itertools.chain(
337             self._recommend_cpu(),
338             self._recommend_ram(),
339             self._recommend_keep_cache(),
340             self._recommend_temp_disk(),
341             )
342
343     def _recommend_cpu(self):
344         """Recommend asking for 4 cores if max CPU usage was 333%"""
345
346         constraint_key = self._map_runtime_constraint('vcpus')
347         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
348         if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
349             logger.warning('%s: no CPU usage data', self.label)
350             return
351         # TODO Don't necessarily want to recommend on isolated max peak
352         # take average CPU usage into account as well or % time at max
353         used_cores = max(1, int(math.ceil(cpu_max_rate)))
354         asked_cores = self.existing_constraints.get(constraint_key)
355         if asked_cores is None:
356             asked_cores = 1
357         # TODO: This should be more nuanced in cases where max >> avg
358         if used_cores < asked_cores:
359             yield (
360                 '#!! {} max CPU usage was {}% -- '
361                 'try reducing runtime_constraints to "{}":{}'
362             ).format(
363                 self.label,
364                 math.ceil(cpu_max_rate*100),
365                 constraint_key,
366                 int(used_cores))
367
368     # FIXME: This needs to be updated to account for current nodemanager algorithms
369     def _recommend_ram(self):
370         """Recommend an economical RAM constraint for this job.
371
372         Nodes that are advertised as "8 gibibytes" actually have what
373         we might call "8 nearlygibs" of memory available for jobs.
374         Here, we calculate a whole number of nearlygibs that would
375         have sufficed to run the job, then recommend requesting a node
376         with that number of nearlygibs (expressed as mebibytes).
377
378         Requesting a node with "nearly 8 gibibytes" is our best hope
379         of getting a node that actually has nearly 8 gibibytes
380         available.  If the node manager is smart enough to account for
381         the discrepancy itself when choosing/creating a node, we'll
382         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
383         advertised size of the next-size-smaller node (say, 6 GiB)
384         will be too low to satisfy our request, so we will effectively
385         get rounded up to 8 GiB.
386
387         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
388         we will generally get a node that is advertised as "8 GiB" and
389         has at least 7500 MiB available.  However, asking for 8192 MiB
390         would either result in an unnecessarily expensive 12 GiB node
391         (if node manager knows about the discrepancy), or an 8 GiB
392         node which has less than 8192 MiB available and is therefore
393         considered by crunch-dispatch to be too small to meet our
394         constraint.
395
396         When node manager learns how to predict the available memory
397         for each node type such that crunch-dispatch always agrees
398         that a node is big enough to run the job it was brought up
399         for, all this will be unnecessary.  We'll just ask for exactly
400         the memory we want -- even if that happens to be 8192 MiB.
401         """
402
403         constraint_key = self._map_runtime_constraint('ram')
404         used_bytes = self.stats_max['mem']['rss']
405         if used_bytes == float('-Inf'):
406             logger.warning('%s: no memory usage data', self.label)
407             return
408         used_mib = math.ceil(float(used_bytes) / MB)
409         asked_mib = self.existing_constraints.get(constraint_key)
410
411         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
412         if used_mib > 0 and (asked_mib is None or (
413                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
414             yield (
415                 '#!! {} max RSS was {} MiB -- '
416                 'try reducing runtime_constraints to "{}":{}'
417             ).format(
418                 self.label,
419                 int(used_mib),
420                 constraint_key,
421                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))
422
423     def _recommend_keep_cache(self):
424         """Recommend increasing keep cache if utilization < 80%"""
425         constraint_key = self._map_runtime_constraint('keep_cache_ram')
426         if self.job_tot['net:keep0']['rx'] == 0:
427             return
428         utilization = (float(self.job_tot['blkio:0:0']['read']) /
429                        float(self.job_tot['net:keep0']['rx']))
430         # FIXME: the default on this get won't work correctly
431         asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
432
433         if utilization < 0.8:
434             yield (
435                 '#!! {} Keep cache utilization was {:.2f}% -- '
436                 'try doubling runtime_constraints to "{}":{} (or more)'
437             ).format(
438                 self.label,
439                 utilization * 100.0,
440                 constraint_key,
441                 math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
442
443
444     def _recommend_temp_disk(self):
445         """Recommend decreasing temp disk if utilization < 50%"""
446         total = float(self.job_tot['statfs']['total'])
447         utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0
448
449         if utilization < 50.8 and total > 0:
450             yield (
451                 '#!! {} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
452                 'consider reducing "tmpdirMin" and/or "outdirMin"'
453             ).format(
454                 self.label,
455                 utilization * 100.0,
456                 total / MB)
457
458
459     def _format(self, val):
460         """Return a string representation of a stat.
461
462         {:.2f} for floats, default format for everything else."""
463         if isinstance(val, float):
464             return '{:.2f}'.format(val)
465         else:
466             return '{}'.format(val)
467
468     def _runtime_constraint_mem_unit(self):
469         if hasattr(self, 'runtime_constraint_mem_unit'):
470             return self.runtime_constraint_mem_unit
471         elif self.detected_crunch1:
472             return JobSummarizer.runtime_constraint_mem_unit
473         else:
474             return ContainerRequestSummarizer.runtime_constraint_mem_unit
475
476     def _map_runtime_constraint(self, key):
477         if hasattr(self, 'map_runtime_constraint'):
478             return self.map_runtime_constraint[key]
479         elif self.detected_crunch1:
480             return JobSummarizer.map_runtime_constraint[key]
481         else:
482             return key
483
484
485 class CollectionSummarizer(Summarizer):
486     def __init__(self, collection_id, **kwargs):
487         super(CollectionSummarizer, self).__init__(
488             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
489         self.label = collection_id
490
491
492 def NewSummarizer(process_or_uuid, **kwargs):
493     """Construct with the appropriate subclass for this uuid/object."""
494
495     if isinstance(process_or_uuid, dict):
496         process = process_or_uuid
497         uuid = process['uuid']
498     else:
499         uuid = process_or_uuid
500         process = None
501         arv = arvados.api('v1', model=OrderedJsonModel())
502
503     if '-dz642-' in uuid:
504         if process is None:
505             # Get the associated CR. Doesn't matter which since they all have the same logs
506             crs = arv.container_requests().list(filters=[['container_uuid','=',uuid]],limit=1).execute()['items']
507             if len(crs) > 0:
508                 process = crs[0]
509         klass = ContainerRequestTreeSummarizer
510     elif '-xvhdp-' in uuid:
511         if process is None:
512             process = arv.container_requests().get(uuid=uuid).execute()
513         klass = ContainerRequestTreeSummarizer
514     elif '-8i9sb-' in uuid:
515         if process is None:
516             process = arv.jobs().get(uuid=uuid).execute()
517         klass = JobTreeSummarizer
518     elif '-d1hrv-' in uuid:
519         if process is None:
520             process = arv.pipeline_instances().get(uuid=uuid).execute()
521         klass = PipelineSummarizer
522     elif '-4zz18-' in uuid:
523         return CollectionSummarizer(collection_id=uuid)
524     else:
525         raise ArgumentError("Unrecognized uuid %s", uuid)
526     return klass(process, uuid=uuid, **kwargs)
527
528
529 class ProcessSummarizer(Summarizer):
530     """Process is a job, pipeline, or container request."""
531
532     def __init__(self, process, label=None, **kwargs):
533         rdr = None
534         self.process = process
535         if label is None:
536             label = self.process.get('name', self.process['uuid'])
537         # Pre-Arvados v1.4 everything is in 'log'
538         # For 1.4+ containers have no logs and container_requests have them in 'log_uuid', not 'log'
539         log_collection = self.process.get('log', self.process.get('log_uuid'))
540         if log_collection and self.process.get('state') != arvados.util.CR_UNCOMMITTED:
541             try:
542                 rdr = crunchstat_summary.reader.CollectionReader(log_collection)
543             except arvados.errors.NotFoundError as e:
544                 logger.warning("Trying event logs after failing to read "
545                                "log collection %s: %s", self.process['log'], e)
546         if rdr is None:
547             uuid = self.process.get('container_uuid', self.process.get('uuid'))
548             rdr = crunchstat_summary.reader.LiveLogReader(uuid)
549             label = label + ' (partial)'
550         super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
551         self.existing_constraints = self.process.get('runtime_constraints', {})
552
553
554 class JobSummarizer(ProcessSummarizer):
555     runtime_constraint_mem_unit = MB
556     map_runtime_constraint = {
557         'keep_cache_ram': 'keep_cache_mb_per_task',
558         'ram': 'min_ram_mb_per_node',
559         'vcpus': 'min_cores_per_node',
560     }
561
562
563 class ContainerRequestSummarizer(ProcessSummarizer):
564     runtime_constraint_mem_unit = 1
565
566
567 class MultiSummarizer(object):
568     def __init__(self, children={}, label=None, threads=1, **kwargs):
569         self.throttle = threading.Semaphore(threads)
570         self.children = children
571         self.label = label
572
573     def run_and_release(self, target, *args, **kwargs):
574         try:
575             return target(*args, **kwargs)
576         finally:
577             self.throttle.release()
578
579     def run(self):
580         threads = []
581         for child in self.children.values():
582             self.throttle.acquire()
583             t = threading.Thread(target=self.run_and_release, args=(child.run, ))
584             t.daemon = True
585             t.start()
586             threads.append(t)
587         for t in threads:
588             t.join()
589
590     def text_report(self):
591         txt = ''
592         d = self._descendants()
593         for child in d.values():
594             if len(d) > 1:
595                 txt += '### Summary for {} ({})\n'.format(
596                     child.label, child.process['uuid'])
597             txt += child.text_report()
598             txt += '\n'
599         return txt
600
601     def _descendants(self):
602         """Dict of self and all descendants.
603
604         Nodes with nothing of their own to report (like
605         MultiSummarizers) are omitted.
606         """
607         d = collections.OrderedDict()
608         for key, child in self.children.items():
609             if isinstance(child, Summarizer):
610                 d[key] = child
611             if isinstance(child, MultiSummarizer):
612                 d.update(child._descendants())
613         return d
614
615     def html_report(self):
616         return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()
617
618
619 class JobTreeSummarizer(MultiSummarizer):
620     """Summarizes a job and all children listed in its components field."""
621     def __init__(self, job, label=None, **kwargs):
622         arv = arvados.api('v1', model=OrderedJsonModel())
623         label = label or job.get('name', job['uuid'])
624         children = collections.OrderedDict()
625         children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
626         if job.get('components', None):
627             preloaded = {}
628             for j in arv.jobs().index(
629                     limit=len(job['components']),
630                     filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
631                 preloaded[j['uuid']] = j
632             for cname in sorted(job['components'].keys()):
633                 child_uuid = job['components'][cname]
634                 j = (preloaded.get(child_uuid) or
635                      arv.jobs().get(uuid=child_uuid).execute())
636                 children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
637
638         super(JobTreeSummarizer, self).__init__(
639             children=children,
640             label=label,
641             **kwargs)
642
643
644 class PipelineSummarizer(MultiSummarizer):
645     def __init__(self, instance, **kwargs):
646         children = collections.OrderedDict()
647         for cname, component in instance['components'].items():
648             if 'job' not in component:
649                 logger.warning(
650                     "%s: skipping component with no job assigned", cname)
651             else:
652                 logger.info(
653                     "%s: job %s", cname, component['job']['uuid'])
654                 summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
655                 summarizer.label = '{} {}'.format(
656                     cname, component['job']['uuid'])
657                 children[cname] = summarizer
658         super(PipelineSummarizer, self).__init__(
659             children=children,
660             label=instance['uuid'],
661             **kwargs)
662
663
664 class ContainerRequestTreeSummarizer(MultiSummarizer):
665     def __init__(self, root, skip_child_jobs=False, **kwargs):
666         arv = arvados.api('v1', model=OrderedJsonModel())
667
668         label = kwargs.pop('label', None) or root.get('name') or root['uuid']
669         root['name'] = label
670
671         children = collections.OrderedDict()
672         todo = collections.deque((root, ))
673         while len(todo) > 0:
674             current = todo.popleft()
675             label = current['name']
676             sort_key = current['created_at']
677
678             summer = ContainerRequestSummarizer(current, label=label, **kwargs)
679             summer.sort_key = sort_key
680             children[current['uuid']] = summer
681
682             page_filters = []
683             while True:
684                 child_crs = arv.container_requests().index(
685                     order=['uuid asc'],
686                     filters=page_filters+[
687                         ['requesting_container_uuid', '=', current['container_uuid']]],
688                 ).execute()
689                 if not child_crs['items']:
690                     break
691                 elif skip_child_jobs:
692                     logger.warning('%s: omitting stats from %d child containers'
693                                    ' because --skip-child-jobs flag is on',
694                                    label, child_crs['items_available'])
695                     break
696                 page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
697                 for cr in child_crs['items']:
698                     if cr['container_uuid']:
699                         logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
700                         cr['name'] = cr.get('name') or cr['uuid']
701                         todo.append(cr)
702         sorted_children = collections.OrderedDict()
703         for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
704             sorted_children[uuid] = children[uuid]
705         super(ContainerRequestTreeSummarizer, self).__init__(
706             children=sorted_children,
707             label=root['name'],
708             **kwargs)