1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
7 import crunchstat_summary.dygraphs
8 import crunchstat_summary.reader
18 from concurrent.futures import ThreadPoolExecutor
20 from crunchstat_summary import logger
# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.90

# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads. https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')

# Chart backend used by the html_report() methods below.
WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
        # NOTE(review): the enclosing `def __init__` line is not visible in
        # this view; these statements initialize per-task accumulators.
        self.finishtime = None
        # series: {(category, stat): [(offset-from-start, value), ...]}
        self.series = collections.defaultdict(list)
class Summarizer(object):
    """Parse crunchstat log data and accumulate per-task and per-job stats.

    NOTE(review): several lines of this class are not visible in this
    view; only the attributes assigned below are documented here.
    """

    def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
        self._logdata = logdata
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        # Maps task_id -> Task; defaultdict so parsing can touch tasks lazily.
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable.  If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        # NOTE(review): self.label is assigned on lines not visible here.
        logger.info("%s: logdata %s", self.label, logdata)
        # NOTE(review): the enclosing `def run(self):` line and the body of
        # this `with` block (presumably a call into _run) are not visible.
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        with self._logdata as logdata:
    def _run(self, logdata):
        """Parse crunchstat log lines and fill in the stat accumulators.

        Populates self.seq_to_uuid, self.tasks, self.task_stats,
        self.stats_max, self.job_tot, and the start/finish timestamps.

        NOTE(review): a number of original lines are not visible in this
        view (e.g. the `for line in logdata:` loop header and several
        `if m:` / `else:` / `continue` guards); the comments below describe
        only the visible code.
        """
        self.detected_crunch1 = False

        # Node info (instance type, price, RAM, ...) comes from the reader.
        if not self.node_info:
            self.node_info = logdata.node_info()

            # Crunch1 job uuids contain '-8i9sb-'; switch to crunch1-style
            # parsing as soon as one is seen on any line.
            if not self.detected_crunch1 and '-8i9sb-' in line:
                self.detected_crunch1 = True

            if self.detected_crunch1:
                # "... job_task <uuid>": remember which task each sequence
                # number refers to.
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
                    seq = int(m.group('seq'))
                    uuid = m.group('task_uuid')
                    self.seq_to_uuid[seq] = uuid
                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)

                # Task-finished line: record elapsed wall-clock seconds.
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
                    task_id = self.seq_to_uuid[int(m.group('seq'))]
                    elapsed = int(m.group('elapsed'))
                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                    if elapsed > self.stats_max['time']['elapsed']:
                        self.stats_max['time']['elapsed'] = elapsed

                # Queued child job: recurse into its logs unless disabled.
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
                    uuid = m.group('uuid')
                    if self._skip_child_jobs:
                        logger.warning('%s: omitting stats from child job %s'
                                       ' because --skip-child-jobs flag is on',
                    logger.debug('%s: follow %s', self.label, uuid)
                    # The child shares our accumulators, so its stats fold
                    # directly into this job's totals and maxima.
                    child_summarizer = NewSummarizer(uuid)
                    child_summarizer.stats_max = self.stats_max
                    child_summarizer.task_stats = self.task_stats
                    child_summarizer.tasks = self.tasks
                    child_summarizer.starttime = self.starttime
                    child_summarizer.run()
                    logger.debug('%s: done %s', self.label, uuid)

                # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
                m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)

            # First matching line also supplies our label if we have none.
            if self.label is None:
                    self.label = m.group('job_uuid')
                    self.label = 'label #1'
            category = m.group('category')
            if category.endswith(':'):
                # "stderr crunchstat: notice: ..."
            elif category in ('error', 'caught'):
            elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (old logs are less careful with unprefixed error messages)
            if self.detected_crunch1:
                task_id = self.seq_to_uuid[int(m.group('seq'))]
                task_id = 'container'
            task = self.tasks[task_id]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = m.group('timestamp')
            if timestamp[10:11] == '_':
                # crunch1-style "YYYY-MM-DD_HH:MM:SS"
                timestamp = datetime.datetime.strptime(
                    timestamp, '%Y-%m-%d_%H:%M:%S')
            elif timestamp[10:11] == 'T':
                # ISO-8601; fractional seconds/zone suffix are truncated.
                timestamp = datetime.datetime.strptime(
                    timestamp[:19], '%Y-%m-%dT%H:%M:%S')
                raise ValueError("Cannot parse timestamp {!r}".format(

            if task.starttime is None:
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            if task.starttime is None or timestamp < task.starttime:
                task.starttime = timestamp
            if task.finishtime is None or timestamp > task.finishtime:
                task.finishtime = timestamp

            if self.starttime is None or timestamp < self.starttime:
                self.starttime = timestamp
            if self.finishtime is None or timestamp > self.finishtime:
                self.finishtime = timestamp

            # Containers have no explicit "success in N seconds" line, so
            # derive elapsed time from the observed timestamp span.
            if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
                elapsed = (task.finishtime - task.starttime).seconds
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed

            # Each line carries cumulative ('current') values and, usually,
            # per-interval deltas after " -- interval ".
            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                category = m.group('category')
                words = m.group(group).split(' ')
                    # Words alternate value/name: "0 put 2576 get ...".
                    for val, stat in zip(words[::2], words[1::2]):
                            stats[stat] = float(val)
                            stats[stat] = int(val)
                except ValueError as e:
                    # If the line doesn't start with 'crunchstat:' we
                    # might have mistaken an error message for a
                    # structured crunchstat line.
                    if m.group("crunchstat") is None or m.group("category") == "crunchstat":
                        logger.warning("%s: log contains message\n %s", self.label, line)
                            '%s: Error parsing value %r (stat %r, category %r): %r',
                            self.label, val, stat, category, e)
                        logger.warning('%s', line)
                # Synthesized combined stats.
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                if group == 'interval':
                    if 'seconds' in stats:
                        this_interval_s = stats.get('seconds',0)
                        if this_interval_s <= 0:
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                        logger.error('BUG? interval stat missing duration')
                for stat, val in stats.items():
                    if group == 'interval' and this_interval_s:
                        # Convert an interval delta into a per-second rate.
                        stat = stat + '__rate'
                        val = val / this_interval_s
                        if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        if stat in ['rss','used','total']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                        if val > self.stats_max[category][stat]:
                            self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        # Sum each task's final stats into whole-job totals.
        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.items():
            for category, stat_last in task_stat.items():
                for stat, val in stat_last.items():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

        if self.stats_max['time'].get('elapsed', 0) > 20:
            # needs to have executed for at least 20 seconds or we may
            # not have collected any metrics and these warnings are duds.
                'net:': 'network I/O',
                'statfs': 'storage space',
            # Drop each expected category once any task reports it; a
            # trailing ':' on the key means prefix-match (e.g. 'net:eth0').
            for task_stat in self.task_stats.values():
                for category in task_stat.keys():
                    for checkcat in missing_category:
                        if checkcat.endswith(':'):
                            if category.startswith(checkcat):
                                missing_category.pop(checkcat)
                            if category == checkcat:
                                missing_category.pop(checkcat)
            for catlabel in missing_category.values():
                logger.warning('%s: %s stats are missing -- possible cluster configuration issue',
                               self.label, catlabel)
    def long_label(self):
        """Return self.label, annotated with the process uuid when known.

        NOTE(review): the initial `label = ...` assignment and the final
        `return` are not visible in this view.
        """
        if hasattr(self, 'process') and self.process['uuid'] not in label:
            label = '{} ({})'.format(label, self.process['uuid'])
    def elapsed_time(self):
        """Return elapsed wall time as a string like '1d 2h 3m 4s'.

        NOTE(review): the early-return body, the `label` initialization and
        the day/hour/minute guard lines are not visible in this view.
        """
        if not self.finishtime:
        s = (self.finishtime - self.starttime).total_seconds()
        label += '{}d '.format(int(s/86400))
        label += '{}h '.format(int(s/3600) % 24)
        label += '{}m '.format(int(s/60) % 60)
        label += '{}s'.format(int(s) % 60)
    def text_report(self):
        """Return the plain-text report: stats table, aggregate metrics,
        then '#!!'-prefixed recommendations.

        NOTE(review): the guard line before the early return (likely
        `if not self.tasks:`) is not visible in this view.
        """
            return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_table_gen(lambda x: "\t".join(x),
                                        lambda x: "\t".join(x)),
            self._text_report_agg_gen(lambda x: "# {}: {}{}".format(x[0], x[1], x[2])),
            self._recommend_gen(lambda x: "#!! "+x))) + "\n"
