# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

import arvados
import collections
import crunchstat_summary.dygraphs
import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re
import sys
import threading
import _strptime

from arvados.api import OrderedJsonModel
from crunchstat_summary import logger

# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95
MB = 2**20
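# Illustrative numbers (not measured): a node advertised as 8 GiB typically
# reports roughly 7.5 GiB usable, and 8 GiB * 0.95 = 7.6 GiB, so recommending
# whole multiples of 0.95 GiB keeps requests just under the advertised size
# instead of tipping jobs onto the next larger node.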

# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads.  https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')


WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart


class Task(object):
    def __init__(self):
        self.starttime = None
        self.finishtime = None
        self.series = collections.defaultdict(list)


class Summarizer(object):
    def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
        self._logdata = logdata

        self.uuid = uuid
        self.label = label
        self.starttime = None
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        self.seq_to_uuid = {}
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable.  If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.debug("%s: logdata %s", self.label, logdata)

    def run(self):
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        with self._logdata as logdata:
            self._run(logdata)

    def _run(self, logdata):
        self.detected_crunch1 = False
        for line in logdata:
            if not self.detected_crunch1 and '-8i9sb-' in line:
                self.detected_crunch1 = True

            if self.detected_crunch1:
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
                if m:
                    seq = int(m.group('seq'))
                    uuid = m.group('task_uuid')
                    self.seq_to_uuid[seq] = uuid
                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
                    continue

                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
                if m:
                    task_id = self.seq_to_uuid[int(m.group('seq'))]
                    elapsed = int(m.group('elapsed'))
                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                    if elapsed > self.stats_max['time']['elapsed']:
                        self.stats_max['time']['elapsed'] = elapsed
                    continue

                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
                if m:
                    uuid = m.group('uuid')
                    if self._skip_child_jobs:
                        logger.warning('%s: omitting stats from child job %s'
                                       ' because --skip-child-jobs flag is on',
                                       self.label, uuid)
                        continue
                    logger.debug('%s: follow %s', self.label, uuid)
                    child_summarizer = ProcessSummarizer(uuid)
                    child_summarizer.stats_max = self.stats_max
                    child_summarizer.task_stats = self.task_stats
                    child_summarizer.tasks = self.tasks
                    child_summarizer.starttime = self.starttime
                    child_summarizer.run()
                    logger.debug('%s: done %s', self.label, uuid)
                    continue

                # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
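                # The regex below would capture from the sample line above,
                # roughly: timestamp='2017-12-02_17:15:08',
                # job_uuid='e51c5-8i9sb-mfp68stkxnqdd6m', seq='0',
                # category='keepcalls', current='0 put 2576 get',
                # interval='10.0000 seconds 0 put 2576 get'.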
                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                if not m:
                    continue
            else:
                # crunch2
                # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
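                # The regex below would capture from the sample line above,
                # roughly: timestamp='2017-12-01T16:56:24.723509200Z',
                # category='keepcalls', current='0 put 3 get',
                # interval='10.0000 seconds 0 put 3 get'.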
                m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                if not m:
                    continue

            if self.label is None:
                try:
                    self.label = m.group('job_uuid')
                except IndexError:
                    self.label = 'container'
            if m.group('category').endswith(':'):
                # "stderr crunchstat: notice: ..."
                continue
            elif m.group('category') in ('error', 'caught'):
                continue
            elif m.group('category') in ('read', 'open', 'cgroup', 'CID', 'Running'):
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (old logs are less careful with unprefixed error messages)
                continue

            if self.detected_crunch1:
                task_id = self.seq_to_uuid[int(m.group('seq'))]
            else:
                task_id = 'container'
            task = self.tasks[task_id]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = m.group('timestamp')
            if timestamp[10:11] == '_':
                timestamp = datetime.datetime.strptime(
                    timestamp, '%Y-%m-%d_%H:%M:%S')
            elif timestamp[10:11] == 'T':
                timestamp = datetime.datetime.strptime(
                    timestamp[:19], '%Y-%m-%dT%H:%M:%S')
            else:
                raise ValueError("Cannot parse timestamp {!r}".format(
                    timestamp))

            if task.starttime is None:
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            if task.starttime is None or timestamp < task.starttime:
                task.starttime = timestamp
            if task.finishtime is None or timestamp > task.finishtime:
                task.finishtime = timestamp

            if self.starttime is None or timestamp < self.starttime:
                self.starttime = timestamp
            if self.finishtime is None or timestamp > self.finishtime:
                self.finishtime = timestamp

            if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
                elapsed = (task.finishtime - task.starttime).seconds
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed

            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                    continue
                category = m.group('category')
                words = m.group(group).split(' ')
                stats = {}
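                # Crunchstat reports values as alternating value/name pairs,
                # so e.g. "0 put 2576 get" parses to {'put': 0, 'get': 2576}.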
                try:
                    for val, stat in zip(words[::2], words[1::2]):
                        if '.' in val:
                            stats[stat] = float(val)
                        else:
                            stats[stat] = int(val)
                except ValueError as e:
                    # If the line doesn't start with 'crunchstat:' we
                    # might have mistaken an error message for a
                    # structured crunchstat line.
                    if m.group("crunchstat") is None or m.group("category") == "crunchstat":
                        logger.warning("%s: log contains message\n  %s", self.label, line)
                    else:
                        logger.warning(
                            '%s: Error parsing value %r (stat %r, category %r): %r',
                            self.label, val, stat, category, e)
                        logger.warning('%s', line)
                    continue
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                for stat, val in stats.items():
                    if group == 'interval':
                        if stat == 'seconds':
                            this_interval_s = val
                            continue
                        elif not (this_interval_s > 0):
                            logger.error(
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                            continue
                        else:
                            stat = stat + '__rate'
                            val = val / this_interval_s
                            if stat in ['user+sys__rate', 'tx+rx__rate']:
                                task.series[category, stat].append(
                                    (timestamp - self.starttime, val))
                    else:
                        if stat in ['rss']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.items():
            for category, stat_last in task_stat.items():
                for stat, val in stat_last.items():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                        continue
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

    def long_label(self):
        label = self.label
        if hasattr(self, 'process') and self.process['uuid'] not in label:
            label = '{} ({})'.format(label, self.process['uuid'])
        if self.finishtime:
            label += ' -- elapsed time '
            s = (self.finishtime - self.starttime).total_seconds()
            if s > 86400:
                label += '{}d'.format(int(s/86400))
            if s > 3600:
                label += '{}h'.format(int(s/3600) % 24)
            if s > 60:
                label += '{}m'.format(int(s/60) % 60)
            label += '{}s'.format(int(s) % 60)
        return label

    def text_report(self):
        if not self.tasks:
            return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"

    def html_report(self):
        return WEBCHART_CLASS(self.label, [self]).html()

    def _text_report_gen(self):
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.items()):
            for stat, val in sorted(stat_max.items()):
                if stat.endswith('__rate'):
                    continue
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        for args in (
                ('Number of tasks: {}',
                 len(self.tasks),
                 None),
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                 None),
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                 lambda x: x * 100),
                ('Overall CPU usage: {}%',
                 float(self.job_tot['cpu']['user+sys']) /
                 self.job_tot['time']['elapsed']
                 if self.job_tot['time']['elapsed'] > 0 else 0,
                 lambda x: x * 100),
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                 lambda x: x / 1e9),
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'] +
                 self.stats_max['net:keep0']['tx+rx'],
                 lambda x: x / 1e9),
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'] +
                 self.stats_max['net:keep0']['tx+rx__rate'],
                 lambda x: x / 1e6),
                ('Keep cache miss rate {}%',
                 (float(self.job_tot['keepcache']['miss']) /
                 float(self.job_tot['keepcalls']['get']))
                 if self.job_tot['keepcalls']['get'] > 0 else 0,
                 lambda x: x * 100.0),
                ('Keep cache utilization {}%',
                 (float(self.job_tot['blkio:0:0']['read']) /
                 float(self.job_tot['net:keep0']['rx']))
                 if self.job_tot['net:keep0']['rx'] > 0 else 0,
                 lambda x: x * 100.0)):
            format_string, val, transform = args
            if val == float('-Inf'):
                continue
            if transform:
                val = transform(val)
            yield "# "+format_string.format(self._format(val))

    def _recommend_gen(self):
        # TODO recommend fixing job granularity if elapsed time is too short
        return itertools.chain(
            self._recommend_cpu(),
            self._recommend_ram(),
            self._recommend_keep_cache())

    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""
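        # Illustrative example: with cpu_max_rate == 3.33 (333%),
        # used_cores = max(1, ceil(3.33)) == 4, so the suggestion below is
        # only emitted if the job asked for more than 4 cores.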

        constraint_key = self._map_runtime_constraint('vcpus')
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
            logger.warning('%s: no CPU usage data', self.label)
            return
        # TODO Don't necessarily want to recommend on isolated max peak
        # take average CPU usage into account as well or % time at max
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get(constraint_key)
        if asked_cores is None:
            asked_cores = 1
        # TODO: This should be more nuanced in cases where max >> avg
        if used_cores < asked_cores:
            yield (
                '#!! {} max CPU usage was {}% -- '
                'try reducing runtime_constraints to "{}":{}'
            ).format(
                self.label,
                math.ceil(cpu_max_rate*100),
                constraint_key,
                int(used_cores))

    # FIXME: This needs to be updated to account for current nodemanager algorithms
    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available.  If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available.  However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        constraint.

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary.  We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """
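        # Illustrative arithmetic for the recommendation below: if max RSS
        # was 7000 MiB, nearlygibs(7000) = 7000/0.95/1024 ~ 7.2, which rounds
        # up to 8 "nearlygibs", so the suggested constraint is
        # int(8 * 0.95 * 1024) = 7782 MiB (scaled by the summarizer's
        # memory unit).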

        constraint_key = self._map_runtime_constraint('ram')
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
            return
        used_mib = math.ceil(float(used_bytes) / MB)
        asked_mib = self.existing_constraints.get(constraint_key)

        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        if used_mib > 0 and (asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
            yield (
                '#!! {} max RSS was {} MiB -- '
                'try reducing runtime_constraints to "{}":{}'
            ).format(
                self.label,
                int(used_mib),
                constraint_key,
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))

    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%"""
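        # Utilization below is the ratio of bytes the job read
        # (blkio:0:0 'read') to bytes received from Keep (net:keep0 'rx').
        # Illustrative example: reading 400 MiB while pulling 1000 MiB from
        # Keep gives 40% utilization, so with the code's fallback of 256 the
        # suggestion would be to double the cache constraint to 512.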
        constraint_key = self._map_runtime_constraint('keep_cache_ram')
        if self.job_tot['net:keep0']['rx'] == 0:
            return
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        # FIXME: the default on this get won't work correctly
        asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()

        if utilization < 0.8:
            yield (
                '#!! {} Keep cache utilization was {:.2f}% -- '
                'try doubling runtime_constraints to "{}":{} (or more)'
            ).format(
                self.label,
                utilization * 100.0,
                constraint_key,
                math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))


    def _format(self, val):
        """Return a string representation of a stat.

        {:.2f} for floats, default format for everything else."""
        if isinstance(val, float):
            return '{:.2f}'.format(val)
        else:
            return '{}'.format(val)

    def _runtime_constraint_mem_unit(self):
        if hasattr(self, 'runtime_constraint_mem_unit'):
            return self.runtime_constraint_mem_unit
        elif self.detected_crunch1:
            return JobSummarizer.runtime_constraint_mem_unit
        else:
            return ContainerSummarizer.runtime_constraint_mem_unit

    def _map_runtime_constraint(self, key):
        if hasattr(self, 'map_runtime_constraint'):
            return self.map_runtime_constraint[key]
        elif self.detected_crunch1:
            return JobSummarizer.map_runtime_constraint[key]
        else:
            return key


class CollectionSummarizer(Summarizer):
    def __init__(self, collection_id, **kwargs):
        super(CollectionSummarizer, self).__init__(
            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
        self.label = collection_id


def NewSummarizer(process_or_uuid, **kwargs):
    """Construct with the appropriate subclass for this uuid/object."""
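    # Arvados uuid infixes identify object types; the branches below dispatch
    # on them: -dz642- container, -xvhdp- container request, -8i9sb- job,
    # -d1hrv- pipeline instance, -4zz18- collection.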

    if isinstance(process_or_uuid, dict):
        process = process_or_uuid
        uuid = process['uuid']
    else:
        uuid = process_or_uuid
        process = None
        arv = arvados.api('v1', model=OrderedJsonModel())

    if '-dz642-' in uuid:
        if process is None:
            process = arv.containers().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-xvhdp-' in uuid:
        if process is None:
            process = arv.container_requests().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-8i9sb-' in uuid:
        if process is None:
            process = arv.jobs().get(uuid=uuid).execute()
        klass = JobTreeSummarizer
    elif '-d1hrv-' in uuid:
        if process is None:
            process = arv.pipeline_instances().get(uuid=uuid).execute()
        klass = PipelineSummarizer
    elif '-4zz18-' in uuid:
        return CollectionSummarizer(collection_id=uuid)
    else:
        raise ValueError("Unrecognized uuid %s" % uuid)
    return klass(process, uuid=uuid, **kwargs)


class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, container, or container request."""

    def __init__(self, process, label=None, **kwargs):
        rdr = None
        self.process = process
        if label is None:
            label = self.process.get('name', self.process['uuid'])
        if self.process.get('log'):
            try:
                rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.process['log'], e)
        if rdr is None:
            rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
            label = label + ' (partial)'
        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
        self.existing_constraints = self.process.get('runtime_constraints', {})


class JobSummarizer(ProcessSummarizer):
    runtime_constraint_mem_unit = MB
    map_runtime_constraint = {
        'keep_cache_ram': 'keep_cache_mb_per_task',
        'ram': 'min_ram_mb_per_node',
        'vcpus': 'min_cores_per_node',
    }


class ContainerSummarizer(ProcessSummarizer):
    runtime_constraint_mem_unit = 1


class MultiSummarizer(object):
    def __init__(self, children={}, label=None, threads=1, **kwargs):
        self.throttle = threading.Semaphore(threads)
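        # The semaphore above limits how many child summarizers fetch and
        # parse their logs at once; run() acquires it before starting each
        # worker thread and run_and_release() releases it when the child
        # finishes.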
        self.children = children
        self.label = label

    def run_and_release(self, target, *args, **kwargs):
        try:
            return target(*args, **kwargs)
        finally:
            self.throttle.release()

    def run(self):
        threads = []
        for child in self.children.values():
            self.throttle.acquire()
            t = threading.Thread(target=self.run_and_release, args=(child.run, ))
            t.daemon = True
            t.start()
            threads.append(t)
        for t in threads:
            t.join()

    def text_report(self):
        txt = ''
        d = self._descendants()
        for child in d.values():
            if len(d) > 1:
                txt += '### Summary for {} ({})\n'.format(
                    child.label, child.process['uuid'])
            txt += child.text_report()
            txt += '\n'
        return txt

    def _descendants(self):
        """Dict of self and all descendants.

        Nodes with nothing of their own to report (like
        MultiSummarizers) are omitted.
        """
        d = collections.OrderedDict()
        for key, child in self.children.items():
            if isinstance(child, Summarizer):
                d[key] = child
            if isinstance(child, MultiSummarizer):
                d.update(child._descendants())
        return d

    def html_report(self):
        return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()


class JobTreeSummarizer(MultiSummarizer):
    """Summarizes a job and all children listed in its components field."""
    def __init__(self, job, label=None, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())
        label = label or job.get('name', job['uuid'])
        children = collections.OrderedDict()
        children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
        if job.get('components', None):
            preloaded = {}
            for j in arv.jobs().index(
                    limit=len(job['components']),
                    filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
                preloaded[j['uuid']] = j
            for cname in sorted(job['components'].keys()):
                child_uuid = job['components'][cname]
                j = (preloaded.get(child_uuid) or
                     arv.jobs().get(uuid=child_uuid).execute())
                children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)

        super(JobTreeSummarizer, self).__init__(
            children=children,
            label=label,
            **kwargs)


class PipelineSummarizer(MultiSummarizer):
    def __init__(self, instance, **kwargs):
        children = collections.OrderedDict()
        for cname, component in instance['components'].items():
            if 'job' not in component:
                logger.warning(
                    "%s: skipping component with no job assigned", cname)
            else:
                logger.info(
                    "%s: job %s", cname, component['job']['uuid'])
                summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
                summarizer.label = '{} {}'.format(
                    cname, component['job']['uuid'])
                children[cname] = summarizer
        super(PipelineSummarizer, self).__init__(
            children=children,
            label=instance['uuid'],
            **kwargs)


class ContainerTreeSummarizer(MultiSummarizer):
    def __init__(self, root, skip_child_jobs=False, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())

        label = kwargs.pop('label', None) or root.get('name') or root['uuid']
        root['name'] = label

        children = collections.OrderedDict()
        todo = collections.deque((root, ))
        while len(todo) > 0:
            current = todo.popleft()
            label = current['name']
            sort_key = current['created_at']
            if current['uuid'].find('-xvhdp-') > 0:
                current = arv.containers().get(uuid=current['container_uuid']).execute()

            summer = ContainerSummarizer(current, label=label, **kwargs)
            summer.sort_key = sort_key
            children[current['uuid']] = summer

            page_filters = []
            while True:
                child_crs = arv.container_requests().index(
                    order=['uuid asc'],
                    filters=page_filters+[
                        ['requesting_container_uuid', '=', current['uuid']]],
                ).execute()
                if not child_crs['items']:
                    break
                elif skip_child_jobs:
                    logger.warning('%s: omitting stats from %d child containers'
                                   ' because --skip-child-jobs flag is on',
                                   label, child_crs['items_available'])
                    break
                page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
                for cr in child_crs['items']:
                    if cr['container_uuid']:
                        logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
                        cr['name'] = cr.get('name') or cr['uuid']
                        todo.append(cr)
        sorted_children = collections.OrderedDict()
        for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
            sorted_children[uuid] = children[uuid]
        super(ContainerTreeSummarizer, self).__init__(
            children=sorted_children,
            label=root['name'],
            **kwargs)