1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
7 import crunchstat_summary.dygraphs
8 import crunchstat_summary.reader
18 from concurrent.futures import ThreadPoolExecutor
20 from crunchstat_summary import logger
# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.90

# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads. https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')

# Chart renderer used by html_report(); swap this to change the backend.
WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
39 self.finishtime = None
40 self.series = collections.defaultdict(list)
class Summarizer(object):
    """Parse crunchstat log data and accumulate per-task and per-job stats.

    NOTE(review): several lines of this class body (e.g. label/starttime
    initialization, the 'def run(self):' header) appear elided from this
    view of the file; comments below flag apparent gaps.
    """

    def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
        # logdata is a reader object; run() uses it as a context manager.
        self._logdata = logdata
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        # tasks: {task_id: Task}; defaultdict so parsing can touch a task
        # the first time it is mentioned in the log.
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable. If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.info("%s: logdata %s", self.label, logdata)

    # NOTE(review): the 'def run(self):' header appears elided here;
    # the following lines are the start of that method, which parses
    # the log data inside the reader's context manager.
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        with self._logdata as logdata:
    def _run(self, logdata):
        """Parse every log line, populating tasks, task_stats and stats_max.

        NOTE(review): this view of the method has many elided lines
        (the main `for line in logdata:` loop header, `if m:` guards,
        `continue`s, `try:`/`else:` lines, a `missing_category = {...}`
        opener). Comments below flag the apparent gaps; code tokens are
        preserved as-is.
        """
        self.detected_crunch1 = False
        if not self.node_info:
            self.node_info = logdata.node_info()
        # -- per-line parsing (loop header elided) --
        # crunch1 jobs are recognized by the -8i9sb- uuid infix.
        if not self.detected_crunch1 and '-8i9sb-' in line:
            self.detected_crunch1 = True
        if self.detected_crunch1:
            # Map crunch1 task sequence numbers to task UUIDs.
            # NOTE(review): `if m:` guards after each re.search appear elided.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
            seq = int(m.group('seq'))
            uuid = m.group('task_uuid')
            self.seq_to_uuid[seq] = uuid
            logger.debug('%s: seq %d is task %s', self.label, seq, uuid)

            # Task completion lines carry the task's elapsed seconds.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
            task_id = self.seq_to_uuid[int(m.group('seq'))]
            elapsed = int(m.group('elapsed'))
            self.task_stats[task_id]['time'] = {'elapsed': elapsed}
            if elapsed > self.stats_max['time']['elapsed']:
                self.stats_max['time']['elapsed'] = elapsed

            # Child jobs queued by this job: skip or recurse-and-merge.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
            uuid = m.group('uuid')
            if self._skip_child_jobs:
                # NOTE(review): this warning call is truncated in this view.
                logger.warning('%s: omitting stats from child job %s'
                               ' because --skip-child-jobs flag is on',
            logger.debug('%s: follow %s', self.label, uuid)
            # Share our accumulators so the child's stats merge into ours.
            child_summarizer = NewSummarizer(uuid)
            child_summarizer.stats_max = self.stats_max
            child_summarizer.task_stats = self.task_stats
            child_summarizer.tasks = self.tasks
            child_summarizer.starttime = self.starttime
            child_summarizer.run()
            logger.debug('%s: done %s', self.label, uuid)

        # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
        m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
        # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
        # NOTE(review): an `if not m:` fallback between the two formats
        # appears elided here.
        m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
        if self.label is None:
            # NOTE(review): the branch selecting between these two
            # assignments appears elided.
            self.label = m.group('job_uuid')
            self.label = 'label #1'
        category = m.group('category')
        # Lines that are really free-text messages, not structured stats
        # (branch bodies, presumably `continue`s, appear elided):
        if category.endswith(':'):
            # "stderr crunchstat: notice: ..."
        elif category in ('error', 'caught'):
        elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
            # "stderr crunchstat: read /proc/1234/net/dev: ..."
            # (old logs are less careful with unprefixed error messages)
        if self.detected_crunch1:
            task_id = self.seq_to_uuid[int(m.group('seq'))]
            # NOTE(review): the `else:` for the container case appears elided.
            task_id = 'container'
        task = self.tasks[task_id]

        # Use the first and last crunchstat timestamps as
        # approximations of starttime and finishtime.
        timestamp = m.group('timestamp')
        if timestamp[10:11] == '_':
            # crunch1-style timestamp, e.g. 2017-12-02_17:15:08
            timestamp = datetime.datetime.strptime(
                timestamp, '%Y-%m-%d_%H:%M:%S')
        elif timestamp[10:11] == 'T':
            # ISO-8601-style; fractional seconds are truncated away.
            timestamp = datetime.datetime.strptime(
                timestamp[:19], '%Y-%m-%dT%H:%M:%S')
            # NOTE(review): the `else:` introducing this raise appears elided,
            # and the format() argument list is truncated in this view.
            raise ValueError("Cannot parse timestamp {!r}".format(
        if task.starttime is None:
            logger.debug('%s: task %s starttime %s',
                         self.label, task_id, timestamp)
        if task.starttime is None or timestamp < task.starttime:
            task.starttime = timestamp
        if task.finishtime is None or timestamp > task.finishtime:
            task.finishtime = timestamp
        if self.starttime is None or timestamp < self.starttime:
            self.starttime = timestamp
        if self.finishtime is None or timestamp > self.finishtime:
            self.finishtime = timestamp

        # Containers have no explicit completion lines; derive elapsed
        # time from the first/last timestamps seen so far.
        if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
            elapsed = (task.finishtime - task.starttime).seconds
            self.task_stats[task_id]['time'] = {'elapsed': elapsed}
            if elapsed > self.stats_max['time']['elapsed']:
                self.stats_max['time']['elapsed'] = elapsed

        this_interval_s = None
        for group in ['current', 'interval']:
            if not m.group(group):
            category = m.group('category')
            words = m.group(group).split(' ')
            # Stats arrive as alternating "<value> <name>" pairs.
            # NOTE(review): a `stats = {}` initializer and the `try:` around
            # the numeric conversions appear elided.
            for val, stat in zip(words[::2], words[1::2]):
                stats[stat] = float(val)
                stats[stat] = int(val)
                except ValueError as e:
                    # If the line doesn't start with 'crunchstat:' we
                    # might have mistaken an error message for a
                    # structured crunchstat line.
                    if m.group("crunchstat") is None or m.group("category") == "crunchstat":
                        logger.warning("%s: log contains message\n  %s", self.label, line)
                        '%s: Error parsing value %r (stat %r, category %r): %r',
                        self.label, val, stat, category, e)
                    logger.warning('%s', line)
            # Derived combined stats.
            if 'user' in stats or 'sys' in stats:
                stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
            if 'tx' in stats or 'rx' in stats:
                stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
            if group == 'interval':
                if 'seconds' in stats:
                    this_interval_s = stats.get('seconds',0)
                    if this_interval_s <= 0:
                        # NOTE(review): the logger.error( opener is elided.
                        "BUG? interval stat given with duration {!r}".
                        format(this_interval_s))
                    logger.error('BUG? interval stat missing duration')
            for stat, val in stats.items():
                if group == 'interval' and this_interval_s:
                    # Interval stats become per-second rates.
                    stat = stat + '__rate'
                    val = val / this_interval_s
                    # Only selected rate stats are charted as time series.
                    if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
                        task.series[category, stat].append(
                            (timestamp - self.starttime, val))
                    if stat in ['rss','used','total']:
                        task.series[category, stat].append(
                            (timestamp - self.starttime, val))
                self.task_stats[task_id][category][stat] = val
                if val > self.stats_max[category][stat]:
                    self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        # Sum each stat across tasks into job_tot.
        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.items():
            for category, stat_last in task_stat.items():
                for stat, val in stat_last.items():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

        # Warn about stat categories entirely absent from the logs.
        # NOTE(review): the `missing_category = {` opener (and some
        # entries) appear elided here.
            'net:': 'network I/O',
            'statfs': 'storage space',
        for task_stat in self.task_stats.values():
            for category in task_stat.keys():
                for checkcat in missing_category:
                    if checkcat.endswith(':'):
                        # Prefix match for per-device categories like net:eth0.
                        if category.startswith(checkcat):
                            missing_category.pop(checkcat)
                        if category == checkcat:
                            missing_category.pop(checkcat)
        for catlabel in missing_category.values():
            logger.warning('%s: %s stats are missing -- possible cluster configuration issue',
                           self.label, catlabel)
    def long_label(self):
        """Return a human-readable label, adding the process UUID if known.

        NOTE(review): the initial `label = self.label` and the final
        `return label` appear elided from this view.
        """
        if hasattr(self, 'process') and self.process['uuid'] not in label:
            label = '{} ({})'.format(label, self.process['uuid'])
    def elapsed_time(self):
        """Return the job's wall-clock duration as a 'Nd Nh Nm Ns' string.

        NOTE(review): the early return for a missing finishtime, the
        `label = ''` initializer, the guards that suppress zero-valued
        leading units, and the final `return label` appear elided.
        """
        if not self.finishtime:
        s = (self.finishtime - self.starttime).total_seconds()
        label += '{}d '.format(int(s/86400))
        label += '{}h '.format(int(s/3600) % 24)
        label += '{}m '.format(int(s/60) % 60)
        label += '{}s'.format(int(s) % 60)
    def text_report(self):
        """Render the summary: a tab-separated stats table, '#'-prefixed
        aggregate lines, and '#!!'-prefixed recommendations.

        NOTE(review): the guard that triggers the early return (no tasks
        parsed, presumably) appears elided from this view.
        """
            return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_table_gen(lambda x: "\t".join(x),
                                        lambda x: "\t".join(x)),
            self._text_report_agg_gen(lambda x: "# {}: {}{}".format(x[0], x[1], x[2])),
            self._recommend_gen(lambda x: "#!! "+x))) + "\n"
