# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0
import arvados
import arvados.util
import collections
import crunchstat_summary.dygraphs
import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re

from concurrent.futures import ThreadPoolExecutor

from crunchstat_summary import logger

# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.90
MB = 2**20

# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads. https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')


WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart


class Task(object):
    def __init__(self):
        self.starttime = None
        self.finishtime = None
        self.series = collections.defaultdict(list)


class Summarizer(object):
    def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
        self._logdata = logdata

        self.uuid = uuid
        self.label = label
        self.starttime = None
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable. If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}
        self.node_info = {}
        self.cost = 0
        self.arv_config = {}

        logger.info("%s: logdata %s", self.label, logdata)

    def run(self):
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        with self._logdata as logdata:
            self._run(logdata)

    def _run(self, logdata):
        if not self.node_info:
            self.node_info = logdata.node_info()

        for line in logdata:
            # A crunchstat line looks like:
            # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
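            # The regex captures the leading timestamp, an optional "crunchstat: "
            # prefix, the stat category (e.g. "cpu", "mem", "net:eth0", "keepcalls"),
            # the cumulative counters, and an optional "-- interval ..." section
            # holding deltas for the most recent sample period.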
            m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
            if not m:
                continue

            if self.label is None:
                try:
                    self.label = m.group('job_uuid')
                except IndexError:
                    self.label = 'label #1'
            task = self.tasks[task_id]
            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
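            # Timestamps use either an underscore separator (2014-01-01_01:02:03,
            # older logs) or RFC 3339 form (2017-12-01T16:56:24.723509200Z);
            # both are reduced to whole-second precision.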
            timestamp = m.group('timestamp')
            if timestamp[10:11] == '_':
                timestamp = datetime.datetime.strptime(
                    timestamp, '%Y-%m-%d_%H:%M:%S')
            elif timestamp[10:11] == 'T':
                timestamp = datetime.datetime.strptime(
                    timestamp[:19], '%Y-%m-%dT%H:%M:%S')
            else:
                raise ValueError("Cannot parse timestamp {!r}".format(
                    timestamp))

            if task.starttime is None:
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            if task.starttime is None or timestamp < task.starttime:
                task.starttime = timestamp
            if task.finishtime is None or timestamp > task.finishtime:
                task.finishtime = timestamp

            if self.starttime is None or timestamp < self.starttime:
                self.starttime = timestamp
            if self.finishtime is None or timestamp > self.finishtime:
                self.finishtime = timestamp

            if task.starttime is not None and task.finishtime is not None:
                elapsed = (task.finishtime - task.starttime).seconds
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed

            category = m.group('category')
            if category.endswith(':'):
                # "stderr crunchstat: notice: ..."
                continue
            elif category in ('error', 'caught'):
                continue
            elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (old logs are less careful with unprefixed error messages)
                continue

            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                    continue
                category = m.group('category')
                words = m.group(group).split(' ')
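                # Stat words alternate value and name, e.g. "0 put 3 get"
                # pairs up as {'put': 0, 'get': 3}.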
                stats = {}
                try:
                    for val, stat in zip(words[::2], words[1::2]):
                        if '.' in val:
                            stats[stat] = float(val)
                        else:
                            stats[stat] = int(val)
                except ValueError as e:
                    # If the line doesn't start with 'crunchstat:' we
                    # might have mistaken an error message for a
                    # structured crunchstat line.
                    if m.group("crunchstat") is None or m.group("category") == "crunchstat":
                        logger.warning("%s: log contains message\n %s", self.label, line)
                    else:
                        logger.warning(
                            '%s: Error parsing value %r (stat %r, category %r): %r',
                            self.label, val, stat, category, e)
                        logger.warning('%s', line)
                    continue

                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                if group == 'interval':
                    if 'seconds' in stats:
                        this_interval_s = stats.get('seconds', 0)
                        del stats['seconds']
                        if this_interval_s <= 0:
                            logger.error(
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                    else:
                        logger.error('BUG? interval stat missing duration')
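                # Interval stats become per-second "__rate" values, e.g. user 2.5
                # plus sys 1.5 over a 10-second interval yields user+sys__rate = 0.4
                # (40% of one core).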
                for stat, val in stats.items():
                    if group == 'interval' and this_interval_s:
                        stat = stat + '__rate'
                        val = val / this_interval_s
                        if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                    else:
                        if stat in ['rss', 'used', 'total']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.items():
            for category, stat_last in task_stat.items():
                for stat, val in stat_last.items():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                        continue
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

        if self.stats_max['time'].get('elapsed', 0) > 20:
            # needs to have executed for at least 20 seconds or we may
            # not have collected any metrics and these warnings are duds.
            missing_category = {
                'cpu': 'CPU',
                'mem': 'memory',
                'net:': 'network I/O',
                'statfs': 'storage space',
            }
            for task_stat in self.task_stats.values():
                for category in task_stat.keys():
                    for checkcat in missing_category:
                        if checkcat.endswith(':'):
                            if category.startswith(checkcat):
                                missing_category.pop(checkcat)
                                break
                        else:
                            if category == checkcat:
                                missing_category.pop(checkcat)
                                break
            for catlabel in missing_category.values():
                logger.warning('%s: %s stats are missing -- possible cluster configuration issue',
                               self.label, catlabel)

    def long_label(self):
        label = self.label
        if hasattr(self, 'process') and self.process['uuid'] not in label:
            label = '{} ({})'.format(label, self.process['uuid'])
        return label

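    # elapsed_time() renders the overall wall-clock time as a short string,
    # e.g. 93784 seconds -> '1d 2h 3m 4s'.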
    def elapsed_time(self):
        if not self.finishtime:
            return ""
        label = ""
        s = (self.finishtime - self.starttime).total_seconds()
        if s > 86400:
            label += '{}d '.format(int(s/86400))
        if s > 3600:
            label += '{}h '.format(int(s/3600) % 24)
        if s > 60:
            label += '{}m '.format(int(s/60) % 60)
        label += '{}s'.format(int(s) % 60)
        return label

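    # text_report() produces a tab-separated metrics table followed by
    # "# name: value" aggregate lines and "#!! ..." recommendation lines,
    # for example (values illustrative):
    #   cpu     user+sys    88.28   0.96    88.28
    #   # Overall CPU usage: 12.28%
    #   #!! <label> peak CPU usage was only 23% out of possible 100% (1 cores requested)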
    def text_report(self):
        if not self.tasks:
            return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_table_gen(lambda x: "\t".join(x),
                                        lambda x: "\t".join(x)),
            self._text_report_agg_gen(lambda x: "# {}: {}{}".format(x[0], x[1], x[2])),
            self._recommend_gen(lambda x: "#!! "+x))) + "\n"

    def html_report(self):
        tophtml = """<h2>Summary</h2>{}\n<table class='aggtable'><tbody>{}</tbody></table>\n""".format(
            "\n".join(self._recommend_gen(lambda x: "<p>{}</p>".format(x))),
            "\n".join(self._text_report_agg_gen(lambda x: "<tr><th>{}</th><td>{}{}</td></tr>".format(*x))))

        bottomhtml = """<h2>Metrics</h2><table class='metricstable'><tbody>{}</tbody></table>\n""".format(
            "\n".join(self._text_report_table_gen(lambda x: "<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>".format(*x),
                                                  lambda x: "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(*x))))
        label = self.long_label()

        return WEBCHART_CLASS(label, [self], [tophtml], [bottomhtml]).html()

    def _text_report_table_gen(self, headerformat, rowformat):
        yield headerformat(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.items()):
            for stat, val in sorted(stat_max.items()):
                if stat.endswith('__rate'):
                    continue
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield rowformat([category, stat, str(val), max_rate, tot])

    def _text_report_agg_gen(self, aggformat):
        by_single_task = ""
        if len(self.tasks) > 1:
            by_single_task = " by a single task"

        metrics = [
            ('Estimated cost',
             '${:.3f}'.format(self.cost),
             None,
             '') if self.cost > 0 else None,

            ('Assigned instance type',
             self.node_info.get('ProviderType'),
             None,
             '') if self.node_info.get('ProviderType') else None,

            ('Instance hourly price',
             '${:.3f}'.format(self.node_info.get('Price')),
             None,
             '') if self.node_info.get('Price') else None,

            ('Max CPU usage in a single interval',
             self.stats_max['cpu']['user+sys__rate'],
             lambda x: x * 100,
             '%'),

            ('Overall CPU usage',
             float(self.job_tot['cpu']['user+sys']) /
             self.job_tot['time']['elapsed']
             if self.job_tot['time']['elapsed'] > 0 else 0,
             lambda x: x * 100,
             '%'),

            ('Requested CPU cores',
             self.existing_constraints.get(self._map_runtime_constraint('vcpus')),
             None,
             '') if self.existing_constraints.get(self._map_runtime_constraint('vcpus')) else None,

            ('Instance VCPUs',
             self.node_info.get('VCPUs'),
             None,
             '') if self.node_info.get('VCPUs') else None,

            ('Max memory used{}'.format(by_single_task),
             self.stats_max['mem']['rss'],
             lambda x: x / 1e9,
             'GB'),

            ('Requested RAM',
             self.existing_constraints.get(self._map_runtime_constraint('ram')),
             lambda x: x / MB,
             'MB') if self.existing_constraints.get(self._map_runtime_constraint('ram')) else None,

            ('Maximum RAM request for this instance type',
             (self.node_info.get('RAM') - self.arv_config.get('Containers', {}).get('ReserveExtraRAM', 0))*.95,
             lambda x: x / MB,
             'MB') if self.node_info.get('RAM') else None,

            ('Max network traffic{}'.format(by_single_task),
             self.stats_max['net:eth0']['tx+rx'] +
             self.stats_max['net:keep0']['tx+rx'],
             lambda x: x / 1e9,
             'GB'),

            ('Max network speed in a single interval',
             self.stats_max['net:eth0']['tx+rx__rate'] +
             self.stats_max['net:keep0']['tx+rx__rate'],
             lambda x: x / 1e6,
             'MB/s'),

            ('Keep cache miss rate',
             (float(self.job_tot['keepcache']['miss']) /
              float(self.job_tot['keepcalls']['get']))
             if self.job_tot['keepcalls']['get'] > 0 else 0,
             lambda x: x * 100.0,
             '%'),

            ('Keep cache utilization',
             (float(self.job_tot['blkio:0:0']['read']) /
              float(self.job_tot['net:keep0']['rx']))
             if self.job_tot['net:keep0']['rx'] > 0 else 0,
             lambda x: x * 100.0,
             '%'),

            ('Temp disk utilization',
             (float(self.job_tot['statfs']['used']) /
              float(self.job_tot['statfs']['total']))
             if self.job_tot['statfs']['total'] > 0 else 0,
             lambda x: x * 100.0,
             '%'),
        ]

        if len(self.tasks) > 1:
            metrics.insert(0, ('Number of tasks',
                               len(self.tasks),
                               None,
                               ''))
        for args in metrics:
            if args is None:
                continue
            format_string, val, transform, suffix = args
            if val == float('-Inf'):
                continue
            if transform:
                val = transform(val)
            yield aggformat((format_string, self._format(val), suffix))

    def _recommend_gen(self, recommendformat):
        # TODO recommend fixing job granularity if elapsed time is too short

        if self.stats_max['time'].get('elapsed', 0) <= 20:
            # Too short to have collected meaningful metrics.
            return []

        return itertools.chain(
            self._recommend_cpu(recommendformat),
            self._recommend_ram(recommendformat),
            self._recommend_keep_cache(recommendformat),
            self._recommend_temp_disk(recommendformat),
            )

    def _recommend_cpu(self, recommendformat):
        """Recommend asking for 4 cores if max CPU usage was 333%"""

        constraint_key = self._map_runtime_constraint('vcpus')
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
            logger.warning('%s: no CPU usage data', self.label)
            return
        # TODO Don't necessarily want to recommend on isolated max peak
        # take average CPU usage into account as well or % time at max
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get(constraint_key)
        if asked_cores is None:
            asked_cores = 1
        if used_cores < (asked_cores*.5):
            yield recommendformat(
                '{} peak CPU usage was only {}% out of possible {}% ({} cores requested)'
                .format(self.label,
                        math.ceil(cpu_max_rate*100),
                        asked_cores*100, asked_cores))

    # FIXME: This needs to be updated to account for current a-d-c algorithms
    def _recommend_ram(self, recommendformat):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available. If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available. Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available. However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        requirements.

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary. We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """

        constraint_key = self._map_runtime_constraint('ram')
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
            return
        if not self.existing_constraints.get(constraint_key):
            return
        used_mib = math.ceil(float(used_bytes) / MB)
        asked_mib = self.existing_constraints.get(constraint_key) / MB

        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
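        # e.g. nearlygibs(7000) = 7000/0.90/1024 ~= 7.6, so a job that peaked at
        # 7000 MiB needs ceil() = 8 "nearlygibs", which corresponds to an
        # advertised 8 GiB node with about 8*0.90*1024 = 7372 MiB usable.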
        recommend_mib = int(math.ceil(nearlygibs(used_mib/ratio))*AVAILABLE_RAM_RATIO*1024)
        if used_mib > 0 and (used_mib / asked_mib) < ratio and asked_mib > recommend_mib:
            yield recommendformat(
                '{} peak RAM usage was only {}% ({} MiB used / {} MiB requested)'
                .format(self.label,
                        int(math.ceil(100*(used_mib / asked_mib))),
                        used_mib, asked_mib))

    def _recommend_keep_cache(self, recommendformat):
        """Recommend increasing keep cache if utilization < 50%.

        This means the amount of data returned to the program is less
        than 50% of the amount of data actually downloaded by
        arv-mount.
        """

        if self.job_tot['net:keep0']['rx'] == 0:
            return

        miss_rate = (float(self.job_tot['keepcache']['miss']) /
                     float(self.job_tot['keepcalls']['get']))

        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        # FIXME: the default on this get won't work correctly
        asked_cache = self.existing_constraints.get('keep_cache_ram') or self.existing_constraints.get('keep_cache_disk')

        if utilization < 0.5 and miss_rate > .05:
            yield recommendformat(
                '{} Keep cache utilization was only {:.2f}% and miss rate was {:.2f}% -- '
                'recommend increasing keep_cache'
                .format(self.label, utilization*100.0, miss_rate*100.0))

    def _recommend_temp_disk(self, recommendformat):
        """This recommendation is disabled for the time being. It was
        using the total disk on the node and not the amount of disk
        requested, so it would trigger a false positive basically
        every time. To get the amount of disk requested we need to
        fish it out of the mounts, which is extra work I don't want to do
        right now. You can find the old code at commit 616d135e77
        """

        return []

    def _format(self, val):
        """Return a string representation of a stat.

        {:.2f} for floats, default format for everything else."""
        if isinstance(val, float):
            return '{:.2f}'.format(val)
        else:
            return '{}'.format(val)

    def _runtime_constraint_mem_unit(self):
        if hasattr(self, 'runtime_constraint_mem_unit'):
            return self.runtime_constraint_mem_unit
        else:
            return ContainerRequestSummarizer.runtime_constraint_mem_unit

    def _map_runtime_constraint(self, key):
        return key


class CollectionSummarizer(Summarizer):
    def __init__(self, collection_id, **kwargs):
        super(CollectionSummarizer, self).__init__(
            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
        self.label = collection_id


def NewSummarizer(process_or_uuid, **kwargs):
    """Construct with the appropriate subclass for this uuid/object."""
    if isinstance(process_or_uuid, dict):
        process = process_or_uuid
        uuid = process['uuid']
    else:
        uuid = process_or_uuid
        process = None

    arv = kwargs.get("arv") or arvados.api('v1')

    if '-dz642-' in uuid:
        # Get the associated CR. Doesn't matter which since they all have the same logs
        crs = arv.container_requests().list(filters=[['container_uuid','=',uuid]],limit=1).execute()['items']
        if len(crs) > 0:
            process = crs[0]
        klass = ContainerRequestTreeSummarizer
    elif '-xvhdp-' in uuid:
        process = arv.container_requests().get(uuid=uuid).execute()
        klass = ContainerRequestTreeSummarizer
    elif '-4zz18-' in uuid:
        return CollectionSummarizer(collection_id=uuid)
    else:
        raise ValueError("Unrecognized uuid %s" % uuid)
    return klass(process, uuid=uuid, **kwargs)


class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, or container request."""

    def __init__(self, process, label=None, **kwargs):
        rdr = None
        self.process = process
        arv = kwargs.get("arv") or arvados.api('v1')
        if label is None:
            label = self.process.get('name', self.process['uuid'])
        # Pre-Arvados v1.4 everything is in 'log'.
        # For 1.4+, containers have no log collection; container_requests have it in 'log_uuid', not 'log'.
        log_collection = self.process.get('log', self.process.get('log_uuid'))
        if log_collection and self.process.get('state') != 'Uncommitted': # arvados.util.CR_UNCOMMITTED
            try:
                rdr = crunchstat_summary.reader.CollectionReader(
                    log_collection,
                    api_client=arv,
                    collection_object=kwargs.get("collection_object"))
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.process['log'], e)
        if rdr is None:
            uuid = self.process.get('container_uuid', self.process.get('uuid'))
            rdr = crunchstat_summary.reader.LiveLogReader(uuid)
            label = label + ' (partial)'

        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
        self.existing_constraints = self.process.get('runtime_constraints', {})
        self.arv_config = arv.config()
        self.cost = self.process.get('cost', 0)


class ContainerRequestSummarizer(ProcessSummarizer):
    runtime_constraint_mem_unit = 1
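    # Container runtime_constraints give RAM in bytes, so no unit scaling is
    # needed when reading them back.
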
class MultiSummarizer(object):
    def __init__(self, children={}, label=None, threads=1, **kwargs):
        self.children = children
        self.label = label
        self.threadcount = threads

    def run(self):
        if self.threadcount > 1 and len(self.children) > 1:
            completed = 0
            def run_and_progress(child):
                nonlocal completed
                try:
                    child.run()
                except Exception as e:
                    logger.exception("parse error")
                completed += 1
                logger.info("%s/%s summarized %s", completed, len(self.children), child.label)
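            # Each child summarizer parses its logs on a worker thread; the
            # executor's context manager below waits for all submitted children
            # to finish before run() returns.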
            with ThreadPoolExecutor(max_workers=self.threadcount) as tpe:
                for child in self.children.values():
                    tpe.submit(run_and_progress, child)
        else:
            for child in self.children.values():
                child.run()

    def text_report(self):
        txt = ''
        d = self._descendants()
        for child in d.values():
            if len(d) > 1:
                txt += '### Summary for {} ({})\n'.format(
                    child.label, child.process['uuid'])
            txt += child.text_report()
        return txt

    def _descendants(self):
        """Dict of self and all descendants.

        Nodes with nothing of their own to report (like
        MultiSummarizers) are omitted.
        """
        d = collections.OrderedDict()
        for key, child in self.children.items():
            if isinstance(child, Summarizer):
                d[key] = child
            if isinstance(child, MultiSummarizer):
                d.update(child._descendants())
        return d

    def html_report(self):
        tophtml = ""
        bottomhtml = ""
        label = self.label

        if len(self._descendants()) == 1:
            summarizer = next(iter(self._descendants().values()))
            tophtml = """{}\n<table class='aggtable'><tbody>{}</tbody></table>\n""".format(
                "\n".join(summarizer._recommend_gen(lambda x: "<p>{}</p>".format(x))),
                "\n".join(summarizer._text_report_agg_gen(lambda x: "<tr><th>{}</th><td>{}{}</td></tr>".format(*x))))

            bottomhtml = """<table class='metricstable'><tbody>{}</tbody></table>\n""".format(
                "\n".join(summarizer._text_report_table_gen(lambda x: "<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>".format(*x),
                                                            lambda x: "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(*x))))
            label = summarizer.long_label()

        return WEBCHART_CLASS(label, iter(self._descendants().values()), [tophtml], [bottomhtml]).html()


class ContainerRequestTreeSummarizer(MultiSummarizer):
    def __init__(self, root, skip_child_jobs=False, **kwargs):
        arv = kwargs.get("arv") or arvados.api('v1')

        label = kwargs.pop('label', None) or root.get('name') or root['uuid']
        root['name'] = label

        children = collections.OrderedDict()
        todo = collections.deque((root, ))
        while len(todo) > 0:
            current = todo.popleft()
            label = current['name']
            sort_key = current['created_at']

            summer = ContainerRequestSummarizer(current, label=label, **kwargs)
            summer.sort_key = sort_key
            children[current['uuid']] = summer

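            # Unless --skip-child-jobs was given, every container request issued
            # by this container is queued so the whole request tree gets summarized.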
            if skip_child_jobs:
                child_crs = arv.container_requests().list(filters=[['requesting_container_uuid', '=', current['container_uuid']]],
                                                          limit=0).execute()
                logger.warning('%s: omitting stats from %d child containers'
                               ' because --skip-child-jobs flag is on',
                               label, child_crs['items_available'])
            else:
                for cr in arvados.util.keyset_list_all(arv.container_requests().list,
                                                       filters=[['requesting_container_uuid', '=', current['container_uuid']]]):
                    if cr['container_uuid']:
                        logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
                        cr['name'] = cr.get('name') or cr['uuid']
                        todo.append(cr)

        sorted_children = collections.OrderedDict()
        for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
            sorted_children[uuid] = children[uuid]

        super(ContainerRequestTreeSummarizer, self).__init__(
            children=sorted_children,
            label=root['name'],
            **kwargs)