1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
7 import crunchstat_summary.dygraphs
8 import crunchstat_summary.reader
18 from arvados.api import OrderedJsonModel
19 from crunchstat_summary import logger
# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
# _recommend_ram() divides a MiB figure by this ratio (and by 1024) to
# count whole "nearly-GiB" units, then multiplies by it again when it
# prints the suggested constraint.
AVAILABLE_RAM_RATIO = 0.95
# Work around a datetime.datetime.strptime() thread-safety bug by calling
# it once before any threads start: https://bugs.python.org/issue7980
# (MultiSummarizer runs child summarizers in worker threads, and the log
# parser calls strptime() on every crunchstat timestamp).
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
# Chart implementation used by the html_report() methods to build HTML output.
WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
38 self.finishtime = None
39 self.series = collections.defaultdict(list)
42 class Summarizer(object):
43 def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
44 self._logdata = logdata
49 self.finishtime = None
50 self._skip_child_jobs = skip_child_jobs
52 # stats_max: {category: {stat: val}}
53 self.stats_max = collections.defaultdict(
54 functools.partial(collections.defaultdict, lambda: 0))
55 # task_stats: {task_id: {category: {stat: val}}}
56 self.task_stats = collections.defaultdict(
57 functools.partial(collections.defaultdict, dict))
60 self.tasks = collections.defaultdict(Task)
62 # We won't bother recommending new runtime constraints if the
63 # constraints given when running the job are known to us and
64 # are already suitable. If applicable, the subclass
65 # constructor will overwrite this with something useful.
66 self.existing_constraints = {}
68 logger.debug("%s: logdata %s", self.label, logdata)
71 logger.debug("%s: parsing logdata %s", self.label, self._logdata)
72 with self._logdata as logdata:
75 def _run(self, logdata):
76 self.detected_crunch1 = False
78 if not self.detected_crunch1 and '-8i9sb-' in line:
79 self.detected_crunch1 = True
81 if self.detected_crunch1:
82 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
84 seq = int(m.group('seq'))
85 uuid = m.group('task_uuid')
86 self.seq_to_uuid[seq] = uuid
87 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
90 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
92 task_id = self.seq_to_uuid[int(m.group('seq'))]
93 elapsed = int(m.group('elapsed'))
94 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
95 if elapsed > self.stats_max['time']['elapsed']:
96 self.stats_max['time']['elapsed'] = elapsed
99 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
101 uuid = m.group('uuid')
102 if self._skip_child_jobs:
103 logger.warning('%s: omitting stats from child job %s'
104 ' because --skip-child-jobs flag is on',
107 logger.debug('%s: follow %s', self.label, uuid)
108 child_summarizer = ProcessSummarizer(uuid)
109 child_summarizer.stats_max = self.stats_max
110 child_summarizer.task_stats = self.task_stats
111 child_summarizer.tasks = self.tasks
112 child_summarizer.starttime = self.starttime
113 child_summarizer.run()
114 logger.debug('%s: done %s', self.label, uuid)
117 # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
118 m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
123 # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
124 m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
128 if self.label is None:
130 self.label = m.group('job_uuid')
132 self.label = 'container'
133 if m.group('category').endswith(':'):
134 # "stderr crunchstat: notice: ..."
136 elif m.group('category') in ('error', 'caught'):
138 elif m.group('category') in ('read', 'open', 'cgroup', 'CID', 'Running'):
139 # "stderr crunchstat: read /proc/1234/net/dev: ..."
140 # (old logs are less careful with unprefixed error messages)
143 if self.detected_crunch1:
144 task_id = self.seq_to_uuid[int(m.group('seq'))]
146 task_id = 'container'
147 task = self.tasks[task_id]
149 # Use the first and last crunchstat timestamps as
150 # approximations of starttime and finishtime.
151 timestamp = m.group('timestamp')
152 if timestamp[10:11] == '_':
153 timestamp = datetime.datetime.strptime(
154 timestamp, '%Y-%m-%d_%H:%M:%S')
155 elif timestamp[10:11] == 'T':
156 timestamp = datetime.datetime.strptime(
157 timestamp[:19], '%Y-%m-%dT%H:%M:%S')
159 raise ValueError("Cannot parse timestamp {!r}".format(
162 if task.starttime is None:
163 logger.debug('%s: task %s starttime %s',
164 self.label, task_id, timestamp)
165 if task.starttime is None or timestamp < task.starttime:
166 task.starttime = timestamp
167 if task.finishtime is None or timestamp > task.finishtime:
168 task.finishtime = timestamp
170 if self.starttime is None or timestamp < task.starttime:
171 self.starttime = timestamp
172 if self.finishtime is None or timestamp < task.finishtime:
173 self.finishtime = timestamp
175 if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
176 elapsed = (task.finishtime - task.starttime).seconds
177 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
178 if elapsed > self.stats_max['time']['elapsed']:
179 self.stats_max['time']['elapsed'] = elapsed
181 this_interval_s = None
182 for group in ['current', 'interval']:
183 if not m.group(group):
185 category = m.group('category')
186 words = m.group(group).split(' ')
189 for val, stat in zip(words[::2], words[1::2]):
191 stats[stat] = float(val)
193 stats[stat] = int(val)
194 except ValueError as e:
195 # If the line doesn't start with 'crunchstat:' we
196 # might have mistaken an error message for a
197 # structured crunchstat line.
198 if m.group("crunchstat") is None or m.group("category") == "crunchstat":
199 logger.warning("%s: log contains message\n %s", self.label, line)
202 '%s: Error parsing value %r (stat %r, category %r): %r',
203 self.label, val, stat, category, e)
204 logger.warning('%s', line)
206 if 'user' in stats or 'sys' in stats:
207 stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
208 if 'tx' in stats or 'rx' in stats:
209 stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
210 for stat, val in stats.items():
211 if group == 'interval':
212 if stat == 'seconds':
213 this_interval_s = val
215 elif not (this_interval_s > 0):
217 "BUG? interval stat given with duration {!r}".
218 format(this_interval_s))
221 stat = stat + '__rate'
222 val = val / this_interval_s
223 if stat in ['user+sys__rate', 'tx+rx__rate']:
224 task.series[category, stat].append(
225 (timestamp - self.starttime, val))
228 task.series[category, stat].append(
229 (timestamp - self.starttime, val))
230 self.task_stats[task_id][category][stat] = val
231 if val > self.stats_max[category][stat]:
232 self.stats_max[category][stat] = val
233 logger.debug('%s: done parsing', self.label)
235 self.job_tot = collections.defaultdict(
236 functools.partial(collections.defaultdict, int))
237 for task_id, task_stat in self.task_stats.items():
238 for category, stat_last in task_stat.items():
239 for stat, val in stat_last.items():
240 if stat in ['cpus', 'cache', 'swap', 'rss']:
241 # meaningless stats like 16 cpu cores x 5 tasks = 80
243 self.job_tot[category][stat] += val
244 logger.debug('%s: done totals', self.label)
246 def long_label(self):
248 if hasattr(self, 'process') and self.process['uuid'] not in label:
249 label = '{} ({})'.format(label, self.process['uuid'])
251 label += ' -- elapsed time '
252 s = (self.finishtime - self.starttime).total_seconds()
254 label += '{}d'.format(int(s/86400))
256 label += '{}h'.format(int(s/3600) % 24)
258 label += '{}m'.format(int(s/60) % 60)
259 label += '{}s'.format(int(s) % 60)
262 def text_report(self):
264 return "(no report generated)\n"
265 return "\n".join(itertools.chain(
266 self._text_report_gen(),
267 self._recommend_gen())) + "\n"
def html_report(self):
    """Return an HTML chart for this summarizer alone.

    The chart is built by WEBCHART_CLASS from this object's label and a
    one-element list holding this summarizer.
    """
    charted = [self]
    return WEBCHART_CLASS(self.label, charted).html()
272 def _text_report_gen(self):
273 yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
274 for category, stat_max in sorted(self.stats_max.items()):
275 for stat, val in sorted(stat_max.items()):
276 if stat.endswith('__rate'):
278 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
279 val = self._format(val)
280 tot = self._format(self.job_tot[category].get(stat, '-'))
281 yield "\t".join([category, stat, str(val), max_rate, tot])
283 ('Number of tasks: {}',
286 ('Max CPU time spent by a single task: {}s',
287 self.stats_max['cpu']['user+sys'],
289 ('Max CPU usage in a single interval: {}%',
290 self.stats_max['cpu']['user+sys__rate'],
292 ('Overall CPU usage: {}%',
293 float(self.job_tot['cpu']['user+sys']) /
294 self.job_tot['time']['elapsed']
295 if self.job_tot['time']['elapsed'] > 0 else 0,
297 ('Max memory used by a single task: {}GB',
298 self.stats_max['mem']['rss'],
300 ('Max network traffic in a single task: {}GB',
301 self.stats_max['net:eth0']['tx+rx'] +
302 self.stats_max['net:keep0']['tx+rx'],
304 ('Max network speed in a single interval: {}MB/s',
305 self.stats_max['net:eth0']['tx+rx__rate'] +
306 self.stats_max['net:keep0']['tx+rx__rate'],
308 ('Keep cache miss rate {}%',
309 (float(self.job_tot['keepcache']['miss']) /
310 float(self.job_tot['keepcalls']['get']))
311 if self.job_tot['keepcalls']['get'] > 0 else 0,
312 lambda x: x * 100.0),
313 ('Keep cache utilization {}%',
314 (float(self.job_tot['blkio:0:0']['read']) /
315 float(self.job_tot['net:keep0']['rx']))
316 if self.job_tot['net:keep0']['rx'] > 0 else 0,
317 lambda x: x * 100.0)):
318 format_string, val, transform = args
319 if val == float('-Inf'):
323 yield "# "+format_string.format(self._format(val))
325 def _recommend_gen(self):
326 # TODO recommend fixing job granularity if elapsed time is too short
327 return itertools.chain(
328 self._recommend_cpu(),
329 self._recommend_ram(),
330 self._recommend_keep_cache())
332 def _recommend_cpu(self):
333 """Recommend asking for 4 cores if max CPU usage was 333%"""
335 constraint_key = self._map_runtime_constraint('vcpus')
336 cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
337 if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
338 logger.warning('%s: no CPU usage data', self.label)
340 # TODO Don't necessarily want to recommend on isolated max peak
341 # take average CPU usage into account as well or % time at max
342 used_cores = max(1, int(math.ceil(cpu_max_rate)))
343 asked_cores = self.existing_constraints.get(constraint_key)
344 if asked_cores is None:
346 # TODO: This should be more nuanced in cases where max >> avg
347 if used_cores < asked_cores:
349 '#!! {} max CPU usage was {}% -- '
350 'try reducing runtime_constraints to "{}":{}'
353 math.ceil(cpu_max_rate*100),
357 # FIXME: This needs to be updated to account for current nodemanager algorithms
358 def _recommend_ram(self):
359 """Recommend an economical RAM constraint for this job.
361 Nodes that are advertised as "8 gibibytes" actually have what
362 we might call "8 nearlygibs" of memory available for jobs.
363 Here, we calculate a whole number of nearlygibs that would
364 have sufficed to run the job, then recommend requesting a node
365 with that number of nearlygibs (expressed as mebibytes).
367 Requesting a node with "nearly 8 gibibytes" is our best hope
368 of getting a node that actually has nearly 8 gibibytes
369 available. If the node manager is smart enough to account for
370 the discrepancy itself when choosing/creating a node, we'll
371 get an 8 GiB node with nearly 8 GiB available. Otherwise, the
372 advertised size of the next-size-smaller node (say, 6 GiB)
373 will be too low to satisfy our request, so we will effectively
374 get rounded up to 8 GiB.
376 For example, if we need 7500 MiB, we can ask for 7500 MiB, and
377 we will generally get a node that is advertised as "8 GiB" and
378 has at least 7500 MiB available. However, asking for 8192 MiB
379 would either result in an unnecessarily expensive 12 GiB node
380 (if node manager knows about the discrepancy), or an 8 GiB
381 node which has less than 8192 MiB available and is therefore
382 considered by crunch-dispatch to be too small to meet our
385 When node manager learns how to predict the available memory
386 for each node type such that crunch-dispatch always agrees
387 that a node is big enough to run the job it was brought up
388 for, all this will be unnecessary. We'll just ask for exactly
389 the memory we want -- even if that happens to be 8192 MiB.
392 constraint_key = self._map_runtime_constraint('ram')
393 used_bytes = self.stats_max['mem']['rss']
394 if used_bytes == float('-Inf'):
395 logger.warning('%s: no memory usage data', self.label)
397 used_mib = math.ceil(float(used_bytes) / MB)
398 asked_mib = self.existing_constraints.get(constraint_key)
400 nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
401 if used_mib > 0 and (asked_mib is None or (
402 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
404 '#!! {} max RSS was {} MiB -- '
405 'try reducing runtime_constraints to "{}":{}'
410 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))
412 def _recommend_keep_cache(self):
413 """Recommend increasing keep cache if utilization < 80%"""
414 constraint_key = self._map_runtime_constraint('keep_cache_ram')
415 if self.job_tot['net:keep0']['rx'] == 0:
417 utilization = (float(self.job_tot['blkio:0:0']['read']) /
418 float(self.job_tot['net:keep0']['rx']))
419 # FIXME: the default on this get won't work correctly
420 asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
422 if utilization < 0.8:
424 '#!! {} Keep cache utilization was {:.2f}% -- '
425 'try doubling runtime_constraints to "{}":{} (or more)'
430 math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
433 def _format(self, val):
434 """Return a string representation of a stat.
436 {:.2f} for floats, default format for everything else."""
437 if isinstance(val, float):
438 return '{:.2f}'.format(val)
440 return '{}'.format(val)
442 def _runtime_constraint_mem_unit(self):
443 if hasattr(self, 'runtime_constraint_mem_unit'):
444 return self.runtime_constraint_mem_unit
445 elif self.detected_crunch1:
446 return JobSummarizer.runtime_constraint_mem_unit
448 return ContainerSummarizer.runtime_constraint_mem_unit
450 def _map_runtime_constraint(self, key):
451 if hasattr(self, 'map_runtime_constraint'):
452 return self.map_runtime_constraint[key]
453 elif self.detected_crunch1:
454 return JobSummarizer.map_runtime_constraint[key]
class CollectionSummarizer(Summarizer):
    """Summarizer whose log data is read from a collection.

    The collection identifier is handed to
    crunchstat_summary.reader.CollectionReader and is also used as the
    summarizer's label.
    """

    def __init__(self, collection_id, **kwargs):
        reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(reader, **kwargs)
        # Always label the report with the collection id, overriding any
        # label passed through kwargs.
        self.label = collection_id
466 def NewSummarizer(process_or_uuid, **kwargs):
467 """Construct with the appropriate subclass for this uuid/object."""
469 if isinstance(process_or_uuid, dict):
470 process = process_or_uuid
471 uuid = process['uuid']
473 uuid = process_or_uuid
475 arv = arvados.api('v1', model=OrderedJsonModel())
477 if '-dz642-' in uuid:
479 process = arv.containers().get(uuid=uuid).execute()
480 klass = ContainerTreeSummarizer
481 elif '-xvhdp-' in uuid:
483 process = arv.container_requests().get(uuid=uuid).execute()
484 klass = ContainerTreeSummarizer
485 elif '-8i9sb-' in uuid:
487 process = arv.jobs().get(uuid=uuid).execute()
488 klass = JobTreeSummarizer
489 elif '-d1hrv-' in uuid:
491 process = arv.pipeline_instances().get(uuid=uuid).execute()
492 klass = PipelineSummarizer
493 elif '-4zz18-' in uuid:
494 return CollectionSummarizer(collection_id=uuid)
496 raise ArgumentError("Unrecognized uuid %s", uuid)
497 return klass(process, uuid=uuid, **kwargs)
500 class ProcessSummarizer(Summarizer):
501 """Process is a job, pipeline, container, or container request."""
503 def __init__(self, process, label=None, **kwargs):
505 self.process = process
507 label = self.process.get('name', self.process['uuid'])
508 if self.process.get('log'):
510 rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
511 except arvados.errors.NotFoundError as e:
512 logger.warning("Trying event logs after failing to read "
513 "log collection %s: %s", self.process['log'], e)
515 rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
516 label = label + ' (partial)'
517 super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
518 self.existing_constraints = self.process.get('runtime_constraints', {})
521 class JobSummarizer(ProcessSummarizer):
522 runtime_constraint_mem_unit = MB
523 map_runtime_constraint = {
524 'keep_cache_ram': 'keep_cache_mb_per_task',
525 'ram': 'min_ram_mb_per_node',
526 'vcpus': 'min_cores_per_node',
class ContainerSummarizer(ProcessSummarizer):
    """ProcessSummarizer for a single container.

    Memory-related runtime constraint values are used unscaled (a unit of
    1, i.e. bytes), unlike JobSummarizer, which counts them in MB.
    """

    runtime_constraint_mem_unit = 1
534 class MultiSummarizer(object):
535 def __init__(self, children={}, label=None, threads=1, **kwargs):
536 self.throttle = threading.Semaphore(threads)
537 self.children = children
540 def run_and_release(self, target, *args, **kwargs):
542 return target(*args, **kwargs)
544 self.throttle.release()
548 for child in self.children.values():
549 self.throttle.acquire()
550 t = threading.Thread(target=self.run_and_release, args=(child.run, ))
557 def text_report(self):
559 d = self._descendants()
560 for child in d.values():
562 txt += '### Summary for {} ({})\n'.format(
563 child.label, child.process['uuid'])
564 txt += child.text_report()
568 def _descendants(self):
569 """Dict of self and all descendants.
571 Nodes with nothing of their own to report (like
572 MultiSummarizers) are omitted.
574 d = collections.OrderedDict()
575 for key, child in self.children.items():
576 if isinstance(child, Summarizer):
578 if isinstance(child, MultiSummarizer):
579 d.update(child._descendants())
def html_report(self):
    """Return one HTML chart covering every descendant summarizer.

    Descendants come from _descendants(), which omits nodes that have
    nothing of their own to report.
    """
    descendants = self._descendants()
    return WEBCHART_CLASS(self.label, iter(descendants.values())).html()
586 class JobTreeSummarizer(MultiSummarizer):
587 """Summarizes a job and all children listed in its components field."""
588 def __init__(self, job, label=None, **kwargs):
589 arv = arvados.api('v1', model=OrderedJsonModel())
590 label = label or job.get('name', job['uuid'])
591 children = collections.OrderedDict()
592 children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
593 if job.get('components', None):
595 for j in arv.jobs().index(
596 limit=len(job['components']),
597 filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
598 preloaded[j['uuid']] = j
599 for cname in sorted(job['components'].keys()):
600 child_uuid = job['components'][cname]
601 j = (preloaded.get(child_uuid) or
602 arv.jobs().get(uuid=child_uuid).execute())
603 children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
605 super(JobTreeSummarizer, self).__init__(
611 class PipelineSummarizer(MultiSummarizer):
612 def __init__(self, instance, **kwargs):
613 children = collections.OrderedDict()
614 for cname, component in instance['components'].items():
615 if 'job' not in component:
617 "%s: skipping component with no job assigned", cname)
620 "%s: job %s", cname, component['job']['uuid'])
621 summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
622 summarizer.label = '{} {}'.format(
623 cname, component['job']['uuid'])
624 children[cname] = summarizer
625 super(PipelineSummarizer, self).__init__(
627 label=instance['uuid'],
631 class ContainerTreeSummarizer(MultiSummarizer):
632 def __init__(self, root, skip_child_jobs=False, **kwargs):
633 arv = arvados.api('v1', model=OrderedJsonModel())
635 label = kwargs.pop('label', None) or root.get('name') or root['uuid']
638 children = collections.OrderedDict()
639 todo = collections.deque((root, ))
641 current = todo.popleft()
642 label = current['name']
643 sort_key = current['created_at']
644 if current['uuid'].find('-xvhdp-') > 0:
645 current = arv.containers().get(uuid=current['container_uuid']).execute()
647 summer = ContainerSummarizer(current, label=label, **kwargs)
648 summer.sort_key = sort_key
649 children[current['uuid']] = summer
653 child_crs = arv.container_requests().index(
655 filters=page_filters+[
656 ['requesting_container_uuid', '=', current['uuid']]],
658 if not child_crs['items']:
660 elif skip_child_jobs:
661 logger.warning('%s: omitting stats from %d child containers'
662 ' because --skip-child-jobs flag is on',
663 label, child_crs['items_available'])
665 page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
666 for cr in child_crs['items']:
667 if cr['container_uuid']:
668 logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
669 cr['name'] = cr.get('name') or cr['uuid']
671 sorted_children = collections.OrderedDict()
672 for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
673 sorted_children[uuid] = children[uuid]
674 super(ContainerTreeSummarizer, self).__init__(
675 children=sorted_children,