304 def html_report(self):
305 return WEBCHART_CLASS(self.label, [self]).html()
    def _text_report_table_gen(self, headerformat, rowformat):
        """Yield one header row, then one row per (category, stat) pair.

        __rate stats are folded into their base stat's task_max_rate
        column rather than emitted as rows of their own.
        NOTE(review): the `continue` that skips __rate stats as
        standalone rows appears elided from this view.
        """
        yield headerformat(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.items()):
            for stat, val in sorted(stat_max.items()):
                if stat.endswith('__rate'):
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield rowformat([category, stat, str(val), max_rate, tot])
    def _text_report_agg_gen(self, aggformat):
        """Yield aggregate job metrics as (name, value, suffix) tuples
        passed through aggformat.

        NOTE(review): this view elides the `metrics = [` opener, the
        `else: by_single_task = ""` branch, several tuple elements
        (format strings / suffixes), and the `for args in metrics:` loop
        header; code tokens are preserved as-is.
        """
        if len(self.tasks) > 1:
            by_single_task = " by a single task"
            # ('Cost', ...) entry — its opening line appears elided:
            '${:.3f}'.format(self.cost),
            '') if self.cost > 0 else None,
            ('Assigned instance type',
             self.node_info.get('ProviderType'),
             '') if self.node_info.get('ProviderType') else None,
            ('Instance hourly price',
             '${:.3f}'.format(self.node_info.get('Price')),
             '') if self.node_info.get('Price') else None,
            ('Max CPU usage in a single interval',
             self.stats_max['cpu']['user+sys__rate'],
            ('Overall CPU usage',
             float(self.job_tot['cpu']['user+sys']) /
             self.job_tot['time']['elapsed']
             if self.job_tot['time']['elapsed'] > 0 else 0,
            ('Requested CPU cores',
             self.existing_constraints.get(self._map_runtime_constraint('vcpus')),
             self.node_info.get('VCPUs'),
             '') if self.node_info.get('VCPUs') else None,
            ('Max memory used{}'.format(by_single_task),
             self.stats_max['mem']['rss'],
             self.existing_constraints.get(self._map_runtime_constraint('ram')),
            ('Maximum RAM request for this instance type',
             # NOTE(review): `.get('ReserveExtraRAM', {})` returns a dict
             # when the key is missing, which would make the subtraction
             # raise TypeError — default should probably be 0; verify.
             (self.node_info.get('RAM') - self.arv_config.get('Containers', {}).get('ReserveExtraRAM', {}))*.95,
            ('Max network traffic{}'.format(by_single_task),
             self.stats_max['net:eth0']['tx+rx'] +
             self.stats_max['net:keep0']['tx+rx'],
            ('Max network speed in a single interval',
             self.stats_max['net:eth0']['tx+rx__rate'] +
             self.stats_max['net:keep0']['tx+rx__rate'],
            ('Keep cache miss rate',
             (float(self.job_tot['keepcache']['miss']) /
              float(self.job_tot['keepcalls']['get']))
             if self.job_tot['keepcalls']['get'] > 0 else 0,
            ('Keep cache utilization',
             (float(self.job_tot['blkio:0:0']['read']) /
              float(self.job_tot['net:keep0']['rx']))
             if self.job_tot['net:keep0']['rx'] > 0 else 0,
            ('Temp disk utilization',
             (float(self.job_tot['statfs']['used']) /
              float(self.job_tot['statfs']['total']))
             if self.job_tot['statfs']['total'] > 0 else 0,
        if len(self.tasks) > 1:
            metrics.insert(0, ('Number of tasks',
        # Per-metric emission loop (header elided); None entries are
        # presumably skipped before this unpack.
        format_string, val, transform, suffix = args
        if val == float('-Inf'):
        yield aggformat((format_string, self._format(val), suffix))
430 def _recommend_gen(self, recommendformat):
431 # TODO recommend fixing job granularity if elapsed time is too short
432 return itertools.chain(
433 self._recommend_cpu(recommendformat),
434 self._recommend_ram(recommendformat),
435 self._recommend_keep_cache(recommendformat),
436 self._recommend_temp_disk(recommendformat),
    def _recommend_cpu(self, recommendformat):
        """Recommend asking for 4 cores if max CPU usage was 333%

        NOTE(review): the `return`s after the no-data warning and after
        the `if asked_cores is None:` fallback, plus the tail of the
        format() argument list, appear elided from this view.
        """
        constraint_key = self._map_runtime_constraint('vcpus')
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
            logger.warning('%s: no CPU usage data', self.label)
        # TODO Don't necessarily want to recommend on isolated max peak
        # take average CPU usage into account as well or % time at max
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get(constraint_key)
        if asked_cores is None:
        # TODO: This should be more nuanced in cases where max >> avg
        if used_cores < asked_cores:
            yield recommendformat(
                '{} max CPU usage was {}% -- '
                'try reducing runtime_constraints to "{}":{}'
                math.ceil(cpu_max_rate*100),
    # FIXME: This needs to be updated to account for current a-d-c algorithms
    def _recommend_ram(self, recommendformat):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available. If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available. Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available. However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary. We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """
        # NOTE(review): the binding of `ratio` and the `return` after the
        # no-data warning appear elided from this view; also,
        # `.get(constraint_key) / MB` raises TypeError when the constraint
        # is absent (get() returns None) — verify against the full source.
        constraint_key = self._map_runtime_constraint('ram')
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
        used_mib = math.ceil(float(used_bytes) / MB)
        asked_mib = self.existing_constraints.get(constraint_key) / MB

        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        recommend_mib = int(math.ceil(nearlygibs(used_mib/ratio))*AVAILABLE_RAM_RATIO*1024)
        if used_mib > 0 and (used_mib / asked_mib) < ratio and asked_mib > recommend_mib:
            yield recommendformat(
                '{} requested {} MiB of RAM but actual RAM usage was below {}% at {} MiB -- '
                'suggest reducing RAM request to {} MiB'
    def _recommend_keep_cache(self, recommendformat):
        """Recommend increasing keep cache if utilization < 80%

        NOTE(review): the `return` inside the rx == 0 guard and the
        leading format() arguments of the recommendation appear elided
        from this view.
        """
        constraint_key = self._map_runtime_constraint('keep_cache_ram')
        if self.job_tot['net:keep0']['rx'] == 0:
        # Fraction of bytes fetched from Keep that were actually read.
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        # FIXME: the default on this get won't work correctly
        asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()

        if utilization < 0.8:
            yield recommendformat(
                '{} Keep cache utilization was {:.2f}% -- '
                'try doubling runtime_constraints to "{}":{} (or more)'
                math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
542 def _recommend_temp_disk(self, recommendformat):
543 """Recommend decreasing temp disk if utilization < 50%"""
544 total = float(self.job_tot['statfs']['total'])
545 utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0
547 if utilization < 50.8 and total > 0:
548 yield recommendformat(
549 '{} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
550 'consider reducing "tmpdirMin" and/or "outdirMin"'
557 def _format(self, val):
558 """Return a string representation of a stat.
560 {:.2f} for floats, default format for everything else."""
561 if isinstance(val, float):
562 return '{:.2f}'.format(val)
564 return '{}'.format(val)
    def _runtime_constraint_mem_unit(self):
        """Return the multiplier that converts this process's RAM
        constraints to bytes (crunch1 jobs use MiB; containers use 1).

        NOTE(review): an `else:` introducing the final return appears
        elided from this view.
        """
        if hasattr(self, 'runtime_constraint_mem_unit'):
            # Subclass declared the unit explicitly.
            return self.runtime_constraint_mem_unit
        elif self.detected_crunch1:
            return JobSummarizer.runtime_constraint_mem_unit
            return ContainerRequestSummarizer.runtime_constraint_mem_unit
    def _map_runtime_constraint(self, key):
        """Translate a generic constraint name ('ram', 'vcpus',
        'keep_cache_ram') to this process type's spelling.

        NOTE(review): the fallback branch (presumably returning the key
        unchanged for container requests) appears elided from this view.
        """
        if hasattr(self, 'map_runtime_constraint'):
            return self.map_runtime_constraint[key]
        elif self.detected_crunch1:
            return JobSummarizer.map_runtime_constraint[key]
class CollectionSummarizer(Summarizer):
    """Summarizer that reads crunchstat logs straight from a collection."""

    def __init__(self, collection_id, **kwargs):
        reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(reader, **kwargs)
        # Label the summary with the collection it came from.
        self.label = collection_id
def NewSummarizer(process_or_uuid, **kwargs):
    """Construct with the appropriate subclass for this uuid/object.

    NOTE(review): several lines appear elided from this view: the
    `else:` pairing for the dict/uuid branch, the retrieval of the
    process record in the '-dz642-' (container) branch, and the `else:`
    before the ArgumentError raise.
    """
    if isinstance(process_or_uuid, dict):
        # Already-fetched API record.
        process = process_or_uuid
        uuid = process['uuid']
        uuid = process_or_uuid
    arv = kwargs.get("arv") or arvados.api('v1')
    # Dispatch on the uuid's object-type infix.
    if '-dz642-' in uuid:
        # Container uuid.
        # Get the associated CR. Doesn't matter which since they all have the same logs
        crs = arv.container_requests().list(filters=[['container_uuid','=',uuid]],limit=1).execute()['items']
        klass = ContainerRequestTreeSummarizer
    elif '-xvhdp-' in uuid:
        # Container request uuid.
        process = arv.container_requests().get(uuid=uuid).execute()
        klass = ContainerRequestTreeSummarizer
    elif '-8i9sb-' in uuid:
        # Crunch1 job uuid.
        process = arv.jobs().get(uuid=uuid).execute()
        klass = JobTreeSummarizer
    elif '-d1hrv-' in uuid:
        # Pipeline instance uuid.
        process = arv.pipeline_instances().get(uuid=uuid).execute()
        klass = PipelineSummarizer
    elif '-4zz18-' in uuid:
        # Plain log collection uuid.
        return CollectionSummarizer(collection_id=uuid)
        raise ArgumentError("Unrecognized uuid %s", uuid)
    return klass(process, uuid=uuid, **kwargs)
class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, or container request."""

    def __init__(self, process, label=None, **kwargs):
        # NOTE(review): the `if label is None:` guard and the `try:`
        # around the CollectionReader appear elided from this view.
        self.process = process
        arv = kwargs.get("arv") or arvados.api('v1')
        label = self.process.get('name', self.process['uuid'])
        # Pre-Arvados v1.4 everything is in 'log'
        # For 1.4+ containers have no logs and container_requests have them in 'log_uuid', not 'log'
        log_collection = self.process.get('log', self.process.get('log_uuid'))
        if log_collection and self.process.get('state') != 'Uncommitted':  # arvados.util.CR_UNCOMMITTED:
            rdr = crunchstat_summary.reader.CollectionReader(log_collection, api_client=arv)
        except arvados.errors.NotFoundError as e:
            # NOTE(review): self.process['log'] raises KeyError when only
            # 'log_uuid' is set; `log_collection` would be the safe value
            # to log here — verify against the full source.
            logger.warning("Trying event logs after failing to read "
                           "log collection %s: %s", self.process['log'], e)
            # Fall back to streaming the live event log.
            uuid = self.process.get('container_uuid', self.process.get('uuid'))
            rdr = crunchstat_summary.reader.LiveLogReader(uuid)
            label = label + ' (partial)'

        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
        self.existing_constraints = self.process.get('runtime_constraints', {})
        self.arv_config = arv.config()
        self.cost = self.process.get('cost', 0)
class JobSummarizer(ProcessSummarizer):
    """Summarizer for a crunch1 job.

    Crunch1 runtime constraints express memory in MiB, so
    runtime_constraint_mem_unit converts them to bytes, and
    map_runtime_constraint translates generic constraint names to
    their crunch1 spellings.
    """
    runtime_constraint_mem_unit = MB
    map_runtime_constraint = {
        'keep_cache_ram': 'keep_cache_mb_per_task',
        'ram': 'min_ram_mb_per_node',
        'vcpus': 'min_cores_per_node',
    }  # fix: dict literal was left unterminated (missing closing brace)
class ContainerRequestSummarizer(ProcessSummarizer):
    # Container request constraints are already expressed in bytes.
    runtime_constraint_mem_unit = 1
class MultiSummarizer(object):
    """Aggregate several Summarizers and report on all of them.

    NOTE(review): this view elides several lines (the `def run(self):`
    header, bodies of some loops, parts of text_report/html_report);
    comments below flag the apparent gaps.
    """

    def __init__(self, children={}, label=None, threads=1, **kwargs):
        # NOTE(review): mutable default argument — safe only if callers
        # never mutate the shared default dict; verify.
        self.children = children
        self.threadcount = threads

    # NOTE(review): the 'def run(self):' header appears elided here.
        if self.threadcount > 1 and len(self.children) > 1:
            # Parse children concurrently; a 'completed' counter
            # (initialization elided) tracks progress for logging.
            def run_and_progress(child):
                except Exception as e:
                    logger.exception("parse error")
                logger.info("%s/%s summarized %s", completed, len(self.children), child.label)
            with ThreadPoolExecutor(max_workers=self.threadcount) as tpe:
                for child in self.children.values():
                    tpe.submit(run_and_progress, child)
            # Serial fallback (the `else:` and `child.run()` body elided).
            for child in self.children.values():

    def text_report(self):
        """Concatenate each descendant's text report under a '###' header."""
        d = self._descendants()
        for child in d.values():
            txt += '### Summary for {} ({})\n'.format(
                child.label, child.process['uuid'])
            txt += child.text_report()

    def _descendants(self):
        """Dict of self and all descendants.

        Nodes with nothing of their own to report (like
        MultiSummarizers) are omitted.
        """
        d = collections.OrderedDict()
        for key, child in self.children.items():
            if isinstance(child, Summarizer):
            if isinstance(child, MultiSummarizer):
                d.update(child._descendants())

    def html_report(self):
        """Render all descendants into one WEBCHART_CLASS page; when there
        is a single descendant, include its aggregates, recommendations,
        and stats table around the charts."""
        if len(self._descendants()) == 1:
            summarizer = next(iter(self._descendants().values()))
            tophtml = """{}\n<table class='aggtable'><tbody>{}</tbody></table>\n""".format(
                "\n".join(summarizer._recommend_gen(lambda x: "<p>{}</p>".format(x))),
                "\n".join(summarizer._text_report_agg_gen(lambda x: "<tr><th>{}</th><td>{}{}</td></tr>".format(*x))))

            bottomhtml = """<table class='metricstable'><tbody>{}</tbody></table>\n""".format(
                "\n".join(summarizer._text_report_table_gen(lambda x: "<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>".format(*x),
                                                            lambda x: "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(*x))))
            label = summarizer.long_label()

        return WEBCHART_CLASS(label, iter(self._descendants().values())).html(tophtml, bottomhtml)
class JobTreeSummarizer(MultiSummarizer):
    """Summarizes a job and all children listed in its components field."""

    def __init__(self, job, label=None, **kwargs):
        # NOTE(review): the `preloaded = {}` initializer and the tail of
        # the super().__init__ call appear elided from this view.
        arv = kwargs.get("arv") or arvados.api('v1')
        label = label or job.get('name', job['uuid'])
        children = collections.OrderedDict()
        children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
        if job.get('components', None):
            # Batch-fetch component jobs to avoid one API call per child.
            for j in arv.jobs().index(
                    limit=len(job['components']),
                    filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
                preloaded[j['uuid']] = j
            for cname in sorted(job['components'].keys()):
                child_uuid = job['components'][cname]
                # Fall back to a direct get() if the index missed it.
                j = (preloaded.get(child_uuid) or
                     arv.jobs().get(uuid=child_uuid).execute())
                children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)

        super(JobTreeSummarizer, self).__init__(
class PipelineSummarizer(MultiSummarizer):
    """Summarize every job of a crunch1 pipeline instance.

    NOTE(review): the logger call openers inside the loop, a `continue`,
    and the `children=` argument of the super().__init__ call appear
    elided from this view.
    """

    def __init__(self, instance, **kwargs):
        children = collections.OrderedDict()
        for cname, component in instance['components'].items():
            if 'job' not in component:
                "%s: skipping component with no job assigned", cname)
                "%s: job %s", cname, component['job']['uuid'])
            summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
            # Prefix the component name so reports distinguish siblings.
            summarizer.label = '{} {}'.format(
                cname, component['job']['uuid'])
            children[cname] = summarizer
        super(PipelineSummarizer, self).__init__(
            label=instance['uuid'],
781 class ContainerRequestTreeSummarizer(MultiSummarizer):
782 def __init__(self, root, skip_child_jobs=False, **kwargs):
783 arv = kwargs.get("arv") or arvados.api('v1')
785 label = kwargs.pop('label', None) or root.get('name') or root['uuid']
788 children = collections.OrderedDict()
789 todo = collections.deque((root, ))
791 current = todo.popleft()
792 label = current['name']
793 sort_key = current['created_at']
795 summer = ContainerRequestSummarizer(current, label=label, **kwargs)
796 summer.sort_key = sort_key
797 children[current['uuid']] = summer
800 child_crs = arv.container_requests().list(filters=[['requesting_container_uuid', '=', current['container_uuid']]],
802 logger.warning('%s: omitting stats from child containers'
803 ' because --skip-child-jobs flag is on',
804 label, child_crs['items_available'])
806 for cr in arvados.util.keyset_list_all(arv.container_requests().list,
807 filters=[['requesting_container_uuid', '=', current['container_uuid']]]):
808 if cr['container_uuid']:
809 logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
810 cr['name'] = cr.get('name') or cr['uuid']
812 sorted_children = collections.OrderedDict()
813 for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
814 sorted_children[uuid] = children[uuid]
815 super(ContainerRequestTreeSummarizer, self).__init__(
816 children=sorted_children,