# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

import arvados
import collections
import crunchstat_summary.dygraphs
import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re
import sys
import threading
import _strptime

from arvados.api import OrderedJsonModel
from crunchstat_summary import logger

# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95
MB=2**20

# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads.  https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')


WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
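
# Typical use, as a minimal sketch (the crunchstat-summary command wires this
# up with argument parsing and report output; the uuid below is hypothetical
# and the call needs access to the Arvados API):
#
#   summ = NewSummarizer('zzzzz-xvhdp-012345678901234')
#   summ.run()
#   print(summ.text_report())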


class Task(object):
    def __init__(self):
        self.starttime = None
        self.finishtime = None
        self.series = collections.defaultdict(list)


class Summarizer(object):
    def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
        self._logdata = logdata

        self.uuid = uuid
        self.label = label
        self.starttime = None
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        self.seq_to_uuid = {}
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable.  If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.debug("%s: logdata %s", self.label, logdata)

    def run(self):
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        with self._logdata as logdata:
            self._run(logdata)

    def _run(self, logdata):
        self.detected_crunch1 = False
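        # Crunch (v1) job logs prefix each line with a timestamp, job uuid,
        # pid, and task sequence number; crunch v2 container logs carry only
        # an RFC3339 timestamp.  Seeing a job uuid ('-8i9sb-') switches
        # parsing to the crunch1 format for the rest of the log.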
        for line in logdata:
            if not self.detected_crunch1 and '-8i9sb-' in line:
                self.detected_crunch1 = True

            if self.detected_crunch1:
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
                if m:
                    seq = int(m.group('seq'))
                    uuid = m.group('task_uuid')
                    self.seq_to_uuid[seq] = uuid
                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
                    continue

                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
                if m:
                    task_id = self.seq_to_uuid[int(m.group('seq'))]
                    elapsed = int(m.group('elapsed'))
                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                    if elapsed > self.stats_max['time']['elapsed']:
                        self.stats_max['time']['elapsed'] = elapsed
                    continue

                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
                if m:
                    uuid = m.group('uuid')
                    if self._skip_child_jobs:
                        logger.warning('%s: omitting stats from child job %s'
                                       ' because --skip-child-jobs flag is on',
                                       self.label, uuid)
                        continue
                    logger.debug('%s: follow %s', self.label, uuid)
                    child_summarizer = NewSummarizer(uuid)
                    child_summarizer.stats_max = self.stats_max
                    child_summarizer.task_stats = self.task_stats
                    child_summarizer.tasks = self.tasks
                    child_summarizer.starttime = self.starttime
                    child_summarizer.run()
                    logger.debug('%s: done %s', self.label, uuid)
                    continue

                # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
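                # For the example line above, this captures
                # category='keepcalls', current='0 put 2576 get', and
                # interval='10.0000 seconds 0 put 2576 get'.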
                if not m:
                    continue
            else:
                # crunch2
                # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
                m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                if not m:
                    continue

            if self.label is None:
                try:
                    self.label = m.group('job_uuid')
                except IndexError:
                    self.label = 'label #1'
            category = m.group('category')
            if category.endswith(':'):
                # "stderr crunchstat: notice: ..."
                continue
            elif category in ('error', 'caught'):
                continue
            elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (old logs are less careful with unprefixed error messages)
                continue

            if self.detected_crunch1:
                task_id = self.seq_to_uuid[int(m.group('seq'))]
            else:
                task_id = 'container'
            task = self.tasks[task_id]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = m.group('timestamp')
            if timestamp[10:11] == '_':
                timestamp = datetime.datetime.strptime(
                    timestamp, '%Y-%m-%d_%H:%M:%S')
            elif timestamp[10:11] == 'T':
                timestamp = datetime.datetime.strptime(
                    timestamp[:19], '%Y-%m-%dT%H:%M:%S')
            else:
                raise ValueError("Cannot parse timestamp {!r}".format(
                    timestamp))

            if task.starttime is None:
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            if task.starttime is None or timestamp < task.starttime:
                task.starttime = timestamp
            if task.finishtime is None or timestamp > task.finishtime:
                task.finishtime = timestamp

            if self.starttime is None or timestamp < self.starttime:
                self.starttime = timestamp
            if self.finishtime is None or timestamp > self.finishtime:
                self.finishtime = timestamp

            if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
                elapsed = int((task.finishtime - task.starttime).total_seconds())
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed

            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                    continue
                category = m.group('category')
                words = m.group(group).split(' ')
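                # Crunchstat emits alternating "value name" pairs, e.g.
                # current='0 put 2576 get' parses to {'put': 0, 'get': 2576}.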
                stats = {}
                try:
                    for val, stat in zip(words[::2], words[1::2]):
                        if '.' in val:
                            stats[stat] = float(val)
                        else:
                            stats[stat] = int(val)
                except ValueError as e:
                    # If the line doesn't start with 'crunchstat:' we
                    # might have mistaken an error message for a
                    # structured crunchstat line.
                    if m.group("crunchstat") is None or m.group("category") == "crunchstat":
                        logger.warning("%s: log contains message\n  %s", self.label, line)
                    else:
                        logger.warning(
                            '%s: Error parsing value %r (stat %r, category %r): %r',
                            self.label, val, stat, category, e)
                        logger.warning('%s', line)
                    continue
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                if group == 'interval':
                    if 'seconds' in stats:
                        this_interval_s = stats.get('seconds',0)
                        del stats['seconds']
                        if this_interval_s <= 0:
                            logger.error(
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                    else:
                        logger.error('BUG? interval stat missing duration')
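                # Interval samples become per-second '__rate' stats below;
                # cumulative samples keep their latest value in task_stats.
                # A few selected stats also feed the per-task time series
                # plotted in the HTML report.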
                for stat, val in stats.items():
                    if group == 'interval' and this_interval_s:
                        stat = stat + '__rate'
                        val = val / this_interval_s
                        if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                    else:
                        if stat in ['rss','used','total']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.items():
            for category, stat_last in task_stat.items():
                for stat, val in stat_last.items():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                        continue
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

    def long_label(self):
        label = self.label
        if hasattr(self, 'process') and self.process['uuid'] not in label:
            label = '{} ({})'.format(label, self.process['uuid'])
        if self.finishtime:
            label += ' -- elapsed time '
            s = (self.finishtime - self.starttime).total_seconds()
            if s > 86400:
                label += '{}d'.format(int(s/86400))
            if s > 3600:
                label += '{}h'.format(int(s/3600) % 24)
            if s > 60:
                label += '{}m'.format(int(s/60) % 60)
            label += '{}s'.format(int(s) % 60)
        return label

    def text_report(self):
        if not self.tasks:
            return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"

    def html_report(self):
        return WEBCHART_CLASS(self.label, [self]).html()

    def _text_report_gen(self):
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.items()):
            for stat, val in sorted(stat_max.items()):
                if stat.endswith('__rate'):
                    continue
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        for args in (
                ('Number of tasks: {}',
                 len(self.tasks),
                 None),
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                 None),
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                 lambda x: x * 100),
                ('Overall CPU usage: {}%',
                 float(self.job_tot['cpu']['user+sys']) /
                 self.job_tot['time']['elapsed']
                 if self.job_tot['time']['elapsed'] > 0 else 0,
                 lambda x: x * 100),
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                 lambda x: x / 1e9),
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'] +
                 self.stats_max['net:keep0']['tx+rx'],
                 lambda x: x / 1e9),
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'] +
                 self.stats_max['net:keep0']['tx+rx__rate'],
                 lambda x: x / 1e6),
                ('Keep cache miss rate {}%',
                 (float(self.job_tot['keepcache']['miss']) /
                 float(self.job_tot['keepcalls']['get']))
                 if self.job_tot['keepcalls']['get'] > 0 else 0,
                 lambda x: x * 100.0),
                ('Keep cache utilization {}%',
                 (float(self.job_tot['blkio:0:0']['read']) /
                 float(self.job_tot['net:keep0']['rx']))
                 if self.job_tot['net:keep0']['rx'] > 0 else 0,
                 lambda x: x * 100.0),
                ('Temp disk utilization {}%',
                 (float(self.job_tot['statfs']['used']) /
                 float(self.job_tot['statfs']['total']))
                 if self.job_tot['statfs']['total'] > 0 else 0,
                 lambda x: x * 100.0),
                ):
            format_string, val, transform = args
            if val == float('-Inf'):
                continue
            if transform:
                val = transform(val)
            yield "# "+format_string.format(self._format(val))

    def _recommend_gen(self):
        # TODO recommend fixing job granularity if elapsed time is too short
        return itertools.chain(
            self._recommend_cpu(),
            self._recommend_ram(),
            self._recommend_keep_cache(),
            self._recommend_temp_disk(),
            )

    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""

        constraint_key = self._map_runtime_constraint('vcpus')
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
            logger.warning('%s: no CPU usage data', self.label)
            return
        # TODO: Don't recommend based solely on an isolated peak; take average
        # CPU usage and/or the % of time spent at max into account as well.
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get(constraint_key)
        if asked_cores is None:
            asked_cores = 1
        # TODO: This should be more nuanced in cases where max >> avg
        if used_cores < asked_cores:
            yield (
                '#!! {} max CPU usage was {}% -- '
                'try reducing runtime_constraints to "{}":{}'
            ).format(
                self.label,
                math.ceil(cpu_max_rate*100),
                constraint_key,
                int(used_cores))

    # FIXME: This needs to be updated to account for current a-d-c algorithms
    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available.  If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available.  However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        constraint.

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary.  We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """

        constraint_key = self._map_runtime_constraint('ram')
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
            return
        used_mib = math.ceil(float(used_bytes) / MB)
        asked_mib = self.existing_constraints.get(constraint_key)

        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
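        # Worked example: used_mib=7500 -> 7500/0.95/1024 ~= 7.71 nearlygibs,
        # rounded up to 8, so the recommendation is 8 * 0.95 * 1024 ~= 7782 MiB
        # (converted to this summarizer's memory unit: MiB for jobs, bytes for
        # containers).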
        if used_mib > 0 and (asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
            yield (
                '#!! {} max RSS was {} MiB -- '
                'try reducing runtime_constraints to "{}":{}'
            ).format(
                self.label,
                int(used_mib),
                constraint_key,
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))

    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%"""
        constraint_key = self._map_runtime_constraint('keep_cache_ram')
        if self.job_tot['net:keep0']['rx'] == 0:
            return
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
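        # Utilization compares bytes read via the block layer (blkio) with
        # bytes fetched from Keep over the network (net:keep0 rx); when the
        # cache is too small the same blocks get re-fetched, so rx grows and
        # the ratio drops.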
        # FIXME: the default on this get won't work correctly
        asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()

        if utilization < 0.8:
            yield (
                '#!! {} Keep cache utilization was {:.2f}% -- '
                'try doubling runtime_constraints to "{}":{} (or more)'
            ).format(
                self.label,
                utilization * 100.0,
                constraint_key,
                math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))


    def _recommend_temp_disk(self):
        """Recommend decreasing temp disk if utilization < 50%"""
        total = float(self.job_tot['statfs']['total'])
        utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0

        if utilization < 0.5 and total > 0:
            yield (
                '#!! {} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
                'consider reducing "tmpdirMin" and/or "outdirMin"'
            ).format(
                self.label,
                utilization * 100.0,
                total / MB)


    def _format(self, val):
        """Return a string representation of a stat.

        {:.2f} for floats, default format for everything else."""
        if isinstance(val, float):
            return '{:.2f}'.format(val)
        else:
            return '{}'.format(val)

    def _runtime_constraint_mem_unit(self):
        if hasattr(self, 'runtime_constraint_mem_unit'):
            return self.runtime_constraint_mem_unit
        elif self.detected_crunch1:
            return JobSummarizer.runtime_constraint_mem_unit
        else:
            return ContainerRequestSummarizer.runtime_constraint_mem_unit

    def _map_runtime_constraint(self, key):
        if hasattr(self, 'map_runtime_constraint'):
            return self.map_runtime_constraint[key]
        elif self.detected_crunch1:
            return JobSummarizer.map_runtime_constraint[key]
        else:
            return key


class CollectionSummarizer(Summarizer):
    def __init__(self, collection_id, **kwargs):
        super(CollectionSummarizer, self).__init__(
            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
        self.label = collection_id


def NewSummarizer(process_or_uuid, **kwargs):
    """Construct with the appropriate subclass for this uuid/object."""

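    # The uuid infix identifies the object type: '-dz642-' container,
    # '-xvhdp-' container request, '-8i9sb-' job, '-d1hrv-' pipeline
    # instance, '-4zz18-' collection.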
    if isinstance(process_or_uuid, dict):
        process = process_or_uuid
        uuid = process['uuid']
    else:
        uuid = process_or_uuid
        process = None
        arv = arvados.api('v1', model=OrderedJsonModel())

    if '-dz642-' in uuid:
        if process is None:
            # Get the associated CR. Doesn't matter which since they all have the same logs
            crs = arv.container_requests().list(filters=[['container_uuid','=',uuid]],limit=1).execute()['items']
            if len(crs) > 0:
                process = crs[0]
        klass = ContainerRequestTreeSummarizer
    elif '-xvhdp-' in uuid:
        if process is None:
            process = arv.container_requests().get(uuid=uuid).execute()
        klass = ContainerRequestTreeSummarizer
    elif '-8i9sb-' in uuid:
        if process is None:
            process = arv.jobs().get(uuid=uuid).execute()
        klass = JobTreeSummarizer
    elif '-d1hrv-' in uuid:
        if process is None:
            process = arv.pipeline_instances().get(uuid=uuid).execute()
        klass = PipelineSummarizer
    elif '-4zz18-' in uuid:
        return CollectionSummarizer(collection_id=uuid)
    else:
        raise ValueError("Unrecognized uuid %s" % uuid)
    return klass(process, uuid=uuid, **kwargs)


class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, or container request."""

    def __init__(self, process, label=None, **kwargs):
        rdr = None
        self.process = process
        if label is None:
            label = self.process.get('name', self.process['uuid'])
        # Pre-Arvados v1.4 everything is in 'log'
        # For 1.4+ containers have no logs and container_requests have them in 'log_uuid', not 'log'
        log_collection = self.process.get('log', self.process.get('log_uuid'))
        if log_collection and self.process.get('state') != 'Uncommitted': # arvados.util.CR_UNCOMMITTED:
            try:
                rdr = crunchstat_summary.reader.CollectionReader(log_collection)
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", log_collection, e)
        if rdr is None:
            uuid = self.process.get('container_uuid', self.process.get('uuid'))
            rdr = crunchstat_summary.reader.LiveLogReader(uuid)
            label = label + ' (partial)'
        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
        self.existing_constraints = self.process.get('runtime_constraints', {})


class JobSummarizer(ProcessSummarizer):
    runtime_constraint_mem_unit = MB
    map_runtime_constraint = {
        'keep_cache_ram': 'keep_cache_mb_per_task',
        'ram': 'min_ram_mb_per_node',
        'vcpus': 'min_cores_per_node',
    }


class ContainerRequestSummarizer(ProcessSummarizer):
    runtime_constraint_mem_unit = 1


class MultiSummarizer(object):
    def __init__(self, children={}, label=None, threads=1, **kwargs):
        self.throttle = threading.Semaphore(threads)
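        # The semaphore caps how many children parse their logs concurrently.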
        self.children = children
        self.label = label

    def run_and_release(self, target, *args, **kwargs):
        try:
            return target(*args, **kwargs)
        finally:
            self.throttle.release()

    def run(self):
        threads = []
        for child in self.children.values():
            self.throttle.acquire()
            t = threading.Thread(target=self.run_and_release, args=(child.run, ))
            t.daemon = True
            t.start()
            threads.append(t)
        for t in threads:
            t.join()

    def text_report(self):
        txt = ''
        d = self._descendants()
        for child in d.values():
            if len(d) > 1:
                txt += '### Summary for {} ({})\n'.format(
                    child.label, child.process['uuid'])
            txt += child.text_report()
            txt += '\n'
        return txt

    def _descendants(self):
        """Dict of self and all descendants.

        Nodes with nothing of their own to report (like
        MultiSummarizers) are omitted.
        """
        d = collections.OrderedDict()
        for key, child in self.children.items():
            if isinstance(child, Summarizer):
                d[key] = child
            if isinstance(child, MultiSummarizer):
                d.update(child._descendants())
        return d

    def html_report(self):
        return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()


class JobTreeSummarizer(MultiSummarizer):
    """Summarizes a job and all children listed in its components field."""
    def __init__(self, job, label=None, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())
        label = label or job.get('name', job['uuid'])
        children = collections.OrderedDict()
        children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
        if job.get('components', None):
            preloaded = {}
            for j in arv.jobs().index(
                    limit=len(job['components']),
                    filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
                preloaded[j['uuid']] = j
            for cname in sorted(job['components'].keys()):
                child_uuid = job['components'][cname]
                j = (preloaded.get(child_uuid) or
                     arv.jobs().get(uuid=child_uuid).execute())
                children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)

        super(JobTreeSummarizer, self).__init__(
            children=children,
            label=label,
            **kwargs)


class PipelineSummarizer(MultiSummarizer):
    def __init__(self, instance, **kwargs):
        children = collections.OrderedDict()
        for cname, component in instance['components'].items():
            if 'job' not in component:
                logger.warning(
                    "%s: skipping component with no job assigned", cname)
            else:
                logger.info(
                    "%s: job %s", cname, component['job']['uuid'])
                summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
                summarizer.label = '{} {}'.format(
                    cname, component['job']['uuid'])
                children[cname] = summarizer
        super(PipelineSummarizer, self).__init__(
            children=children,
            label=instance['uuid'],
            **kwargs)


class ContainerRequestTreeSummarizer(MultiSummarizer):
    def __init__(self, root, skip_child_jobs=False, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())

        label = kwargs.pop('label', None) or root.get('name') or root['uuid']
        root['name'] = label

        children = collections.OrderedDict()
        todo = collections.deque((root, ))
        while len(todo) > 0:
            current = todo.popleft()
            label = current['name']
            sort_key = current['created_at']

            summer = ContainerRequestSummarizer(current, label=label, **kwargs)
            summer.sort_key = sort_key
            children[current['uuid']] = summer

            page_filters = []
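            # Page through this container's child container requests in uuid
            # order, advancing the filter past the last uuid seen on each page.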
            while True:
                child_crs = arv.container_requests().index(
                    order=['uuid asc'],
                    filters=page_filters+[
                        ['requesting_container_uuid', '=', current['container_uuid']]],
                ).execute()
                if not child_crs['items']:
                    break
                elif skip_child_jobs:
                    logger.warning('%s: omitting stats from %d child containers'
                                   ' because --skip-child-jobs flag is on',
                                   label, child_crs['items_available'])
                    break
                page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
                for cr in child_crs['items']:
                    if cr['container_uuid']:
                        logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
                        cr['name'] = cr.get('name') or cr['uuid']
                        todo.append(cr)
        sorted_children = collections.OrderedDict()
        for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
            sorted_children[uuid] = children[uuid]
        super(ContainerRequestTreeSummarizer, self).__init__(
            children=sorted_children,
            label=root['name'],
            **kwargs)