1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
7 import crunchstat_summary.dygraphs
8 import crunchstat_summary.reader
18 from crunchstat_summary import logger
20 # Recommend memory constraints that are this multiple of an integral
21 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
22 # that have amounts like 7.5 GiB according to the kernel.)
23 AVAILABLE_RAM_RATIO = 0.90
# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads. https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')

# Chart renderer used by html_report(); change here to swap backends.
WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
        # (Task.__init__ fragment) finishtime stays None until a crunchstat
        # timestamp for this task is parsed in Summarizer._run().
        self.finishtime = None
        # series maps (category, stat) -> list of (elapsed, value) samples,
        # appended by Summarizer._run() for charting.
        self.series = collections.defaultdict(list)
class Summarizer(object):
    """Parse a crunchstat log stream and accumulate per-task statistics.

    Subclasses supply the log source (a collection reader, live event
    log, ...); this base class implements parsing, aggregation, and
    report/recommendation generation.
    """

    def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
        # logdata: context manager yielding an iterable of log lines
        # (see run(), which does `with self._logdata as logdata`).
        self._logdata = logdata
        # Latest crunchstat timestamp seen; None until parsing.
        self.finishtime = None
        # When true, stats from child jobs found in the log are omitted.
        self._skip_child_jobs = skip_child_jobs
        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))
        # tasks: {task_id: Task}, created on first reference.
        self.tasks = collections.defaultdict(Task)
        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable. If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}
        # NOTE(review): self.label is read here but assigned on a line
        # not shown in this view -- presumably earlier in __init__.
        logger.debug("%s: logdata %s", self.label, logdata)
        # (body of run()) Open the log source and parse it with _run().
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        with self._logdata as logdata:
    def _run(self, logdata):
        """Parse crunchstat log lines, filling tasks/stats_max/task_stats.

        Handles both crunch1 (job) and crunch2 (container) log formats;
        the format is auto-detected from the first job-style line seen.
        """
        self.detected_crunch1 = False
            # Crunch1 job log lines contain a '-8i9sb-' job uuid infix.
            if not self.detected_crunch1 and '-8i9sb-' in line:
                self.detected_crunch1 = True

            if self.detected_crunch1:
                # Crunch1: map each task sequence number to its task uuid.
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
                    seq = int(m.group('seq'))
                    uuid = m.group('task_uuid')
                    self.seq_to_uuid[seq] = uuid
                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)

                # Crunch1: a completion line supplies the task's elapsed time.
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
                    task_id = self.seq_to_uuid[int(m.group('seq'))]
                    elapsed = int(m.group('elapsed'))
                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                    if elapsed > self.stats_max['time']['elapsed']:
                        self.stats_max['time']['elapsed'] = elapsed

                # Crunch1: recurse into queued child jobs unless disabled.
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
                    uuid = m.group('uuid')
                    if self._skip_child_jobs:
                        logger.warning('%s: omitting stats from child job %s'
                                       ' because --skip-child-jobs flag is on',
                    logger.debug('%s: follow %s', self.label, uuid)
                    # The child shares this summarizer's stat tables, so its
                    # stats merge directly into ours.
                    child_summarizer = NewSummarizer(uuid)
                    child_summarizer.stats_max = self.stats_max
                    child_summarizer.task_stats = self.task_stats
                    child_summarizer.tasks = self.tasks
                    child_summarizer.starttime = self.starttime
                    child_summarizer.run()
                    logger.debug('%s: done %s', self.label, uuid)

                # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
                m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)

            if self.label is None:
                # Default the label to the job uuid when the regex captured one.
                    self.label = m.group('job_uuid')
                    self.label = 'label #1'
            category = m.group('category')
            # Skip non-stat crunchstat output (notices, errors, etc.).
            if category.endswith(':'):
                # "stderr crunchstat: notice: ..."
            elif category in ('error', 'caught'):
            elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (old logs are less careful with unprefixed error messages)

            if self.detected_crunch1:
                task_id = self.seq_to_uuid[int(m.group('seq'))]
                task_id = 'container'
            task = self.tasks[task_id]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = m.group('timestamp')
            if timestamp[10:11] == '_':
                # Crunch1 style: 1999-12-31_23:59:59
                timestamp = datetime.datetime.strptime(
                    timestamp, '%Y-%m-%d_%H:%M:%S')
            elif timestamp[10:11] == 'T':
                # ISO 8601; fractional seconds/zone suffix are truncated off.
                timestamp = datetime.datetime.strptime(
                    timestamp[:19], '%Y-%m-%dT%H:%M:%S')
                raise ValueError("Cannot parse timestamp {!r}".format(

            if task.starttime is None:
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            if task.starttime is None or timestamp < task.starttime:
                task.starttime = timestamp
            if task.finishtime is None or timestamp > task.finishtime:
                task.finishtime = timestamp

            if self.starttime is None or timestamp < self.starttime:
                self.starttime = timestamp
            if self.finishtime is None or timestamp > self.finishtime:
                self.finishtime = timestamp

            # Containers have no explicit completion log line, so derive
            # elapsed time from the observed timestamp range.
            if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
                elapsed = (task.finishtime - task.starttime).seconds
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed

            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                category = m.group('category')
                # Stats arrive as alternating "<value> <name>" pairs.
                words = m.group(group).split(' ')
                for val, stat in zip(words[::2], words[1::2]):
                            stats[stat] = float(val)
                            stats[stat] = int(val)
                    except ValueError as e:
                        # If the line doesn't start with 'crunchstat:' we
                        # might have mistaken an error message for a
                        # structured crunchstat line.
                        if m.group("crunchstat") is None or m.group("category") == "crunchstat":
                            logger.warning("%s: log contains message\n %s", self.label, line)
                                '%s: Error parsing value %r (stat %r, category %r): %r',
                                self.label, val, stat, category, e)
                            logger.warning('%s', line)
                # Synthesize combined stats from their components.
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                if group == 'interval':
                    if 'seconds' in stats:
                        this_interval_s = stats.get('seconds',0)
                        if this_interval_s <= 0:
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                        logger.error('BUG? interval stat missing duration')
                for stat, val in stats.items():
                    if group == 'interval' and this_interval_s:
                        # Interval values become per-second '__rate' stats.
                        stat = stat + '__rate'
                        val = val / this_interval_s
                        if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        if stat in ['rss','used','total']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        # Sum per-task stats into per-job totals (job_tot).
        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.items():
            for category, stat_last in task_stat.items():
                for stat, val in stat_last.items():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

        # Warn about stat categories that never appeared in the log.
            'net:': 'network I/O',
            'statfs': 'storage space',
        for task_stat in self.task_stats.values():
            for category in task_stat.keys():
                for checkcat in missing_category:
                    if checkcat.endswith(':'):
                        # Trailing-colon keys are prefix matches, so that
                        # e.g. 'net:' matches 'net:eth0'.
                        if category.startswith(checkcat):
                            missing_category.pop(checkcat)
                    if category == checkcat:
                        missing_category.pop(checkcat)
        for catlabel in missing_category.values():
            logger.warning('%s: %s stats are missing -- possible cluster configuration issue',
                        self.label, catlabel)
    def long_label(self):
        """Return a report heading: the label, plus the process uuid when
        known and not already part of the label."""
        if hasattr(self, 'process') and self.process['uuid'] not in label:
            label = '{} ({})'.format(label, self.process['uuid'])
    def elapsed_time(self):
        """Format finishtime - starttime as days/hours/minutes/seconds."""
        if not self.finishtime:
        s = (self.finishtime - self.starttime).total_seconds()
        # Each unit is appended in turn; modulo keeps units in range.
        label += '{}d'.format(int(s/86400))
        label += '{}h'.format(int(s/3600) % 24)
        label += '{}m'.format(int(s/60) % 60)
        label += '{}s'.format(int(s) % 60)
    def text_report(self):
        """Return the full plain-text report: stats table, aggregate
        metrics ('# ' lines), and recommendations ('#!! ' lines)."""
        return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_table_gen(lambda x: "\t".join(x),
                                        lambda x: "\t".join(x)),
            self._text_report_agg_gen(lambda x: "# {}: {}{}".format(x[0], x[1], x[2])),
            self._recommend_gen(lambda x: "#!! "+x))) + "\n"