307 def html_report(self):
308 return WEBCHART_CLASS(self.label, [self]).html()
    def _text_report_table_gen(self, headerformat, rowformat):
        """Yield a header row, then one row per (category, stat) with the
        task maximum, max rate, and job total.

        NOTE(review): the `continue` that skips standalone `__rate` entries
        (they are folded into their base stat's row) is not visible here.
        """
        yield headerformat(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.items()):
            for stat, val in sorted(stat_max.items()):
                if stat.endswith('__rate'):
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield rowformat([category, stat, str(val), max_rate, tot])
    def _text_report_agg_gen(self, aggformat):
        """Yield aggregate metric lines (name, formatted value, suffix).

        Each metric tuple is (label, value, transform, suffix); `None`
        entries are conditionally-omitted metrics.

        NOTE(review): many original lines are not visible in this view,
        including the `metrics = [` opener, several tuple elements, and the
        `for args in metrics:` loop header.
        """
        if len(self.tasks) > 1:
            by_single_task = " by a single task"
             '${:.3f}'.format(self.cost),
             '') if self.cost > 0 else None,
            ('Assigned instance type',
             self.node_info.get('ProviderType'),
             '') if self.node_info.get('ProviderType') else None,
            ('Instance hourly price',
             '${:.3f}'.format(self.node_info.get('Price')),
             '') if self.node_info.get('Price') else None,
            ('Max CPU usage in a single interval',
             self.stats_max['cpu']['user+sys__rate'],
            ('Overall CPU usage',
             float(self.job_tot['cpu']['user+sys']) /
             self.job_tot['time']['elapsed']
             if self.job_tot['time']['elapsed'] > 0 else 0,
            ('Requested CPU cores',
             self.existing_constraints.get(self._map_runtime_constraint('vcpus')),
             self.node_info.get('VCPUs'),
             '') if self.node_info.get('VCPUs') else None,
            ('Max memory used{}'.format(by_single_task),
             self.stats_max['mem']['rss'],
             self.existing_constraints.get(self._map_runtime_constraint('ram')),
            ('Maximum RAM request for this instance type',
             # NOTE(review): `.get('ReserveExtraRAM', {})` defaults to a dict,
             # which would make the subtraction raise TypeError when the key
             # is absent -- verify the intended default (likely 0).  Also the
             # hard-coded *.95 differs from AVAILABLE_RAM_RATIO (0.90).
             (self.node_info.get('RAM') - self.arv_config.get('Containers', {}).get('ReserveExtraRAM', {}))*.95,
            ('Max network traffic{}'.format(by_single_task),
             self.stats_max['net:eth0']['tx+rx'] +
             self.stats_max['net:keep0']['tx+rx'],
            ('Max network speed in a single interval',
             self.stats_max['net:eth0']['tx+rx__rate'] +
             self.stats_max['net:keep0']['tx+rx__rate'],
            ('Keep cache miss rate',
             (float(self.job_tot['keepcache']['miss']) /
              float(self.job_tot['keepcalls']['get']))
             if self.job_tot['keepcalls']['get'] > 0 else 0,
            ('Keep cache utilization',
             (float(self.job_tot['blkio:0:0']['read']) /
              float(self.job_tot['net:keep0']['rx']))
             if self.job_tot['net:keep0']['rx'] > 0 else 0,
            ('Temp disk utilization',
             (float(self.job_tot['statfs']['used']) /
              float(self.job_tot['statfs']['total']))
             if self.job_tot['statfs']['total'] > 0 else 0,
        if len(self.tasks) > 1:
            metrics.insert(0, ('Number of tasks',
            format_string, val, transform, suffix = args
            # -Inf is the sentinel for "no data collected".
            if val == float('-Inf'):
            yield aggformat((format_string, self._format(val), suffix))
433 def _recommend_gen(self, recommendformat):
434 # TODO recommend fixing job granularity if elapsed time is too short
435 return itertools.chain(
436 self._recommend_cpu(recommendformat),
437 self._recommend_ram(recommendformat),
438 self._recommend_keep_cache(recommendformat),
439 self._recommend_temp_disk(recommendformat),
    def _recommend_cpu(self, recommendformat):
        """Recommend asking for 4 cores if max CPU usage was 333%"""

        # NOTE(review): several lines (early `return`, the default for
        # `asked_cores`, and the tail of the yield's format args) are not
        # visible in this view.
        constraint_key = self._map_runtime_constraint('vcpus')
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        # -Inf means no samples were collected at all.
        if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
            logger.warning('%s: no CPU usage data', self.label)
        # TODO Don't necessarily want to recommend on isolated max peak
        # take average CPU usage into account as well or % time at max
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get(constraint_key)
        if asked_cores is None:
        # TODO: This should be more nuanced in cases where max >> avg
        if used_cores < asked_cores:
            yield recommendformat(
                '{} max CPU usage was {}% -- '
                'try reducing runtime_constraints to "{}":{}'
                    math.ceil(cpu_max_rate*100),
    # FIXME: This needs to be updated to account for current a-d-c algorithms
    def _recommend_ram(self, recommendformat):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available. If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available. Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available. However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        requirement.

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary. We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """
        # NOTE(review): several lines (early `return`, the definition of
        # `ratio`, and the yield's format arguments) are not visible here.
        constraint_key = self._map_runtime_constraint('ram')
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
        used_mib = math.ceil(float(used_bytes) / MB)
        # NOTE(review): .get() can return None here, making `/ MB` raise
        # TypeError when the constraint is absent -- verify against callers.
        asked_mib = self.existing_constraints.get(constraint_key) / MB

        # Convert MiB to "nearlygibs" (see docstring above).
        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        recommend_mib = int(math.ceil(nearlygibs(used_mib/ratio))*AVAILABLE_RAM_RATIO*1024)
        if used_mib > 0 and (used_mib / asked_mib) < ratio and asked_mib > recommend_mib:
            yield recommendformat(
                '{} requested {} MiB of RAM but actual RAM usage was below {}% at {} MiB -- '
                'suggest reducing RAM request to {} MiB'
    def _recommend_keep_cache(self, recommendformat):
        """Recommend increasing keep cache if utilization < 80%"""
        # NOTE(review): the early `return` and part of the yield's format
        # arguments are not visible in this view.
        constraint_key = self._map_runtime_constraint('keep_cache_ram')
        if self.job_tot['net:keep0']['rx'] == 0:
        # Fraction of bytes fetched from Keep that were actually read by
        # the container (block I/O reads / Keep network receive).
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        # FIXME: the default on this get won't work correctly
        asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()

        if utilization < 0.8:
            yield recommendformat(
                '{} Keep cache utilization was {:.2f}% -- '
                'try doubling runtime_constraints to "{}":{} (or more)'
                    math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
    def _recommend_temp_disk(self, recommendformat):
        """Recommend decreasing temp disk if utilization < 50%"""
        total = float(self.job_tot['statfs']['total'])
        # utilization is a fraction in [0, 1].
        utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0

        # NOTE(review): `utilization < 50.8` is always true for a fractional
        # utilization, so this effectively fires whenever total > 0 -- the
        # docstring says "< 50%", suggesting the threshold should be 0.5.
        # The yield's format arguments are not visible in this view.
        if utilization < 50.8 and total > 0:
            yield recommendformat(
                '{} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
                'consider reducing "tmpdirMin" and/or "outdirMin"'
560 def _format(self, val):
561 """Return a string representation of a stat.
563 {:.2f} for floats, default format for everything else."""
564 if isinstance(val, float):
565 return '{:.2f}'.format(val)
567 return '{}'.format(val)
569 def _runtime_constraint_mem_unit(self):
570 if hasattr(self, 'runtime_constraint_mem_unit'):
571 return self.runtime_constraint_mem_unit
572 elif self.detected_crunch1:
573 return JobSummarizer.runtime_constraint_mem_unit
575 return ContainerRequestSummarizer.runtime_constraint_mem_unit
    def _map_runtime_constraint(self, key):
        """Translate a generic constraint name ('vcpus', 'ram',
        'keep_cache_ram') into the key used by this process type's
        runtime_constraints dict.

        NOTE(review): the fallback branch (likely returning `key`
        unchanged) is not visible in this view.
        """
        if hasattr(self, 'map_runtime_constraint'):
            return self.map_runtime_constraint[key]
        elif self.detected_crunch1:
            return JobSummarizer.map_runtime_constraint[key]
class CollectionSummarizer(Summarizer):
    """Summarizer that reads crunchstat logs from a Keep collection."""

    def __init__(self, collection_id, **kwargs):
        reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(reader, **kwargs)
        self.label = collection_id
def NewSummarizer(process_or_uuid, **kwargs):
    """Construct with the appropriate subclass for this uuid/object."""
    # NOTE(review): several lines of this factory (the `else:` guards and
    # the handling of the child-CR lookup result) are not visible here.
    if isinstance(process_or_uuid, dict):
        process = process_or_uuid
        uuid = process['uuid']
        uuid = process_or_uuid
    arv = kwargs.get("arv") or arvados.api('v1')

    # Dispatch on the infix of the Arvados uuid.
    if '-dz642-' in uuid:
        # Container.
        # Get the associated CR. Doesn't matter which since they all have the same logs
        crs = arv.container_requests().list(filters=[['container_uuid','=',uuid]],limit=1).execute()['items']
        klass = ContainerRequestTreeSummarizer
    elif '-xvhdp-' in uuid:
        # Container request.
        process = arv.container_requests().get(uuid=uuid).execute()
        klass = ContainerRequestTreeSummarizer
    elif '-8i9sb-' in uuid:
        # Crunch1 job.
        process = arv.jobs().get(uuid=uuid).execute()
        klass = JobTreeSummarizer
    elif '-d1hrv-' in uuid:
        # Pipeline instance.
        process = arv.pipeline_instances().get(uuid=uuid).execute()
        klass = PipelineSummarizer
    elif '-4zz18-' in uuid:
        # Plain collection of log files.
        return CollectionSummarizer(collection_id=uuid)
        raise ArgumentError("Unrecognized uuid %s", uuid)
    return klass(process, uuid=uuid, **kwargs)
class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, or container request."""

    def __init__(self, process, label=None, **kwargs):
        # NOTE(review): several lines (e.g. `rdr = None`, the label guard,
        # the `try:` and the `if rdr is None:` fallback guard) are not
        # visible in this view.
        self.process = process
        arv = kwargs.get("arv") or arvados.api('v1')
        label = self.process.get('name', self.process['uuid'])
        # Pre-Arvados v1.4 everything is in 'log'
        # For 1.4+ containers have no logs and container_requests have them in 'log_uuid', not 'log'
        log_collection = self.process.get('log', self.process.get('log_uuid'))
        if log_collection and self.process.get('state') != 'Uncommitted': # arvados.util.CR_UNCOMMITTED:
                rdr = crunchstat_summary.reader.CollectionReader(log_collection, api_client=arv)
            except arvados.errors.NotFoundError as e:
                # NOTE(review): self.process['log'] may KeyError when only
                # 'log_uuid' is set -- verify against callers.
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.process['log'], e)
            # Fall back to live event logs for the container.
            uuid = self.process.get('container_uuid', self.process.get('uuid'))
            rdr = crunchstat_summary.reader.LiveLogReader(uuid)
            label = label + ' (partial)'

        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
        self.existing_constraints = self.process.get('runtime_constraints', {})
        self.arv_config = arv.config()
        self.cost = self.process.get('cost', 0)
class JobSummarizer(ProcessSummarizer):
    """Summarizer for a crunch1 job."""
    # Memory values in crunch1 runtime constraints are expressed in MiB.
    runtime_constraint_mem_unit = MB
    # Generic constraint name -> crunch1 job constraint key.
    map_runtime_constraint = {
        'keep_cache_ram': 'keep_cache_mb_per_task',
        'ram': 'min_ram_mb_per_node',
        'vcpus': 'min_cores_per_node',
class ContainerRequestSummarizer(ProcessSummarizer):
    """Summarizer for a container request."""
    # Memory values in container runtime constraints are plain bytes.
    runtime_constraint_mem_unit = 1
class MultiSummarizer(object):
    """Holds a collection of child Summarizers and aggregates their
    reports."""

    def __init__(self, children={}, label=None, threads=1, **kwargs):
        # NOTE(review): mutable default argument `children={}` is shared
        # across calls -- safe only while never mutated in place; consider
        # `children=None`.  The label assignment line is not visible here.
        self.children = children
        self.threadcount = threads
        # NOTE(review): the enclosing `def run(self):` line, the `try:`
        # around child.run(), the `completed` counter, and the serial
        # `else:` path are not visible in this view.
        if self.threadcount > 1 and len(self.children) > 1:
            def run_and_progress(child):
                except Exception as e:
                    logger.exception("parse error")
                logger.info("%s/%s summarized %s", completed, len(self.children), child.label)
            # Parse children concurrently; the executor joins on exit.
            with ThreadPoolExecutor(max_workers=self.threadcount) as tpe:
                for child in self.children.values():
                    tpe.submit(run_and_progress, child)
            for child in self.children.values():
    def text_report(self):
        """Concatenate the text reports of all descendant summarizers,
        each preceded by a '### Summary for ...' header.

        NOTE(review): the `txt` initialization, the multi-descendant guard
        and the final `return` are not visible in this view.
        """
        d = self._descendants()
        for child in d.values():
                txt += '### Summary for {} ({})\n'.format(
                    child.label, child.process['uuid'])
            txt += child.text_report()
    def _descendants(self):
        """Dict of self and all descendants.

        Nodes with nothing of their own to report (like
        MultiSummarizers) are omitted.
        """
        # NOTE(review): the body of the Summarizer branch (likely
        # `d[key] = child`) and the final return are not visible here.
        d = collections.OrderedDict()
        for key, child in self.children.items():
            if isinstance(child, Summarizer):
            if isinstance(child, MultiSummarizer):
                # Flatten nested MultiSummarizers into a single dict.
                d.update(child._descendants())
    def html_report(self):
        """Render an HTML report; when there is exactly one descendant,
        wrap its recommendations, aggregate metrics and stats table around
        the charts.

        NOTE(review): initialization of label/tophtml/bottomhtml for the
        multi-descendant case is not visible in this view.
        """
        if len(self._descendants()) == 1:
            summarizer = next(iter(self._descendants().values()))
            tophtml = """{}\n<table class='aggtable'><tbody>{}</tbody></table>\n""".format(
                "\n".join(summarizer._recommend_gen(lambda x: "<p>{}</p>".format(x))),
                "\n".join(summarizer._text_report_agg_gen(lambda x: "<tr><th>{}</th><td>{}{}</td></tr>".format(*x))))

            bottomhtml = """<table class='metricstable'><tbody>{}</tbody></table>\n""".format(
                "\n".join(summarizer._text_report_table_gen(lambda x: "<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>".format(*x),
                                                            lambda x: "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(*x))))
            label = summarizer.long_label()

        return WEBCHART_CLASS(label, iter(self._descendants().values())).html(tophtml, bottomhtml)
class JobTreeSummarizer(MultiSummarizer):
    """Summarizes a job and all children listed in its components field."""
    def __init__(self, job, label=None, **kwargs):
        # NOTE(review): some lines (the `preloaded = {}` initializer and
        # the arguments of the final super() call) are not visible here.
        arv = kwargs.get("arv") or arvados.api('v1')
        label = label or job.get('name', job['uuid'])
        children = collections.OrderedDict()
        children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
        if job.get('components', None):
            # Fetch all component jobs with one list call, falling back to
            # individual gets for any the index did not return.
            for j in arv.jobs().index(
                    limit=len(job['components']),
                    filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
                preloaded[j['uuid']] = j
            for cname in sorted(job['components'].keys()):
                child_uuid = job['components'][cname]
                j = (preloaded.get(child_uuid) or
                     arv.jobs().get(uuid=child_uuid).execute())
                children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
        super(JobTreeSummarizer, self).__init__(
class PipelineSummarizer(MultiSummarizer):
    """Summarizes every job in a pipeline instance's components."""
    def __init__(self, instance, **kwargs):
        # NOTE(review): several lines (the logger call openers, the `else:`
        # branch guard, and the tail of the super() call) are not visible
        # in this view.
        children = collections.OrderedDict()
        for cname, component in instance['components'].items():
            if 'job' not in component:
                    "%s: skipping component with no job assigned", cname)
                    "%s: job %s", cname, component['job']['uuid'])
                summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
                summarizer.label = '{} {}'.format(
                    cname, component['job']['uuid'])
                children[cname] = summarizer
        super(PipelineSummarizer, self).__init__(
            label=instance['uuid'],
784 class ContainerRequestTreeSummarizer(MultiSummarizer):
785 def __init__(self, root, skip_child_jobs=False, **kwargs):
786 arv = kwargs.get("arv") or arvados.api('v1')
788 label = kwargs.pop('label', None) or root.get('name') or root['uuid']
791 children = collections.OrderedDict()
792 todo = collections.deque((root, ))
794 current = todo.popleft()
795 label = current['name']
796 sort_key = current['created_at']
798 summer = ContainerRequestSummarizer(current, label=label, **kwargs)
799 summer.sort_key = sort_key
800 children[current['uuid']] = summer
803 child_crs = arv.container_requests().list(filters=[['requesting_container_uuid', '=', current['container_uuid']]],
805 logger.warning('%s: omitting stats from child containers'
806 ' because --skip-child-jobs flag is on',
807 label, child_crs['items_available'])
809 for cr in arvados.util.keyset_list_all(arv.container_requests().list,
810 filters=[['requesting_container_uuid', '=', current['container_uuid']]]):
811 if cr['container_uuid']:
812 logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
813 cr['name'] = cr.get('name') or cr['uuid']
815 sorted_children = collections.OrderedDict()
816 for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
817 sorted_children[uuid] = children[uuid]
818 super(ContainerRequestTreeSummarizer, self).__init__(
819 children=sorted_children,