1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import print_function
9 import crunchstat_summary.dygraphs
10 import crunchstat_summary.reader
20 from arvados.api import OrderedJsonModel
21 from crunchstat_summary import logger
23 # Recommend memory constraints that are this multiple of an integral
24 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
25 # that have amounts like 7.5 GiB according to the kernel.)
26 AVAILABLE_RAM_RATIO = 0.95
29 # Workaround datetime.datetime.strptime() thread-safety bug by calling
30 # it once before starting threads. https://bugs.python.org/issue7980
31 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
34 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
40 self.finishtime = None
41 self.series = collections.defaultdict(list)
44 class Summarizer(object):
45 def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
46 self._logdata = logdata
51 self.finishtime = None
52 self._skip_child_jobs = skip_child_jobs
54 # stats_max: {category: {stat: val}}
55 self.stats_max = collections.defaultdict(
56 functools.partial(collections.defaultdict, lambda: 0))
57 # task_stats: {task_id: {category: {stat: val}}}
58 self.task_stats = collections.defaultdict(
59 functools.partial(collections.defaultdict, dict))
62 self.tasks = collections.defaultdict(Task)
64 # We won't bother recommending new runtime constraints if the
65 # constraints given when running the job are known to us and
66 # are already suitable. If applicable, the subclass
67 # constructor will overwrite this with something useful.
68 self.existing_constraints = {}
70 logger.debug("%s: logdata %s", self.label, logdata)
73 logger.debug("%s: parsing logdata %s", self.label, self._logdata)
74 with self._logdata as logdata:
77 def _run(self, logdata):
78 self.detected_crunch1 = False
80 if not self.detected_crunch1 and '-8i9sb-' in line:
81 self.detected_crunch1 = True
83 if self.detected_crunch1:
84 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
86 seq = int(m.group('seq'))
87 uuid = m.group('task_uuid')
88 self.seq_to_uuid[seq] = uuid
89 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
92 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
94 task_id = self.seq_to_uuid[int(m.group('seq'))]
95 elapsed = int(m.group('elapsed'))
96 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
97 if elapsed > self.stats_max['time']['elapsed']:
98 self.stats_max['time']['elapsed'] = elapsed
101 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
103 uuid = m.group('uuid')
104 if self._skip_child_jobs:
105 logger.warning('%s: omitting stats from child job %s'
106 ' because --skip-child-jobs flag is on',
109 logger.debug('%s: follow %s', self.label, uuid)
110 child_summarizer = ProcessSummarizer(uuid)
111 child_summarizer.stats_max = self.stats_max
112 child_summarizer.task_stats = self.task_stats
113 child_summarizer.tasks = self.tasks
114 child_summarizer.starttime = self.starttime
115 child_summarizer.run()
116 logger.debug('%s: done %s', self.label, uuid)
119 # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
120 m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
125 # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
126 m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
130 if self.label is None:
132 self.label = m.group('job_uuid')
134 self.label = 'container'
135 if m.group('category').endswith(':'):
136 # "stderr crunchstat: notice: ..."
138 elif m.group('category') in ('error', 'caught'):
140 elif m.group('category') in ('read', 'open', 'cgroup', 'CID', 'Running'):
141 # "stderr crunchstat: read /proc/1234/net/dev: ..."
142 # (old logs are less careful with unprefixed error messages)
145 if self.detected_crunch1:
146 task_id = self.seq_to_uuid[int(m.group('seq'))]
148 task_id = 'container'
149 task = self.tasks[task_id]
151 # Use the first and last crunchstat timestamps as
152 # approximations of starttime and finishtime.
153 timestamp = m.group('timestamp')
154 if timestamp[10:11] == '_':
155 timestamp = datetime.datetime.strptime(
156 timestamp, '%Y-%m-%d_%H:%M:%S')
157 elif timestamp[10:11] == 'T':
158 timestamp = datetime.datetime.strptime(
159 timestamp[:19], '%Y-%m-%dT%H:%M:%S')
161 raise ValueError("Cannot parse timestamp {!r}".format(
# Widen this task's observed time window to include the timestamp of
# the line being processed.
if task.starttime is None:
    logger.debug('%s: task %s starttime %s',
                 self.label, task_id, timestamp)
if task.starttime is None or timestamp < task.starttime:
    task.starttime = timestamp
if task.finishtime is None or timestamp > task.finishtime:
    task.finishtime = timestamp

# Widen the summarizer-wide window the same way. The comparison must
# be against the summarizer's own bounds: the task bounds above already
# include this timestamp, so testing against them can never succeed
# and finishtime would stay stuck at the first sample.
if self.starttime is None or timestamp < self.starttime:
    self.starttime = timestamp
if self.finishtime is None or timestamp > self.finishtime:
    self.finishtime = timestamp

if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
    # timedelta.seconds excludes whole days; total_seconds() keeps
    # tasks that run longer than 24 hours from wrapping around.
    # Timestamps are parsed to whole seconds, so int() loses nothing.
    elapsed = int((task.finishtime - task.starttime).total_seconds())
    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
    if elapsed > self.stats_max['time']['elapsed']:
        self.stats_max['time']['elapsed'] = elapsed
183 this_interval_s = None
184 for group in ['current', 'interval']:
185 if not m.group(group):
187 category = m.group('category')
188 words = m.group(group).split(' ')
191 for val, stat in zip(words[::2], words[1::2]):
193 stats[stat] = float(val)
195 stats[stat] = int(val)
196 except ValueError as e:
197 # If the line doesn't start with 'crunchstat:' we
198 # might have mistaken an error message for a
199 # structured crunchstat line.
200 if m.group("crunchstat") is None or m.group("category") == "crunchstat":
201 logger.warning("%s: log contains message\n %s", self.label, line)
204 '%s: Error parsing value %r (stat %r, category %r): %r',
205 self.label, val, stat, category, e)
206 logger.warning('%s', line)
208 if 'user' in stats or 'sys' in stats:
209 stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
210 if 'tx' in stats or 'rx' in stats:
211 stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
212 for stat, val in stats.iteritems():
213 if group == 'interval':
214 if stat == 'seconds':
215 this_interval_s = val
217 elif not (this_interval_s > 0):
219 "BUG? interval stat given with duration {!r}".
220 format(this_interval_s))
223 stat = stat + '__rate'
224 val = val / this_interval_s
225 if stat in ['user+sys__rate', 'tx+rx__rate']:
226 task.series[category, stat].append(
227 (timestamp - self.starttime, val))
230 task.series[category, stat].append(
231 (timestamp - self.starttime, val))
232 self.task_stats[task_id][category][stat] = val
233 if val > self.stats_max[category][stat]:
234 self.stats_max[category][stat] = val
235 logger.debug('%s: done parsing', self.label)
237 self.job_tot = collections.defaultdict(
238 functools.partial(collections.defaultdict, int))
239 for task_id, task_stat in self.task_stats.iteritems():
240 for category, stat_last in task_stat.iteritems():
241 for stat, val in stat_last.iteritems():
242 if stat in ['cpus', 'cache', 'swap', 'rss']:
243 # meaningless stats like 16 cpu cores x 5 tasks = 80
245 self.job_tot[category][stat] += val
246 logger.debug('%s: done totals', self.label)
248 def long_label(self):
250 if hasattr(self, 'process') and self.process['uuid'] not in label:
251 label = '{} ({})'.format(label, self.process['uuid'])
253 label += ' -- elapsed time '
254 s = (self.finishtime - self.starttime).total_seconds()
256 label += '{}d'.format(int(s/86400))
258 label += '{}h'.format(int(s/3600) % 24)
260 label += '{}m'.format(int(s/60) % 60)
261 label += '{}s'.format(int(s) % 60)
264 def text_report(self):
266 return "(no report generated)\n"
267 return "\n".join(itertools.chain(
268 self._text_report_gen(),
269 self._recommend_gen())) + "\n"
def html_report(self):
    """Return the HTML rendered by WEBCHART_CLASS for this summarizer alone.

    The chart is titled with self.label and is given a one-element
    list containing this summarizer.
    """
    chart = WEBCHART_CLASS(self.label, [self])
    return chart.html()
274 def _text_report_gen(self):
275 yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
276 for category, stat_max in sorted(self.stats_max.iteritems()):
277 for stat, val in sorted(stat_max.iteritems()):
278 if stat.endswith('__rate'):
280 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
281 val = self._format(val)
282 tot = self._format(self.job_tot[category].get(stat, '-'))
283 yield "\t".join([category, stat, str(val), max_rate, tot])
285 ('Number of tasks: {}',
288 ('Max CPU time spent by a single task: {}s',
289 self.stats_max['cpu']['user+sys'],
291 ('Max CPU usage in a single interval: {}%',
292 self.stats_max['cpu']['user+sys__rate'],
294 ('Overall CPU usage: {}%',
295 self.job_tot['cpu']['user+sys'] /
296 self.job_tot['time']['elapsed']
297 if self.job_tot['time']['elapsed'] > 0 else 0,
299 ('Max memory used by a single task: {}GB',
300 self.stats_max['mem']['rss'],
302 ('Max network traffic in a single task: {}GB',
303 self.stats_max['net:eth0']['tx+rx'] +
304 self.stats_max['net:keep0']['tx+rx'],
306 ('Max network speed in a single interval: {}MB/s',
307 self.stats_max['net:eth0']['tx+rx__rate'] +
308 self.stats_max['net:keep0']['tx+rx__rate'],
310 ('Keep cache miss rate {}%',
311 (float(self.job_tot['keepcache']['miss']) /
312 float(self.job_tot['keepcalls']['get']))
313 if self.job_tot['keepcalls']['get'] > 0 else 0,
314 lambda x: x * 100.0),
315 ('Keep cache utilization {}%',
316 (float(self.job_tot['blkio:0:0']['read']) /
317 float(self.job_tot['net:keep0']['rx']))
318 if self.job_tot['net:keep0']['rx'] > 0 else 0,
319 lambda x: x * 100.0)):
320 format_string, val, transform = args
321 if val == float('-Inf'):
325 yield "# "+format_string.format(self._format(val))
327 def _recommend_gen(self):
328 return itertools.chain(
329 self._recommend_cpu(),
330 self._recommend_ram(),
331 self._recommend_keep_cache())
333 def _recommend_cpu(self):
334 """Recommend asking for 4 cores if max CPU usage was 333%"""
336 constraint_key = self._map_runtime_constraint('vcpus')
337 cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
338 if cpu_max_rate == float('-Inf'):
339 logger.warning('%s: no CPU usage data', self.label)
341 used_cores = max(1, int(math.ceil(cpu_max_rate)))
342 asked_cores = self.existing_constraints.get(constraint_key)
343 if asked_cores is None or used_cores < asked_cores:
345 '#!! {} max CPU usage was {}% -- '
346 'try runtime_constraints "{}":{}'
349 int(math.ceil(cpu_max_rate*100)),
353 def _recommend_ram(self):
354 """Recommend an economical RAM constraint for this job.
356 Nodes that are advertised as "8 gibibytes" actually have what
357 we might call "8 nearlygibs" of memory available for jobs.
358 Here, we calculate a whole number of nearlygibs that would
359 have sufficed to run the job, then recommend requesting a node
360 with that number of nearlygibs (expressed as mebibytes).
362 Requesting a node with "nearly 8 gibibytes" is our best hope
363 of getting a node that actually has nearly 8 gibibytes
364 available. If the node manager is smart enough to account for
365 the discrepancy itself when choosing/creating a node, we'll
366 get an 8 GiB node with nearly 8 GiB available. Otherwise, the
367 advertised size of the next-size-smaller node (say, 6 GiB)
368 will be too low to satisfy our request, so we will effectively
369 get rounded up to 8 GiB.
371 For example, if we need 7500 MiB, we can ask for 7500 MiB, and
372 we will generally get a node that is advertised as "8 GiB" and
373 has at least 7500 MiB available. However, asking for 8192 MiB
374 would either result in an unnecessarily expensive 12 GiB node
375 (if node manager knows about the discrepancy), or an 8 GiB
376 node which has less than 8192 MiB available and is therefore
377 considered by crunch-dispatch to be too small to meet our
380 When node manager learns how to predict the available memory
381 for each node type such that crunch-dispatch always agrees
382 that a node is big enough to run the job it was brought up
383 for, all this will be unnecessary. We'll just ask for exactly
384 the memory we want -- even if that happens to be 8192 MiB.
387 constraint_key = self._map_runtime_constraint('ram')
388 used_bytes = self.stats_max['mem']['rss']
389 if used_bytes == float('-Inf'):
390 logger.warning('%s: no memory usage data', self.label)
392 used_mib = math.ceil(float(used_bytes) / 1048576)
393 asked_mib = self.existing_constraints.get(constraint_key)
395 nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
396 if asked_mib is None or (
397 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
399 '#!! {} max RSS was {} MiB -- '
400 'try runtime_constraints "{}":{}'
405 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(2**20)/self._runtime_constraint_mem_unit()))
407 def _recommend_keep_cache(self):
408 """Recommend increasing keep cache if utilization < 80%"""
409 constraint_key = self._map_runtime_constraint('keep_cache_ram')
410 if self.job_tot['net:keep0']['rx'] == 0:
412 utilization = (float(self.job_tot['blkio:0:0']['read']) /
413 float(self.job_tot['net:keep0']['rx']))
414 asked_mib = self.existing_constraints.get(constraint_key, 256)
416 if utilization < 0.8:
418 '#!! {} Keep cache utilization was {:.2f}% -- '
419 'try runtime_constraints "{}":{} (or more)'
424 asked_mib*2*(2**20)/self._runtime_constraint_mem_unit())
427 def _format(self, val):
428 """Return a string representation of a stat.
430 {:.2f} for floats, default format for everything else."""
431 if isinstance(val, float):
432 return '{:.2f}'.format(val)
434 return '{}'.format(val)
436 def _runtime_constraint_mem_unit(self):
437 if hasattr(self, 'runtime_constraint_mem_unit'):
438 return self.runtime_constraint_mem_unit
439 elif self.detected_crunch1:
440 return JobSummarizer.runtime_constraint_mem_unit
442 return ContainerSummarizer.runtime_constraint_mem_unit
444 def _map_runtime_constraint(self, key):
445 if hasattr(self, 'map_runtime_constraint'):
446 return self.map_runtime_constraint[key]
447 elif self.detected_crunch1:
448 return JobSummarizer.map_runtime_constraint[key]
class CollectionSummarizer(Summarizer):
    """Summarizer whose log data comes from a CollectionReader."""

    def __init__(self, collection_id, **kwargs):
        """Read logs from `collection_id`; remaining kwargs go to Summarizer.

        The label is always set to the collection id after the base
        class has been initialized.
        """
        reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(reader, **kwargs)
        self.label = collection_id
def NewSummarizer(process_or_uuid, **kwargs):
    """Construct with the appropriate subclass for this uuid/object.

    process_or_uuid is either a dict that has a 'uuid' key (used as
    the process record as-is) or a uuid string, in which case the
    record is fetched through the Arvados API. The summarizer class is
    chosen from the infix found in the uuid:

      -dz642-  container           -> ContainerTreeSummarizer
      -xvhdp-  container request   -> ContainerTreeSummarizer
      -8i9sb-  job                 -> JobTreeSummarizer
      -d1hrv-  pipeline instance   -> PipelineSummarizer
      -4zz18-  collection          -> CollectionSummarizer

    Extra keyword arguments are passed to the chosen class, except
    for collections, where only the uuid is used.

    Raises ValueError if the uuid matches none of the known infixes.
    """

    if isinstance(process_or_uuid, dict):
        process = process_or_uuid
        uuid = process['uuid']
    else:
        uuid = process_or_uuid
        process = None
        # The API client is only needed when the record must be fetched.
        arv = arvados.api('v1', model=OrderedJsonModel())

    if '-dz642-' in uuid:
        if process is None:
            process = arv.containers().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-xvhdp-' in uuid:
        if process is None:
            process = arv.container_requests().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-8i9sb-' in uuid:
        if process is None:
            process = arv.jobs().get(uuid=uuid).execute()
        klass = JobTreeSummarizer
    elif '-d1hrv-' in uuid:
        if process is None:
            process = arv.pipeline_instances().get(uuid=uuid).execute()
        klass = PipelineSummarizer
    elif '-4zz18-' in uuid:
        return CollectionSummarizer(collection_id=uuid)
    else:
        # Use a builtin exception with the uuid interpolated into the
        # message, so the caller sees which uuid was rejected.
        raise ValueError("Unrecognized uuid {}".format(uuid))
    return klass(process, uuid=uuid, **kwargs)
494 class ProcessSummarizer(Summarizer):
495 """Process is a job, pipeline, container, or container request."""
497 def __init__(self, process, label=None, **kwargs):
499 self.process = process
501 label = self.process.get('name', self.process['uuid'])
502 if self.process.get('log'):
504 rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
505 except arvados.errors.NotFoundError as e:
506 logger.warning("Trying event logs after failing to read "
507 "log collection %s: %s", self.process['log'], e)
509 rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
510 label = label + ' (partial)'
511 super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
512 self.existing_constraints = self.process.get('runtime_constraints', {})
class JobSummarizer(ProcessSummarizer):
    """Runtime constraint naming and memory unit for jobs.

    These values are also the fallback used by Summarizer when a log
    is detected as crunch1.
    """

    # Divisor applied to byte counts before they are printed in a
    # recommendation; the constraint names below are in MB.
    runtime_constraint_mem_unit = 2**20

    # Generic constraint name -> name used in job runtime_constraints.
    map_runtime_constraint = dict(
        keep_cache_ram='keep_cache_mb_per_task',
        ram='min_ram_mb_per_node',
        vcpus='min_cores_per_node',
    )
524 class ContainerSummarizer(ProcessSummarizer):
525 runtime_constraint_mem_unit = 1
528 class MultiSummarizer(object):
529 def __init__(self, children={}, label=None, threads=1, **kwargs):
530 self.throttle = threading.Semaphore(threads)
531 self.children = children
def run_and_release(self, target, *args, **kwargs):
    """Call target(*args, **kwargs) and return its result.

    self.throttle is released afterwards whether the call returned or
    raised.
    """
    try:
        outcome = target(*args, **kwargs)
    finally:
        self.throttle.release()
    return outcome
542 for child in self.children.itervalues():
543 self.throttle.acquire()
544 t = threading.Thread(target=self.run_and_release, args=(child.run, ))
551 def text_report(self):
553 d = self._descendants()
554 for child in d.itervalues():
556 txt += '### Summary for {} ({})\n'.format(
557 child.label, child.process['uuid'])
558 txt += child.text_report()
562 def _descendants(self):
563 """Dict of self and all descendants.
565 Nodes with nothing of their own to report (like
566 MultiSummarizers) are omitted.
568 d = collections.OrderedDict()
569 for key, child in self.children.iteritems():
570 if isinstance(child, Summarizer):
572 if isinstance(child, MultiSummarizer):
573 d.update(child._descendants())
def html_report(self):
    """Return the HTML rendered by WEBCHART_CLASS for all descendants.

    The chart is titled with self.label and is given the values of
    the dict returned by self._descendants().
    """
    title = self.label
    summarizers = self._descendants().itervalues()
    chart = WEBCHART_CLASS(title, summarizers)
    return chart.html()
580 class JobTreeSummarizer(MultiSummarizer):
581 """Summarizes a job and all children listed in its components field."""
582 def __init__(self, job, label=None, **kwargs):
583 arv = arvados.api('v1', model=OrderedJsonModel())
584 label = label or job.get('name', job['uuid'])
585 children = collections.OrderedDict()
586 children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
587 if job.get('components', None):
589 for j in arv.jobs().index(
590 limit=len(job['components']),
591 filters=[['uuid','in',job['components'].values()]]).execute()['items']:
592 preloaded[j['uuid']] = j
593 for cname in sorted(job['components'].keys()):
594 child_uuid = job['components'][cname]
595 j = (preloaded.get(child_uuid) or
596 arv.jobs().get(uuid=child_uuid).execute())
597 children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
599 super(JobTreeSummarizer, self).__init__(
605 class PipelineSummarizer(MultiSummarizer):
606 def __init__(self, instance, **kwargs):
607 children = collections.OrderedDict()
608 for cname, component in instance['components'].iteritems():
609 if 'job' not in component:
611 "%s: skipping component with no job assigned", cname)
614 "%s: job %s", cname, component['job']['uuid'])
615 summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
616 summarizer.label = '{} {}'.format(
617 cname, component['job']['uuid'])
618 children[cname] = summarizer
619 super(PipelineSummarizer, self).__init__(
621 label=instance['uuid'],
625 class ContainerTreeSummarizer(MultiSummarizer):
626 def __init__(self, root, skip_child_jobs=False, **kwargs):
627 arv = arvados.api('v1', model=OrderedJsonModel())
629 label = kwargs.pop('label', None) or root.get('name') or root['uuid']
632 children = collections.OrderedDict()
633 todo = collections.deque((root, ))
635 current = todo.popleft()
636 label = current['name']
637 sort_key = current['created_at']
638 if current['uuid'].find('-xvhdp-') > 0:
639 current = arv.containers().get(uuid=current['container_uuid']).execute()
641 summer = ContainerSummarizer(current, label=label, **kwargs)
642 summer.sort_key = sort_key
643 children[current['uuid']] = summer
647 child_crs = arv.container_requests().index(
649 filters=page_filters+[
650 ['requesting_container_uuid', '=', current['uuid']]],
652 if not child_crs['items']:
654 elif skip_child_jobs:
655 logger.warning('%s: omitting stats from %d child containers'
656 ' because --skip-child-jobs flag is on',
657 label, child_crs['items_available'])
659 page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
660 for cr in child_crs['items']:
661 if cr['container_uuid']:
662 logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
663 cr['name'] = cr.get('name') or cr['uuid']
665 sorted_children = collections.OrderedDict()
666 for uuid in sorted(children.keys(), key=lambda uuid: children[uuid].sort_key):
667 sorted_children[uuid] = children[uuid]
668 super(ContainerTreeSummarizer, self).__init__(
669 children=sorted_children,