297 def html_report(self):
298 return WEBCHART_CLASS(self.label, [self]).html()
    def _text_report_table_gen(self, headerformat, rowformat):
        """Yield one formatted row per (category, stat): max value, max
        rate, and job total."""
        yield headerformat(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.items()):
            for stat, val in sorted(stat_max.items()):
                # '__rate' stats are reported alongside their base stat.
                if stat.endswith('__rate'):
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield rowformat([category, stat, str(val), max_rate, tot])
    def _text_report_agg_gen(self, aggformat):
        """Yield aggregate metrics as (name, value, suffix) via aggformat.

        Each metric tuple is (name, value, transform, suffix); values of
        -Inf (never observed) are skipped.
        """
        if len(self.tasks) > 1:
            by_single_task = " by a single task"
            ('CPU time spent{}'.format(by_single_task),
             self.stats_max['cpu']['user+sys'],
            ('Max CPU usage in a single interval',
             self.stats_max['cpu']['user+sys__rate'],
            ('Overall CPU usage',
             float(self.job_tot['cpu']['user+sys']) /
             self.job_tot['time']['elapsed']
             if self.job_tot['time']['elapsed'] > 0 else 0,
            ('Max memory used{}'.format(by_single_task),
             self.stats_max['mem']['rss'],
            ('Max network traffic{}'.format(by_single_task),
             self.stats_max['net:eth0']['tx+rx'] +
             self.stats_max['net:keep0']['tx+rx'],
            ('Max network speed in a single interval',
             self.stats_max['net:eth0']['tx+rx__rate'] +
             self.stats_max['net:keep0']['tx+rx__rate'],
            ('Keep cache miss rate',
             (float(self.job_tot['keepcache']['miss']) /
              float(self.job_tot['keepcalls']['get']))
             if self.job_tot['keepcalls']['get'] > 0 else 0,
            ('Keep cache utilization',
             (float(self.job_tot['blkio:0:0']['read']) /
              float(self.job_tot['net:keep0']['rx']))
             if self.job_tot['net:keep0']['rx'] > 0 else 0,
            ('Temp disk utilization',
             (float(self.job_tot['statfs']['used']) /
              float(self.job_tot['statfs']['total']))
             if self.job_tot['statfs']['total'] > 0 else 0,
        if len(self.tasks) > 1:
            metrics.insert(0, ('Number of tasks',
            format_string, val, transform, suffix = args
            # -Inf means the stat was never observed; omit it.
            if val == float('-Inf'):
            yield aggformat((format_string, self._format(val), suffix))
    def _recommend_gen(self, recommendformat):
        """Chain all per-resource recommendation generators."""
        # TODO recommend fixing job granularity if elapsed time is too short
        return itertools.chain(
            self._recommend_cpu(recommendformat),
            self._recommend_ram(recommendformat),
            self._recommend_keep_cache(recommendformat),
            self._recommend_temp_disk(recommendformat),
    def _recommend_cpu(self, recommendformat):
        """Recommend asking for 4 cores if max CPU usage was 333%"""

        constraint_key = self._map_runtime_constraint('vcpus')
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
            logger.warning('%s: no CPU usage data', self.label)
        # TODO Don't necessarily want to recommend on isolated max peak
        # take average CPU usage into account as well or % time at max
        # Round the peak rate up to a whole number of cores (at least 1).
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get(constraint_key)
        if asked_cores is None:
        # TODO: This should be more nuanced in cases where max >> avg
        if used_cores < asked_cores:
            yield recommendformat(
                '{} max CPU usage was {}% -- '
                'try reducing runtime_constraints to "{}":{}'
                math.ceil(cpu_max_rate*100),
    # FIXME: This needs to be updated to account for current a-d-c algorithms
    def _recommend_ram(self, recommendformat):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available. If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available. Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available. However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary. We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """
        constraint_key = self._map_runtime_constraint('ram')
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
        used_mib = math.ceil(float(used_bytes) / MB)
        # NOTE(review): .get() returns None when the constraint is absent,
        # which would make this division raise -- confirm the constraint is
        # guaranteed present on this path.
        asked_mib = self.existing_constraints.get(constraint_key) / MB

        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        # Round the needed "nearlygibs" up, then express as mebibytes.
        recommend_mib = int(math.ceil(nearlygibs(used_mib/ratio))*AVAILABLE_RAM_RATIO*1024)
        if used_mib > 0 and (used_mib / asked_mib) < ratio and asked_mib > recommend_mib:
            yield recommendformat(
                '{} requested {} MiB of RAM but actual RAM usage was below {}% at {} MiB -- '
                'suggest reducing RAM request to {} MiB'
    def _recommend_keep_cache(self, recommendformat):
        """Recommend increasing keep cache if utilization < 80%"""
        constraint_key = self._map_runtime_constraint('keep_cache_ram')
        # No Keep traffic at all: nothing to recommend.
        if self.job_tot['net:keep0']['rx'] == 0:
        # Fraction of bytes read from Keep that were actually consumed.
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        # FIXME: the default on this get won't work correctly
        asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()

        if utilization < 0.8:
            yield recommendformat(
                '{} Keep cache utilization was {:.2f}% -- '
                'try doubling runtime_constraints to "{}":{} (or more)'
                math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
493 def _recommend_temp_disk(self, recommendformat):
494 """Recommend decreasing temp disk if utilization < 50%"""
495 total = float(self.job_tot['statfs']['total'])
496 utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0
498 if utilization < 50.8 and total > 0:
499 yield recommendformat(
500 '{} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
501 'consider reducing "tmpdirMin" and/or "outdirMin"'
508 def _format(self, val):
509 """Return a string representation of a stat.
511 {:.2f} for floats, default format for everything else."""
512 if isinstance(val, float):
513 return '{:.2f}'.format(val)
515 return '{}'.format(val)
    def _runtime_constraint_mem_unit(self):
        """Return the multiplier converting this process's memory
        constraints to bytes (subclasses may set an attribute/class var;
        crunch1 jobs use JobSummarizer's unit, containers use 1)."""
        if hasattr(self, 'runtime_constraint_mem_unit'):
            return self.runtime_constraint_mem_unit
        elif self.detected_crunch1:
            return JobSummarizer.runtime_constraint_mem_unit
        return ContainerRequestSummarizer.runtime_constraint_mem_unit
    def _map_runtime_constraint(self, key):
        """Translate a generic constraint name (e.g. 'ram', 'vcpus') to
        the name used by this process type's runtime_constraints."""
        if hasattr(self, 'map_runtime_constraint'):
            return self.map_runtime_constraint[key]
        elif self.detected_crunch1:
            return JobSummarizer.map_runtime_constraint[key]
class CollectionSummarizer(Summarizer):
    """Summarizer whose log source is a stored log collection."""

    def __init__(self, collection_id, **kwargs):
        reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(reader, **kwargs)
        self.label = collection_id
def NewSummarizer(process_or_uuid, **kwargs):
    """Construct with the appropriate subclass for this uuid/object."""

    if isinstance(process_or_uuid, dict):
        process = process_or_uuid
        uuid = process['uuid']
        uuid = process_or_uuid

    arv = kwargs.get("arv") or arvados.api('v1')

    # Dispatch on the uuid's type infix.
    if '-dz642-' in uuid:
        # Container uuid.
        # Get the associated CR. Doesn't matter which since they all have the same logs
        crs = arv.container_requests().list(filters=[['container_uuid','=',uuid]],limit=1).execute()['items']
        klass = ContainerRequestTreeSummarizer
    elif '-xvhdp-' in uuid:
        # Container request uuid.
        process = arv.container_requests().get(uuid=uuid).execute()
        klass = ContainerRequestTreeSummarizer
    elif '-8i9sb-' in uuid:
        # Crunch1 job uuid.
        process = arv.jobs().get(uuid=uuid).execute()
        klass = JobTreeSummarizer
    elif '-d1hrv-' in uuid:
        # Pipeline instance uuid.
        process = arv.pipeline_instances().get(uuid=uuid).execute()
        klass = PipelineSummarizer
    elif '-4zz18-' in uuid:
        # Plain collection of logs.
        return CollectionSummarizer(collection_id=uuid)
        raise ArgumentError("Unrecognized uuid %s", uuid)
    return klass(process, uuid=uuid, **kwargs)
class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, or container request."""

    def __init__(self, process, label=None, **kwargs):
        self.process = process
        label = self.process.get('name', self.process['uuid'])
        # Pre-Arvados v1.4 everything is in 'log'
        # For 1.4+ containers have no logs and container_requests have them in 'log_uuid', not 'log'
        log_collection = self.process.get('log', self.process.get('log_uuid'))
        if log_collection and self.process.get('state') != 'Uncommitted': # arvados.util.CR_UNCOMMITTED:
                rdr = crunchstat_summary.reader.CollectionReader(log_collection)
            except arvados.errors.NotFoundError as e:
                # Fall back to the live event log when the collection is gone.
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.process['log'], e)
            uuid = self.process.get('container_uuid', self.process.get('uuid'))
            rdr = crunchstat_summary.reader.LiveLogReader(uuid)
            label = label + ' (partial)'
        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
        self.existing_constraints = self.process.get('runtime_constraints', {})
class JobSummarizer(ProcessSummarizer):
    """Summarizer for a crunch1 job; its constraints are in MiB."""
    runtime_constraint_mem_unit = MB
    # Map generic constraint names to their crunch1 job equivalents.
    map_runtime_constraint = {
        'keep_cache_ram': 'keep_cache_mb_per_task',
        'ram': 'min_ram_mb_per_node',
        'vcpus': 'min_cores_per_node',
class ContainerRequestSummarizer(ProcessSummarizer):
    """Summarizer for a container request; memory constraints are
    expressed in bytes, so no unit multiplier is needed (cf.
    JobSummarizer, which uses MB)."""
    runtime_constraint_mem_unit = 1
class MultiSummarizer(object):
    """Run and report on a collection of child summarizers, limiting
    concurrency with a semaphore."""

    # NOTE(review): mutable default argument -- the same dict instance is
    # shared by every call that omits `children`; safe only if callers
    # never mutate the default.
    def __init__(self, children={}, label=None, threads=1, **kwargs):
        self.throttle = threading.Semaphore(threads)
        self.children = children

    def run_and_release(self, target, *args, **kwargs):
        # Call target(), always releasing the throttle afterwards.
            return target(*args, **kwargs)
            self.throttle.release()

        # (body of run()) Start one worker per child; the throttle caps
        # how many run concurrently.
        for child in self.children.values():
            self.throttle.acquire()
            t = threading.Thread(target=self.run_and_release, args=(child.run, ))
    def text_report(self):
        """Concatenate the text reports of all descendant summarizers,
        each under a '### Summary for ...' heading."""
        d = self._descendants()
        for child in d.values():
            # NOTE(review): `txt` is initialized on a line not shown here.
            txt += '### Summary for {} ({})\n'.format(
                child.label, child.process['uuid'])
            txt += child.text_report()
    def _descendants(self):
        """Dict of self and all descendants.

        Nodes with nothing of their own to report (like
        MultiSummarizers) are omitted.
        """
        d = collections.OrderedDict()
        for key, child in self.children.items():
            if isinstance(child, Summarizer):
            # Flatten nested MultiSummarizers into one ordered dict.
            if isinstance(child, MultiSummarizer):
                d.update(child._descendants())
    def html_report(self):
        """Return an HTML report; when there is exactly one descendant,
        wrap its recommendations/aggregates above the chart and its
        stats table below it."""
        if len(self._descendants()) == 1:
            summarizer = next(iter(self._descendants().values()))
            tophtml = """{}\n<table class='aggtable'><tbody>{}</tbody></table>\n""".format(
                "\n".join(summarizer._recommend_gen(lambda x: "<p>{}</p>".format(x))),
                "\n".join(summarizer._text_report_agg_gen(lambda x: "<tr><th>{}</th><td>{}{}</td></tr>".format(*x))))

            bottomhtml = """<table class='metricstable'><tbody>{}</tbody></table>\n""".format(
                "\n".join(summarizer._text_report_table_gen(lambda x: "<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>".format(*x),
                                                            lambda x: "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(*x))))
            label = summarizer.long_label()

        return WEBCHART_CLASS(label, iter(self._descendants().values())).html(tophtml, bottomhtml)
class JobTreeSummarizer(MultiSummarizer):
    """Summarizes a job and all children listed in its components field."""
    def __init__(self, job, label=None, **kwargs):
        arv = kwargs.get("arv") or arvados.api('v1')
        label = label or job.get('name', job['uuid'])
        children = collections.OrderedDict()
        children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
        if job.get('components', None):
            # Batch-fetch component jobs to avoid one API call per child.
            # NOTE(review): `preloaded` is initialized on a line not shown.
            for j in arv.jobs().index(
                    limit=len(job['components']),
                    filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
                preloaded[j['uuid']] = j
            for cname in sorted(job['components'].keys()):
                child_uuid = job['components'][cname]
                j = (preloaded.get(child_uuid) or
                     arv.jobs().get(uuid=child_uuid).execute())
                children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
        super(JobTreeSummarizer, self).__init__(
class PipelineSummarizer(MultiSummarizer):
    """Summarizes each component job of a pipeline instance."""
    def __init__(self, instance, **kwargs):
        children = collections.OrderedDict()
        for cname, component in instance['components'].items():
            # Components without an assigned job cannot be summarized.
            if 'job' not in component:
                    "%s: skipping component with no job assigned", cname)
                    "%s: job %s", cname, component['job']['uuid'])
                summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
                summarizer.label = '{} {}'.format(
                    cname, component['job']['uuid'])
                children[cname] = summarizer
        super(PipelineSummarizer, self).__init__(
            label=instance['uuid'],
class ContainerRequestTreeSummarizer(MultiSummarizer):
    """Summarizes a container request and all of its descendant
    container requests, presented in creation order."""
    def __init__(self, root, skip_child_jobs=False, **kwargs):
        arv = kwargs.get("arv") or arvados.api('v1')

        label = kwargs.pop('label', None) or root.get('name') or root['uuid']

        children = collections.OrderedDict()
        # Breadth-first traversal of the container request tree.
        todo = collections.deque((root, ))
            current = todo.popleft()
            label = current['name']
            sort_key = current['created_at']

            summer = ContainerRequestSummarizer(current, label=label, **kwargs)
            summer.sort_key = sort_key
            children[current['uuid']] = summer

                # Page through this container's child requests.
                child_crs = arv.container_requests().index(
                    filters=page_filters+[
                        ['requesting_container_uuid', '=', current['container_uuid']]],
                if not child_crs['items']:
                elif skip_child_jobs:
                    logger.warning('%s: omitting stats from %d child containers'
                                   ' because --skip-child-jobs flag is on',
                                   label, child_crs['items_available'])
                # Next page starts after the last uuid seen.
                page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
                for cr in child_crs['items']:
                    if cr['container_uuid']:
                        logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
                        cr['name'] = cr.get('name') or cr['uuid']

        # Present children in creation order.
        sorted_children = collections.OrderedDict()
        for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
            sorted_children[uuid] = children[uuid]
        super(ContainerRequestTreeSummarizer, self).__init__(
            children=sorted_children,