1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
7 import crunchstat_summary.dygraphs
8 import crunchstat_summary.reader
18 from arvados.api import OrderedJsonModel
19 from crunchstat_summary import logger
21 # Recommend memory constraints that are this multiple of an integral
22 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
23 # that have amounts like 7.5 GiB according to the kernel.)
24 AVAILABLE_RAM_RATIO = 0.95
27 # Workaround datetime.datetime.strptime() thread-safety bug by calling
28 # it once before starting threads. https://bugs.python.org/issue7980
29 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
32 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
38 self.finishtime = None
39 self.series = collections.defaultdict(list)
42 class Summarizer(object):
43 def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
44 self._logdata = logdata
49 self.finishtime = None
50 self._skip_child_jobs = skip_child_jobs
52 # stats_max: {category: {stat: val}}
53 self.stats_max = collections.defaultdict(
54 functools.partial(collections.defaultdict, lambda: 0))
55 # task_stats: {task_id: {category: {stat: val}}}
56 self.task_stats = collections.defaultdict(
57 functools.partial(collections.defaultdict, dict))
60 self.tasks = collections.defaultdict(Task)
62 # We won't bother recommending new runtime constraints if the
63 # constraints given when running the job are known to us and
64 # are already suitable. If applicable, the subclass
65 # constructor will overwrite this with something useful.
66 self.existing_constraints = {}
68 logger.debug("%s: logdata %s", self.label, logdata)
71 logger.debug("%s: parsing logdata %s", self.label, self._logdata)
72 with self._logdata as logdata:
75 def _run(self, logdata):
76 self.detected_crunch1 = False
78 if not self.detected_crunch1 and '-8i9sb-' in line:
79 self.detected_crunch1 = True
81 if self.detected_crunch1:
82 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
84 seq = int(m.group('seq'))
85 uuid = m.group('task_uuid')
86 self.seq_to_uuid[seq] = uuid
87 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
90 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
92 task_id = self.seq_to_uuid[int(m.group('seq'))]
93 elapsed = int(m.group('elapsed'))
94 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
95 if elapsed > self.stats_max['time']['elapsed']:
96 self.stats_max['time']['elapsed'] = elapsed
99 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
101 uuid = m.group('uuid')
102 if self._skip_child_jobs:
103 logger.warning('%s: omitting stats from child job %s'
104 ' because --skip-child-jobs flag is on',
107 logger.debug('%s: follow %s', self.label, uuid)
108 child_summarizer = ProcessSummarizer(uuid)
109 child_summarizer.stats_max = self.stats_max
110 child_summarizer.task_stats = self.task_stats
111 child_summarizer.tasks = self.tasks
112 child_summarizer.starttime = self.starttime
113 child_summarizer.run()
114 logger.debug('%s: done %s', self.label, uuid)
117 # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
118 m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
123 # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
124 m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
128 if self.label is None:
130 self.label = m.group('job_uuid')
132 self.label = 'container'
133 if m.group('category').endswith(':'):
134 # "stderr crunchstat: notice: ..."
136 elif m.group('category') in ('error', 'caught'):
138 elif m.group('category') in ('read', 'open', 'cgroup', 'CID', 'Running'):
139 # "stderr crunchstat: read /proc/1234/net/dev: ..."
140 # (old logs are less careful with unprefixed error messages)
143 if self.detected_crunch1:
144 task_id = self.seq_to_uuid[int(m.group('seq'))]
146 task_id = 'container'
147 task = self.tasks[task_id]
149 # Use the first and last crunchstat timestamps as
150 # approximations of starttime and finishtime.
151 timestamp = m.group('timestamp')
152 if timestamp[10:11] == '_':
153 timestamp = datetime.datetime.strptime(
154 timestamp, '%Y-%m-%d_%H:%M:%S')
155 elif timestamp[10:11] == 'T':
156 timestamp = datetime.datetime.strptime(
157 timestamp[:19], '%Y-%m-%dT%H:%M:%S')
159 raise ValueError("Cannot parse timestamp {!r}".format(
162 if task.starttime is None:
163 logger.debug('%s: task %s starttime %s',
164 self.label, task_id, timestamp)
165 if task.starttime is None or timestamp < task.starttime:
166 task.starttime = timestamp
167 if task.finishtime is None or timestamp > task.finishtime:
168 task.finishtime = timestamp
170 if self.starttime is None or timestamp < self.starttime:
171 self.starttime = timestamp
172 if self.finishtime is None or timestamp > self.finishtime:
173 self.finishtime = timestamp
175 if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
176 elapsed = (task.finishtime - task.starttime).seconds
177 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
178 if elapsed > self.stats_max['time']['elapsed']:
179 self.stats_max['time']['elapsed'] = elapsed
181 this_interval_s = None
182 for group in ['current', 'interval']:
183 if not m.group(group):
185 category = m.group('category')
186 words = m.group(group).split(' ')
189 for val, stat in zip(words[::2], words[1::2]):
191 stats[stat] = float(val)
193 stats[stat] = int(val)
194 except ValueError as e:
195 # If the line doesn't start with 'crunchstat:' we
196 # might have mistaken an error message for a
197 # structured crunchstat line.
198 if m.group("crunchstat") is None or m.group("category") == "crunchstat":
199 logger.warning("%s: log contains message\n %s", self.label, line)
202 '%s: Error parsing value %r (stat %r, category %r): %r',
203 self.label, val, stat, category, e)
204 logger.warning('%s', line)
206 if 'user' in stats or 'sys' in stats:
207 stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
208 if 'tx' in stats or 'rx' in stats:
209 stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
210 if group == 'interval':
211 if 'seconds' in stats:
212 this_interval_s = stats.get('seconds',0)
214 if this_interval_s <= 0:
216 "BUG? interval stat given with duration {!r}".
217 format(this_interval_s))
219 logger.error('BUG? interval stat missing duration')
220 for stat, val in stats.items():
221 if group == 'interval' and this_interval_s:
222 stat = stat + '__rate'
223 val = val / this_interval_s
224 if stat in ['user+sys__rate', 'tx+rx__rate']:
225 task.series[category, stat].append(
226 (timestamp - self.starttime, val))
229 task.series[category, stat].append(
230 (timestamp - self.starttime, val))
231 self.task_stats[task_id][category][stat] = val
232 if val > self.stats_max[category][stat]:
233 self.stats_max[category][stat] = val
234 logger.debug('%s: done parsing', self.label)
236 self.job_tot = collections.defaultdict(
237 functools.partial(collections.defaultdict, int))
238 for task_id, task_stat in self.task_stats.items():
239 for category, stat_last in task_stat.items():
240 for stat, val in stat_last.items():
241 if stat in ['cpus', 'cache', 'swap', 'rss']:
242 # meaningless stats like 16 cpu cores x 5 tasks = 80
244 self.job_tot[category][stat] += val
245 logger.debug('%s: done totals', self.label)
247 def long_label(self):
249 if hasattr(self, 'process') and self.process['uuid'] not in label:
250 label = '{} ({})'.format(label, self.process['uuid'])
252 label += ' -- elapsed time '
253 s = (self.finishtime - self.starttime).total_seconds()
255 label += '{}d'.format(int(s/86400))
257 label += '{}h'.format(int(s/3600) % 24)
259 label += '{}m'.format(int(s/60) % 60)
260 label += '{}s'.format(int(s) % 60)
263 def text_report(self):
265 return "(no report generated)\n"
266 return "\n".join(itertools.chain(
267 self._text_report_gen(),
268 self._recommend_gen())) + "\n"
270 def html_report(self):
271 return WEBCHART_CLASS(self.label, [self]).html()
273 def _text_report_gen(self):
274 yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
275 for category, stat_max in sorted(self.stats_max.items()):
276 for stat, val in sorted(stat_max.items()):
277 if stat.endswith('__rate'):
279 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
280 val = self._format(val)
281 tot = self._format(self.job_tot[category].get(stat, '-'))
282 yield "\t".join([category, stat, str(val), max_rate, tot])
284 ('Number of tasks: {}',
287 ('Max CPU time spent by a single task: {}s',
288 self.stats_max['cpu']['user+sys'],
290 ('Max CPU usage in a single interval: {}%',
291 self.stats_max['cpu']['user+sys__rate'],
293 ('Overall CPU usage: {}%',
294 float(self.job_tot['cpu']['user+sys']) /
295 self.job_tot['time']['elapsed']
296 if self.job_tot['time']['elapsed'] > 0 else 0,
298 ('Max memory used by a single task: {}GB',
299 self.stats_max['mem']['rss'],
301 ('Max network traffic in a single task: {}GB',
302 self.stats_max['net:eth0']['tx+rx'] +
303 self.stats_max['net:keep0']['tx+rx'],
305 ('Max network speed in a single interval: {}MB/s',
306 self.stats_max['net:eth0']['tx+rx__rate'] +
307 self.stats_max['net:keep0']['tx+rx__rate'],
309 ('Keep cache miss rate {}%',
310 (float(self.job_tot['keepcache']['miss']) /
311 float(self.job_tot['keepcalls']['get']))
312 if self.job_tot['keepcalls']['get'] > 0 else 0,
313 lambda x: x * 100.0),
314 ('Keep cache utilization {}%',
315 (float(self.job_tot['blkio:0:0']['read']) /
316 float(self.job_tot['net:keep0']['rx']))
317 if self.job_tot['net:keep0']['rx'] > 0 else 0,
318 lambda x: x * 100.0)):
319 format_string, val, transform = args
320 if val == float('-Inf'):
324 yield "# "+format_string.format(self._format(val))
326 def _recommend_gen(self):
327 # TODO recommend fixing job granularity if elapsed time is too short
328 return itertools.chain(
329 self._recommend_cpu(),
330 self._recommend_ram(),
331 self._recommend_keep_cache())
333 def _recommend_cpu(self):
334 """Recommend asking for 4 cores if max CPU usage was 333%"""
336 constraint_key = self._map_runtime_constraint('vcpus')
337 cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
338 if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
339 logger.warning('%s: no CPU usage data', self.label)
341 # TODO Don't necessarily want to recommend on isolated max peak
342 # take average CPU usage into account as well or % time at max
343 used_cores = max(1, int(math.ceil(cpu_max_rate)))
344 asked_cores = self.existing_constraints.get(constraint_key)
345 if asked_cores is None:
347 # TODO: This should be more nuanced in cases where max >> avg
348 if used_cores < asked_cores:
350 '#!! {} max CPU usage was {}% -- '
351 'try reducing runtime_constraints to "{}":{}'
354 math.ceil(cpu_max_rate*100),
358 # FIXME: This needs to be updated to account for current nodemanager algorithms
359 def _recommend_ram(self):
360 """Recommend an economical RAM constraint for this job.
362 Nodes that are advertised as "8 gibibytes" actually have what
363 we might call "8 nearlygibs" of memory available for jobs.
364 Here, we calculate a whole number of nearlygibs that would
365 have sufficed to run the job, then recommend requesting a node
366 with that number of nearlygibs (expressed as mebibytes).
368 Requesting a node with "nearly 8 gibibytes" is our best hope
369 of getting a node that actually has nearly 8 gibibytes
370 available. If the node manager is smart enough to account for
371 the discrepancy itself when choosing/creating a node, we'll
372 get an 8 GiB node with nearly 8 GiB available. Otherwise, the
373 advertised size of the next-size-smaller node (say, 6 GiB)
374 will be too low to satisfy our request, so we will effectively
375 get rounded up to 8 GiB.
377 For example, if we need 7500 MiB, we can ask for 7500 MiB, and
378 we will generally get a node that is advertised as "8 GiB" and
379 has at least 7500 MiB available. However, asking for 8192 MiB
380 would either result in an unnecessarily expensive 12 GiB node
381 (if node manager knows about the discrepancy), or an 8 GiB
382 node which has less than 8192 MiB available and is therefore
383 considered by crunch-dispatch to be too small to meet our
386 When node manager learns how to predict the available memory
387 for each node type such that crunch-dispatch always agrees
388 that a node is big enough to run the job it was brought up
389 for, all this will be unnecessary. We'll just ask for exactly
390 the memory we want -- even if that happens to be 8192 MiB.
393 constraint_key = self._map_runtime_constraint('ram')
394 used_bytes = self.stats_max['mem']['rss']
395 if used_bytes == float('-Inf'):
396 logger.warning('%s: no memory usage data', self.label)
398 used_mib = math.ceil(float(used_bytes) / MB)
399 asked_mib = self.existing_constraints.get(constraint_key)
401 nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
402 if used_mib > 0 and (asked_mib is None or (
403 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
405 '#!! {} max RSS was {} MiB -- '
406 'try reducing runtime_constraints to "{}":{}'
411 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))
413 def _recommend_keep_cache(self):
414 """Recommend increasing keep cache if utilization < 80%"""
415 constraint_key = self._map_runtime_constraint('keep_cache_ram')
416 if self.job_tot['net:keep0']['rx'] == 0:
418 utilization = (float(self.job_tot['blkio:0:0']['read']) /
419 float(self.job_tot['net:keep0']['rx']))
420 # FIXME: the default on this get won't work correctly
421 asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
423 if utilization < 0.8:
425 '#!! {} Keep cache utilization was {:.2f}% -- '
426 'try doubling runtime_constraints to "{}":{} (or more)'
431 math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
434 def _format(self, val):
435 """Return a string representation of a stat.
437 {:.2f} for floats, default format for everything else."""
438 if isinstance(val, float):
439 return '{:.2f}'.format(val)
441 return '{}'.format(val)
443 def _runtime_constraint_mem_unit(self):
444 if hasattr(self, 'runtime_constraint_mem_unit'):
445 return self.runtime_constraint_mem_unit
446 elif self.detected_crunch1:
447 return JobSummarizer.runtime_constraint_mem_unit
449 return ContainerSummarizer.runtime_constraint_mem_unit
451 def _map_runtime_constraint(self, key):
452 if hasattr(self, 'map_runtime_constraint'):
453 return self.map_runtime_constraint[key]
454 elif self.detected_crunch1:
455 return JobSummarizer.map_runtime_constraint[key]
460 class CollectionSummarizer(Summarizer):
461 def __init__(self, collection_id, **kwargs):
462 super(CollectionSummarizer, self).__init__(
463 crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
464 self.label = collection_id
467 def NewSummarizer(process_or_uuid, **kwargs):
468 """Construct with the appropriate subclass for this uuid/object."""
470 if isinstance(process_or_uuid, dict):
471 process = process_or_uuid
472 uuid = process['uuid']
474 uuid = process_or_uuid
476 arv = arvados.api('v1', model=OrderedJsonModel())
478 if '-dz642-' in uuid:
480 process = arv.containers().get(uuid=uuid).execute()
481 klass = ContainerTreeSummarizer
482 elif '-xvhdp-' in uuid:
484 process = arv.container_requests().get(uuid=uuid).execute()
485 klass = ContainerTreeSummarizer
486 elif '-8i9sb-' in uuid:
488 process = arv.jobs().get(uuid=uuid).execute()
489 klass = JobTreeSummarizer
490 elif '-d1hrv-' in uuid:
492 process = arv.pipeline_instances().get(uuid=uuid).execute()
493 klass = PipelineSummarizer
494 elif '-4zz18-' in uuid:
495 return CollectionSummarizer(collection_id=uuid)
497 raise ArgumentError("Unrecognized uuid %s", uuid)
498 return klass(process, uuid=uuid, **kwargs)
501 class ProcessSummarizer(Summarizer):
502 """Process is a job, pipeline, container, or container request."""
504 def __init__(self, process, label=None, **kwargs):
506 self.process = process
508 label = self.process.get('name', self.process['uuid'])
509 if self.process.get('log'):
511 rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
512 except arvados.errors.NotFoundError as e:
513 logger.warning("Trying event logs after failing to read "
514 "log collection %s: %s", self.process['log'], e)
516 rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
517 label = label + ' (partial)'
518 super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
519 self.existing_constraints = self.process.get('runtime_constraints', {})
522 class JobSummarizer(ProcessSummarizer):
523 runtime_constraint_mem_unit = MB
524 map_runtime_constraint = {
525 'keep_cache_ram': 'keep_cache_mb_per_task',
526 'ram': 'min_ram_mb_per_node',
527 'vcpus': 'min_cores_per_node',
531 class ContainerSummarizer(ProcessSummarizer):
532 runtime_constraint_mem_unit = 1
535 class MultiSummarizer(object):
536 def __init__(self, children={}, label=None, threads=1, **kwargs):
537 self.throttle = threading.Semaphore(threads)
538 self.children = children
541 def run_and_release(self, target, *args, **kwargs):
543 return target(*args, **kwargs)
545 self.throttle.release()
549 for child in self.children.values():
550 self.throttle.acquire()
551 t = threading.Thread(target=self.run_and_release, args=(child.run, ))
558 def text_report(self):
560 d = self._descendants()
561 for child in d.values():
563 txt += '### Summary for {} ({})\n'.format(
564 child.label, child.process['uuid'])
565 txt += child.text_report()
569 def _descendants(self):
570 """Dict of self and all descendants.
572 Nodes with nothing of their own to report (like
573 MultiSummarizers) are omitted.
575 d = collections.OrderedDict()
576 for key, child in self.children.items():
577 if isinstance(child, Summarizer):
579 if isinstance(child, MultiSummarizer):
580 d.update(child._descendants())
583 def html_report(self):
584 return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()
587 class JobTreeSummarizer(MultiSummarizer):
588 """Summarizes a job and all children listed in its components field."""
589 def __init__(self, job, label=None, **kwargs):
590 arv = arvados.api('v1', model=OrderedJsonModel())
591 label = label or job.get('name', job['uuid'])
592 children = collections.OrderedDict()
593 children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
594 if job.get('components', None):
596 for j in arv.jobs().index(
597 limit=len(job['components']),
598 filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
599 preloaded[j['uuid']] = j
600 for cname in sorted(job['components'].keys()):
601 child_uuid = job['components'][cname]
602 j = (preloaded.get(child_uuid) or
603 arv.jobs().get(uuid=child_uuid).execute())
604 children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
606 super(JobTreeSummarizer, self).__init__(
612 class PipelineSummarizer(MultiSummarizer):
613 def __init__(self, instance, **kwargs):
614 children = collections.OrderedDict()
615 for cname, component in instance['components'].items():
616 if 'job' not in component:
618 "%s: skipping component with no job assigned", cname)
621 "%s: job %s", cname, component['job']['uuid'])
622 summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
623 summarizer.label = '{} {}'.format(
624 cname, component['job']['uuid'])
625 children[cname] = summarizer
626 super(PipelineSummarizer, self).__init__(
628 label=instance['uuid'],
632 class ContainerTreeSummarizer(MultiSummarizer):
633 def __init__(self, root, skip_child_jobs=False, **kwargs):
634 arv = arvados.api('v1', model=OrderedJsonModel())
636 label = kwargs.pop('label', None) or root.get('name') or root['uuid']
639 children = collections.OrderedDict()
640 todo = collections.deque((root, ))
642 current = todo.popleft()
643 label = current['name']
644 sort_key = current['created_at']
645 if current['uuid'].find('-xvhdp-') > 0:
646 current = arv.containers().get(uuid=current['container_uuid']).execute()
648 summer = ContainerSummarizer(current, label=label, **kwargs)
649 summer.sort_key = sort_key
650 children[current['uuid']] = summer
654 child_crs = arv.container_requests().index(
656 filters=page_filters+[
657 ['requesting_container_uuid', '=', current['uuid']]],
659 if not child_crs['items']:
661 elif skip_child_jobs:
662 logger.warning('%s: omitting stats from %d child containers'
663 ' because --skip-child-jobs flag is on',
664 label, child_crs['items_available'])
666 page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
667 for cr in child_crs['items']:
668 if cr['container_uuid']:
669 logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
670 cr['name'] = cr.get('name') or cr['uuid']
672 sorted_children = collections.OrderedDict()
673 for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
674 sorted_children[uuid] = children[uuid]
675 super(ContainerTreeSummarizer, self).__init__(
676 children=sorted_children,