1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
7 import crunchstat_summary.dygraphs
8 import crunchstat_summary.reader
18 from concurrent.futures import ThreadPoolExecutor
20 from crunchstat_summary import logger
22 # Recommend memory constraints that are this multiple of an integral
23 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
24 # that have amounts like 7.5 GiB according to the kernel.)
25 AVAILABLE_RAM_RATIO = 0.90
28 # Workaround datetime.datetime.strptime() thread-safety bug by calling
29 # it once before starting threads. https://bugs.python.org/issue7980
30 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
# Chart implementation used when rendering HTML reports; instantiated by
# Summarizer.html_report() and MultiSummarizer.html_report() below.
33 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
# NOTE(review): fragment of the Task initializer -- the enclosing class and
# def lines are not visible in this listing.
# finishtime is filled in later from the last crunchstat timestamp seen for
# the task (see the starttime/finishtime updates in Summarizer._run).
39 self.finishtime = None
# series maps (category, stat) -> list of (elapsed-timedelta, value) points;
# Summarizer._run appends to it for plottable stats (rates, rss/used/total).
40 self.series = collections.defaultdict(list)
# Summarizer: parses one process's crunchstat log stream and accumulates
# per-task and per-job statistics used for text/HTML reports and runtime
# constraint recommendations.
# NOTE(review): several original lines are missing from this listing (gaps in
# the embedded numbering); comments below describe only the visible code.
43 class Summarizer(object):
44 def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
45 self._logdata = logdata
50 self.finishtime = None
51 self._skip_child_jobs = skip_child_jobs
53 # stats_max: {category: {stat: val}}
# Inner default of 0 means "no data yet"; _run compares candidate values
# against it with plain '>'.
54 self.stats_max = collections.defaultdict(
55 functools.partial(collections.defaultdict, lambda: 0))
56 # task_stats: {task_id: {category: {stat: val}}}
57 self.task_stats = collections.defaultdict(
58 functools.partial(collections.defaultdict, dict))
# Auto-creates a Task record the first time a task_id is referenced.
61 self.tasks = collections.defaultdict(Task)
63 # We won't bother recommending new runtime constraints if the
64 # constraints given when running the job are known to us and
65 # are already suitable. If applicable, the subclass
66 # constructor will overwrite this with something useful.
67 self.existing_constraints = {}
72 logger.info("%s: logdata %s", self.label, logdata)
# run(): opens the log data source as a context manager and hands it to
# _run() for parsing (the intervening original lines are not visible here).
75 logger.debug("%s: parsing logdata %s", self.label, self._logdata)
76 with self._logdata as logdata:
# _run: main parsing loop. Reads crunchstat log lines, extracts per-task
# timestamps and stat samples, and folds them into task_stats/stats_max/
# job_tot. NOTE(review): many original lines (loop headers, try:/continue
# statements) are missing from this listing; comments describe visible code.
79 def _run(self, logdata):
80 if not self.node_info:
81 self.node_info = logdata.node_info()
# Example of a line this regex must match:
85 # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
# 'crunchstat: ' prefix is optional (old logs); 'interval' group is optional.
86 m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
90 if self.label is None:
92 self.label = m.group('job_uuid')
94 self.label = 'label #1'
97 task = self.tasks[task_id]
99 # Use the first and last crunchstat timestamps as
100 # approximations of starttime and finishtime.
101 timestamp = m.group('timestamp')
# Two timestamp formats appear in the wild: '%Y-%m-%d_%H:%M:%S' and
# ISO-8601 '...T...' (truncated to seconds before parsing).
102 if timestamp[10:11] == '_':
103 timestamp = datetime.datetime.strptime(
104 timestamp, '%Y-%m-%d_%H:%M:%S')
105 elif timestamp[10:11] == 'T':
106 timestamp = datetime.datetime.strptime(
107 timestamp[:19], '%Y-%m-%dT%H:%M:%S')
109 raise ValueError("Cannot parse timestamp {!r}".format(
112 if task.starttime is None:
113 logger.debug('%s: task %s starttime %s',
114 self.label, task_id, timestamp)
# Track min/max timestamps per task and for the whole job.
115 if task.starttime is None or timestamp < task.starttime:
116 task.starttime = timestamp
117 if task.finishtime is None or timestamp > task.finishtime:
118 task.finishtime = timestamp
120 if self.starttime is None or timestamp < self.starttime:
121 self.starttime = timestamp
122 if self.finishtime is None or timestamp > self.finishtime:
123 self.finishtime = timestamp
# NOTE(review): .seconds discards whole days of a timedelta; presumably
# tasks never span >24h, but .total_seconds() would be safer -- confirm.
125 if task.starttime is not None and task.finishtime is not None:
126 elapsed = (task.finishtime - task.starttime).seconds
127 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
128 if elapsed > self.stats_max['time']['elapsed']:
129 self.stats_max['time']['elapsed'] = elapsed
# Skip non-stat message lines (notices, errors, old-style diagnostics).
131 category = m.group('category')
132 if category.endswith(':'):
133 # "stderr crunchstat: notice: ..."
135 elif category in ('error', 'caught'):
137 elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
138 # "stderr crunchstat: read /proc/1234/net/dev: ..."
139 # (old logs are less careful with unprefixed error messages)
# Parse "current" totals and optional "interval" deltas; stat words come
# in value/name pairs, e.g. "0 put 3 get".
142 this_interval_s = None
143 for group in ['current', 'interval']:
144 if not m.group(group):
146 category = m.group('category')
147 words = m.group(group).split(' ')
150 for val, stat in zip(words[::2], words[1::2]):
152 stats[stat] = float(val)
154 stats[stat] = int(val)
155 except ValueError as e:
156 # If the line doesn't start with 'crunchstat:' we
157 # might have mistaken an error message for a
158 # structured crunchstat line.
159 if m.group("crunchstat") is None or m.group("category") == "crunchstat":
160 logger.warning("%s: log contains message\n  %s", self.label, line)
163 '%s: Error parsing value %r (stat %r, category %r): %r',
164 self.label, val, stat, category, e)
165 logger.warning('%s', line)
# Synthesize combined stats reported on in the summary tables.
167 if 'user' in stats or 'sys' in stats:
168 stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
169 if 'tx' in stats or 'rx' in stats:
170 stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
171 if group == 'interval':
172 if 'seconds' in stats:
173 this_interval_s = stats.get('seconds',0)
175 if this_interval_s <= 0:
177 "BUG? interval stat given with duration {!r}".
178 format(this_interval_s))
180 logger.error('BUG? interval stat missing duration')
# Interval samples become per-second '__rate' stats; only selected stats
# are recorded as time series for charting.
181 for stat, val in stats.items():
182 if group == 'interval' and this_interval_s:
183 stat = stat + '__rate'
184 val = val / this_interval_s
185 if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
186 task.series[category, stat].append(
187 (timestamp - self.starttime, val))
189 if stat in ['rss','used','total']:
190 task.series[category, stat].append(
191 (timestamp - self.starttime, val))
192 self.task_stats[task_id][category][stat] = val
193 if val > self.stats_max[category][stat]:
194 self.stats_max[category][stat] = val
195 logger.debug('%s: done parsing', self.label)
# Aggregate per-task stats into job-wide totals, excluding stats that are
# not meaningful to sum across tasks.
197 self.job_tot = collections.defaultdict(
198 functools.partial(collections.defaultdict, int))
199 for task_id, task_stat in self.task_stats.items():
200 for category, stat_last in task_stat.items():
201 for stat, val in stat_last.items():
202 if stat in ['cpus', 'cache', 'swap', 'rss']:
203 # meaningless stats like 16 cpu cores x 5 tasks = 80
205 self.job_tot[category][stat] += val
206 logger.debug('%s: done totals', self.label)
# Warn about expected stat categories that never appeared in the logs.
208 if self.stats_max['time'].get('elapsed', 0) > 20:
209 # needs to have executed for at least 20 seconds or we may
210 # not have collected any metrics and these warnings are duds.
214 'net:': 'network I/O',
215 'statfs': 'storage space',
217 for task_stat in self.task_stats.values():
218 for category in task_stat.keys():
219 for checkcat in missing_category:
220 if checkcat.endswith(':'):
# Trailing-colon keys (e.g. 'net:') match by prefix, since the
# real categories carry an interface suffix like 'net:eth0'.
221 if category.startswith(checkcat):
222 missing_category.pop(checkcat)
225 if category == checkcat:
226 missing_category.pop(checkcat)
228 for catlabel in missing_category.values():
229 logger.warning('%s: %s stats are missing -- possible cluster configuration issue',
230 self.label, catlabel)
# long_label: build a human-readable label for this summary, appending the
# process UUID when it isn't already part of the label.
# NOTE(review): the line initializing 'label' (and the return) is missing
# from this listing.
232 def long_label(self):
234 if hasattr(self, 'process') and self.process['uuid'] not in label:
235 label = '{} ({})'.format(label, self.process['uuid'])
# elapsed_time: format finishtime - starttime as "Nd Nh Nm Ns", omitting
# leading zero units. Returns early when finishtime is unset (the early
# return line itself is missing from this listing).
238 def elapsed_time(self):
239 if not self.finishtime:
242 s = (self.finishtime - self.starttime).total_seconds()
# Each unit below is conditionally appended; the guarding 'if' lines
# (243/245/247) are not visible in this listing.
244 label += '{}d '.format(int(s/86400))
246 label += '{}h '.format(int(s/3600) % 24)
248 label += '{}m '.format(int(s/60) % 60)
249 label += '{}s'.format(int(s) % 60)
# text_report: plain-text report -- tab-separated metrics table, '#'-prefixed
# aggregate lines, and '#!!'-prefixed recommendations.
252 def text_report(self):
# Guard clause (its condition line is missing from this listing): no
# parsed tasks means there is nothing to report.
254 return "(no report generated)\n"
255 return "\n".join(itertools.chain(
256 self._text_report_table_gen(lambda x: "\t".join(x),
257 lambda x: "\t".join(x)),
258 self._text_report_agg_gen(lambda x: "# {}: {}{}".format(x[0], x[1], x[2])),
259 self._recommend_gen(lambda x: "#!! "+x))) + "\n"
# html_report: same content as text_report but rendered as HTML tables, then
# embedded in a WEBCHART_CLASS page together with the stat charts.
261 def html_report(self):
262 tophtml = """{}\n<table class='aggtable'><tbody>{}</tbody></table>\n""".format(
263 "\n".join(self._recommend_gen(lambda x: "<p>{}</p>".format(x))),
264 "\n".join(self._text_report_agg_gen(lambda x: "<tr><th>{}</th><td>{}{}</td></tr>".format(*x))))
266 bottomhtml = """<table class='metricstable'><tbody>{}</tbody></table>\n""".format(
267 "\n".join(self._text_report_table_gen(lambda x: "<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>".format(*x),
268 lambda x: "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(*x))))
269 label = self.long_label()
271 return WEBCHART_CLASS(label, [self]).html(tophtml, bottomhtml)
# _text_report_table_gen: yield one header row plus one row per
# (category, stat) pair, showing task max, task max rate, and job total.
# headerformat/rowformat are callables that render a list of cells.
273 def _text_report_table_gen(self, headerformat, rowformat):
274 yield headerformat(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
275 for category, stat_max in sorted(self.stats_max.items()):
276 for stat, val in sorted(stat_max.items()):
# '__rate' entries are folded into their base stat's row below;
# the 'continue' that skips them is missing from this listing.
277 if stat.endswith('__rate'):
279 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
280 val = self._format(val)
281 tot = self._format(self.job_tot[category].get(stat, '-'))
282 yield rowformat([category, stat, str(val), max_rate, tot])
# _text_report_agg_gen: yield aggregate summary lines (cost, instance type,
# CPU/RAM/network usage, cache utilization, ...). Builds a list of metric
# tuples -- entries evaluating to None are skipped -- then formats each via
# the aggformat callable. NOTE(review): the list literal's opening, several
# tuple fields (labels, transforms, suffixes) and the skip/transform logic
# are missing from this listing.
284 def _text_report_agg_gen(self, aggformat):
# "by a single task" suffix is only meaningful for multi-task jobs.
286 if len(self.tasks) > 1:
287 by_single_task = " by a single task"
296 '${:.3f}'.format(self.cost),
298 '') if self.cost > 0 else None,
300 ('Assigned instance type',
301 self.node_info.get('ProviderType'),
303 '') if self.node_info.get('ProviderType') else None,
305 ('Instance hourly price',
306 '${:.3f}'.format(self.node_info.get('Price')),
308 '') if self.node_info.get('Price') else None,
310 ('Max CPU usage in a single interval',
311 self.stats_max['cpu']['user+sys__rate'],
315 ('Overall CPU usage',
316 float(self.job_tot['cpu']['user+sys']) /
317 self.job_tot['time']['elapsed']
318 if self.job_tot['time']['elapsed'] > 0 else 0,
322 ('Requested CPU cores',
323 self.existing_constraints.get(self._map_runtime_constraint('vcpus')),
325 '') if self.existing_constraints.get(self._map_runtime_constraint('vcpus')) else None,
328 self.node_info.get('VCPUs'),
330 '') if self.node_info.get('VCPUs') else None,
332 ('Max memory used{}'.format(by_single_task),
333 self.stats_max['mem']['rss'],
338 self.existing_constraints.get(self._map_runtime_constraint('ram')),
340 'MB') if self.existing_constraints.get(self._map_runtime_constraint('ram')) else None,
# Usable RAM = advertised node RAM minus the cluster's configured
# ReserveExtraRAM, with a 5% safety margin.
342 ('Maximum RAM request for this instance type',
343 (self.node_info.get('RAM') - self.arv_config.get('Containers', {}).get('ReserveExtraRAM', 0))*.95,
345 'MB') if self.node_info.get('RAM') else None,
# Network figures combine the general interface (eth0) and the Keep
# storage interface (keep0).
347 ('Max network traffic{}'.format(by_single_task),
348 self.stats_max['net:eth0']['tx+rx'] +
349 self.stats_max['net:keep0']['tx+rx'],
353 ('Max network speed in a single interval',
354 self.stats_max['net:eth0']['tx+rx__rate'] +
355 self.stats_max['net:keep0']['tx+rx__rate'],
359 ('Keep cache miss rate',
360 (float(self.job_tot['keepcache']['miss']) /
361 float(self.job_tot['keepcalls']['get']))
362 if self.job_tot['keepcalls']['get'] > 0 else 0,
# Utilization = bytes served to the program / bytes fetched from Keep.
366 ('Keep cache utilization',
367 (float(self.job_tot['blkio:0:0']['read']) /
368 float(self.job_tot['net:keep0']['rx']))
369 if self.job_tot['net:keep0']['rx'] > 0 else 0,
373 ('Temp disk utilization',
374 (float(self.job_tot['statfs']['used']) /
375 float(self.job_tot['statfs']['total']))
376 if self.job_tot['statfs']['total'] > 0 else 0,
381 if len(self.tasks) > 1:
382 metrics.insert(0, ('Number of tasks',
# -Inf marks "no data collected" for max-style stats; such metrics are
# not emitted (the skip statement itself is missing from this listing).
389 format_string, val, transform, suffix = args
390 if val == float('-Inf'):
394 yield aggformat((format_string, self._format(val), suffix))
# _recommend_gen: chain together all constraint-recommendation generators.
# Skips recommendations entirely for very short runs, where collected
# metrics are unreliable (the early return is missing from this listing).
396 def _recommend_gen(self, recommendformat):
397 # TODO recommend fixing job granularity if elapsed time is too short
399 if self.stats_max['time'].get('elapsed', 0) <= 20:
403 return itertools.chain(
404 self._recommend_cpu(recommendformat),
405 self._recommend_ram(recommendformat),
406 self._recommend_keep_cache(recommendformat),
407 self._recommend_temp_disk(recommendformat),
410 def _recommend_cpu(self, recommendformat):
411 """Recommend asking for 4 cores if max CPU usage was 333%"""
413 constraint_key = self._map_runtime_constraint('vcpus')
414 cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
# -Inf / 0.0 both mean "no usable CPU samples" -- warn and bail out
# (the return statement is missing from this listing).
415 if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
416 logger.warning('%s: no CPU usage data', self.label)
418 # TODO Don't necessarily want to recommend on isolated max peak
419 # take average CPU usage into account as well or % time at max
# Round peak usage up to whole cores, minimum one.
420 used_cores = max(1, int(math.ceil(cpu_max_rate)))
421 asked_cores = self.existing_constraints.get(constraint_key)
422 if asked_cores is None:
424 # TODO: This should be more nuanced in cases where max >> avg
425 if used_cores < asked_cores:
426 yield recommendformat(
427 '{} max CPU usage was {}% -- '
428 'try reducing runtime_constraints to "{}":{}'
431 math.ceil(cpu_max_rate*100),
435 # FIXME: This needs to be updated to account for current a-d-c algorithms
436 def _recommend_ram(self, recommendformat):
437 """Recommend an economical RAM constraint for this job.
439 Nodes that are advertised as "8 gibibytes" actually have what
440 we might call "8 nearlygibs" of memory available for jobs.
441 Here, we calculate a whole number of nearlygibs that would
442 have sufficed to run the job, then recommend requesting a node
443 with that number of nearlygibs (expressed as mebibytes).
445 Requesting a node with "nearly 8 gibibytes" is our best hope
446 of getting a node that actually has nearly 8 gibibytes
447 available. If the node manager is smart enough to account for
448 the discrepancy itself when choosing/creating a node, we'll
449 get an 8 GiB node with nearly 8 GiB available. Otherwise, the
450 advertised size of the next-size-smaller node (say, 6 GiB)
451 will be too low to satisfy our request, so we will effectively
452 get rounded up to 8 GiB.
454 For example, if we need 7500 MiB, we can ask for 7500 MiB, and
455 we will generally get a node that is advertised as "8 GiB" and
456 has at least 7500 MiB available. However, asking for 8192 MiB
457 would either result in an unnecessarily expensive 12 GiB node
458 (if node manager knows about the discrepancy), or an 8 GiB
459 node which has less than 8192 MiB available and is therefore
460 considered by crunch-dispatch to be too small to meet our
463 When node manager learns how to predict the available memory
464 for each node type such that crunch-dispatch always agrees
465 that a node is big enough to run the job it was brought up
466 for, all this will be unnecessary. We'll just ask for exactly
467 the memory we want -- even if that happens to be 8192 MiB.
470 constraint_key = self._map_runtime_constraint('ram')
471 used_bytes = self.stats_max['mem']['rss']
# -Inf means no rss samples were collected -- warn and bail out (the
# return statement is missing from this listing).
472 if used_bytes == float('-Inf'):
473 logger.warning('%s: no memory usage data', self.label)
474 if not self.existing_constraints.get(constraint_key):
477 used_mib = math.ceil(float(used_bytes) / MB)
# NOTE(review): MB and 'ratio' are defined on lines not visible in this
# listing -- presumably a bytes-per-MiB constant and a headroom factor.
478 asked_mib = self.existing_constraints.get(constraint_key) / MB
# Convert MiB to "nearlygibs" per the docstring's AVAILABLE_RAM_RATIO rule.
480 nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
482 recommend_mib = int(math.ceil(nearlygibs(used_mib/ratio))*AVAILABLE_RAM_RATIO*1024)
483 if used_mib > 0 and (used_mib / asked_mib) < ratio and asked_mib > recommend_mib:
484 yield recommendformat(
485 '{} requested {} MiB of RAM but actual RAM usage was below {}% at {} MiB -- '
486 'suggest reducing RAM request to {} MiB'
494 def _recommend_keep_cache(self, recommendformat):
495 """Recommend increasing keep cache if utilization < 50%.
497 This means the amount of data returned to the program is less
498 than 50% of the amount of data actually downloaded by
502 constraint_key = self._map_runtime_constraint('keep_cache_ram')
# No Keep traffic at all -- nothing to recommend (the return statement is
# missing from this listing).
503 if self.job_tot['net:keep0']['rx'] == 0:
# Utilization = bytes the program read / bytes fetched from Keep; < 1.0
# means some blocks were fetched more than once (cache evictions).
505 utilization = (float(self.job_tot['blkio:0:0']['read']) /
506 float(self.job_tot['net:keep0']['rx']))
507 # FIXME: the default on this get won't work correctly
# NOTE(review): with 'or', a constraint explicitly set to 0 falls through
# to keep_cache_disk -- presumably the behavior the FIXME refers to.
508 asked_cache = self.existing_constraints.get('keep_cache_ram') or self.existing_constraints.get('keep_cache_disk')
510 if utilization < 0.5:
511 yield recommendformat(
512 '{} Keep cache utilization was {:.2f}% -- '
513 'try increasing keep_cache to {} MB'
# Recommendation is simply double the current cache size, in MB.
517 math.ceil((asked_cache * 2) / (1024*1024)))
520 def _recommend_temp_disk(self, recommendformat):
521 """Recommend decreasing temp disk if utilization < 50%.
523 This recommendation is disabled for the time being. It uses
524 the total disk on the node and not the amount of disk
525 requested, so it triggers a false positive basically every
526 time. To get the amount of disk requested we need to fish it
527 out of the mounts, which is extra work I don't want do right
# The implementation below is intentionally commented out per the
# docstring; the method currently yields nothing (its empty-generator
# return line is not visible in this listing).
533 # total = float(self.job_tot['statfs']['total'])
534 # utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0
536 # if utilization < 50.0 and total > 0:
537 #     yield recommendformat(
538 #         '{} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
539 #         'consider reducing "tmpdirMin" and/or "outdirMin"'
542 #         utilization * 100.0,
546 def _format(self, val):
547 """Return a string representation of a stat.
549 {:.2f} for floats, default format for everything else."""
550 if isinstance(val, float):
551 return '{:.2f}'.format(val)
553 return '{}'.format(val)
# _runtime_constraint_mem_unit: scale factor for memory-valued runtime
# constraints; instance/class attribute wins, otherwise fall back to
# ContainerRequestSummarizer's class attribute (defined later in this file).
555 def _runtime_constraint_mem_unit(self):
556 if hasattr(self, 'runtime_constraint_mem_unit'):
557 return self.runtime_constraint_mem_unit
559 return ContainerRequestSummarizer.runtime_constraint_mem_unit
# _map_runtime_constraint: translate a generic constraint name (e.g. 'vcpus',
# 'ram') to this process type's constraint key. NOTE(review): the body is
# missing from this listing.
561 def _map_runtime_constraint(self, key):
class CollectionSummarizer(Summarizer):
    """Summarize crunchstat logs stored in a Keep collection.

    Wraps the collection in a CollectionReader log source and labels
    the summary with the collection's identifier.
    """

    def __init__(self, collection_id, **kwargs):
        reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super().__init__(reader, **kwargs)
        # Label the report after the collection, overriding any label the
        # base class derived from the log data.
        self.label = collection_id
# Factory: dispatch on the Arvados UUID infix to pick the right summarizer
# class. Accepts either a full API record (dict) or a bare UUID string.
# NOTE(review): several original lines (else branches, klass assignment for
# the -dz642- case) are missing from this listing.
572 def NewSummarizer(process_or_uuid, **kwargs):
573 """Construct with the appropriate subclass for this uuid/object."""
575 if isinstance(process_or_uuid, dict):
576 process = process_or_uuid
577 uuid = process['uuid']
579 uuid = process_or_uuid
581 arv = kwargs.get("arv") or arvados.api('v1')
# '-dz642-' infix = container UUID.
583 if '-dz642-' in uuid:
585 # Get the associated CR. Doesn't matter which since they all have the same logs
586 crs = arv.container_requests().list(filters=[['container_uuid','=',uuid]],limit=1).execute()['items']
589 klass = ContainerRequestTreeSummarizer
# '-xvhdp-' infix = container request UUID.
590 elif '-xvhdp-' in uuid:
592 process = arv.container_requests().get(uuid=uuid).execute()
593 klass = ContainerRequestTreeSummarizer
# '-4zz18-' infix = collection UUID; short-circuits to a plain
# CollectionSummarizer over the stored logs.
594 elif '-4zz18-' in uuid:
595 return CollectionSummarizer(collection_id=uuid)
597 raise ArgumentError("Unrecognized uuid %s", uuid)
598 return klass(process, uuid=uuid, **kwargs)
# ProcessSummarizer: Summarizer bound to one Arvados process record; decides
# where its logs live (stored collection vs. live event logs) and records
# the process's runtime constraints and cost for use in recommendations.
# NOTE(review): some original lines (try:, the reader's positional args, the
# else branch header) are missing from this listing.
601 class ProcessSummarizer(Summarizer):
602 """Process is a job, pipeline, or container request."""
604 def __init__(self, process, label=None, **kwargs):
606 self.process = process
607 arv = kwargs.get("arv") or arvados.api('v1')
# Default label: process name, falling back to its UUID.
609 label = self.process.get('name', self.process['uuid'])
610 # Pre-Arvados v1.4 everything is in 'log'
611 # For 1.4+ containers have no logs and container_requests have them in 'log_uuid', not 'log'
612 log_collection = self.process.get('log', self.process.get('log_uuid'))
613 if log_collection and self.process.get('state') != 'Uncommitted': # arvados.util.CR_UNCOMMITTED:
615 rdr = crunchstat_summary.reader.CollectionReader(
618 collection_object=kwargs.get("collection_object"))
# Stored log collection unavailable: fall back to the live event log
# for the container and mark the report as partial.
619 except arvados.errors.NotFoundError as e:
620 logger.warning("Trying event logs after failing to read "
621 "log collection %s: %s", self.process['log'], e)
623 uuid = self.process.get('container_uuid', self.process.get('uuid'))
624 rdr = crunchstat_summary.reader.LiveLogReader(uuid)
625 label = label + ' (partial)'
627 super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
# Used by the _recommend_* methods and _text_report_agg_gen above.
628 self.existing_constraints = self.process.get('runtime_constraints', {})
629 self.arv_config = arv.config()
630 self.cost = self.process.get('cost', 0)
# Summarizer for a single container request record.
633 class ContainerRequestSummarizer(ProcessSummarizer):
# Scale factor returned by Summarizer._runtime_constraint_mem_unit();
# 1 presumably means container-request memory constraints are already in
# bytes (cf. the '/ MB' conversions in _recommend_ram) -- confirm.
634 runtime_constraint_mem_unit = 1
# MultiSummarizer: runs and aggregates a collection of child summarizers,
# optionally parsing them in parallel threads.
# NOTE(review): several original lines (run() def, try:, counters, returns)
# are missing from this listing.
637 class MultiSummarizer(object):
# NOTE(review): children={} is a mutable default argument -- safe only if
# no caller ever mutates the default instance; consider children=None.
638 def __init__(self, children={}, label=None, threads=1, **kwargs):
639 self.children = children
641 self.threadcount = threads
# Only bother with a thread pool when there is real parallelism to gain.
644 if self.threadcount > 1 and len(self.children) > 1:
646 def run_and_progress(child):
# A failing child is logged but does not abort the other children.
649 except Exception as e:
650 logger.exception("parse error")
652 logger.info("%s/%s summarized %s", completed, len(self.children), child.label)
653 with ThreadPoolExecutor(max_workers=self.threadcount) as tpe:
654 for child in self.children.values():
655 tpe.submit(run_and_progress, child)
# Serial fallback path (its child.run() call is not visible here).
657 for child in self.children.values():
# Concatenate every descendant's text report under a per-child heading.
660 def text_report(self):
662 d = self._descendants()
663 for child in d.values():
665 txt += '### Summary for {} ({})\n'.format(
666 child.label, child.process['uuid'])
667 txt += child.text_report()
671 def _descendants(self):
672 """Dict of self and all descendants.
674 Nodes with nothing of their own to report (like
675 MultiSummarizers) are omitted.
677 d = collections.OrderedDict()
678 for key, child in self.children.items():
679 if isinstance(child, Summarizer):
# Nested MultiSummarizers are flattened into this dict.
681 if isinstance(child, MultiSummarizer):
682 d.update(child._descendants())
685 def html_report(self):
# With exactly one real summarizer, render its aggregate tables inline;
# otherwise (branch not visible here) only the charts are rendered.
689 if len(self._descendants()) == 1:
690 summarizer = next(iter(self._descendants().values()))
691 tophtml = """{}\n<table class='aggtable'><tbody>{}</tbody></table>\n""".format(
692 "\n".join(summarizer._recommend_gen(lambda x: "<p>{}</p>".format(x))),
693 "\n".join(summarizer._text_report_agg_gen(lambda x: "<tr><th>{}</th><td>{}{}</td></tr>".format(*x))))
695 bottomhtml = """<table class='metricstable'><tbody>{}</tbody></table>\n""".format(
696 "\n".join(summarizer._text_report_table_gen(lambda x: "<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>".format(*x),
697 lambda x: "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(*x))))
698 label = summarizer.long_label()
700 return WEBCHART_CLASS(label, iter(self._descendants().values())).html(tophtml, bottomhtml)
# ContainerRequestTreeSummarizer: breadth-first walk over a container
# request and all requests made by its container, building one
# ContainerRequestSummarizer per node, sorted by creation time.
# NOTE(review): this definition continues past the end of this listing (the
# super().__init__ call is unterminated here) and several interior lines
# (the while-loop header, queueing of children) are missing.
703 class ContainerRequestTreeSummarizer(MultiSummarizer):
704 def __init__(self, root, skip_child_jobs=False, **kwargs):
705 arv = kwargs.get("arv") or arvados.api('v1')
707 label = kwargs.pop('label', None) or root.get('name') or root['uuid']
710 children = collections.OrderedDict()
# BFS work queue seeded with the root request.
711 todo = collections.deque((root, ))
713 current = todo.popleft()
714 label = current['name']
715 sort_key = current['created_at']
717 summer = ContainerRequestSummarizer(current, label=label, **kwargs)
718 summer.sort_key = sort_key
719 children[current['uuid']] = summer
722 child_crs = arv.container_requests().list(filters=[['requesting_container_uuid', '=', current['container_uuid']]],
# --skip-child-jobs: report how many children were omitted instead of
# recursing into them.
724 logger.warning('%s: omitting stats from child containers'
725 ' because --skip-child-jobs flag is on',
726 label, child_crs['items_available'])
728 for cr in arvados.util.keyset_list_all(arv.container_requests().list,
729 filters=[['requesting_container_uuid', '=', current['container_uuid']]]):
# Only requests that actually got a container have logs to summarize.
730 if cr['container_uuid']:
731 logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
732 cr['name'] = cr.get('name') or cr['uuid']
# Re-order children chronologically before handing them to the base class.
734 sorted_children = collections.OrderedDict()
735 for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
736 sorted_children[uuid] = children[uuid]
737 super(ContainerRequestTreeSummarizer, self).__init__(
738 children=sorted_children,