1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import print_function
9 import crunchstat_summary.dygraphs
10 import crunchstat_summary.reader
20 from arvados.api import OrderedJsonModel
21 from crunchstat_summary import logger
# AVAILABLE_RAM_RATIO is the fraction of a node's advertised RAM assumed to
# be usable. Summarizer._recommend_ram() rounds peak RSS up to a whole
# number of AVAILABLE_RAM_RATIO-GiB units when recommending a constraint.
23 # Recommend memory constraints that are this multiple of an integral
24 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
25 # that have amounts like 7.5 GiB according to the kernel.)
26 AVAILABLE_RAM_RATIO = 0.95
# Summaries may run on several threads (MultiSummarizer), so warm up
# strptime here. The same format is used for crunch1 timestamps in _run().
29 # Workaround datetime.datetime.strptime() thread-safety bug by calling
30 # it once before starting threads. https://bugs.python.org/issue7980
31 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
# Chart implementation used by the html_report() methods.
34 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
# Fragment of (presumably) Task.__init__; the class header is not visible
# in this excerpt. series maps (category, stat) tuples to lists of
# (timedelta since job start, value) samples appended by Summarizer._run().
40 self.series = collections.defaultdict(list)
# Summarizer parses crunchstat samples from one log source, accumulates
# per-task and maximum statistics, and renders text/HTML reports with
# runtime-constraint recommendations. Subclasses supply the log source.
43 class Summarizer(object):
# logdata: reader opened as a context manager by run() and parsed by
#   _run().
# label: report label (presumably stored as self.label on a hidden line);
#   if it is still None, _run() derives one from the log.
# skip_child_jobs: if true, crunch1 child jobs announced by "Queued job"
#   lines are not followed.
# uuid: not referenced in the lines visible in this excerpt.
44 def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
45 self._logdata = logdata
50 self.finishtime = None
51 self._skip_child_jobs = skip_child_jobs
53 # stats_max: {category: {stat: val}}
# Missing entries read as 0, so the first "val > stats_max[...]" check
# for a stat starts from 0.
54 self.stats_max = collections.defaultdict(
55 functools.partial(collections.defaultdict, lambda: 0))
56 # task_stats: {task_id: {category: {stat: val}}}
# Each value is overwritten by later samples, so it holds the last value
# seen for that task.
57 self.task_stats = collections.defaultdict(
58 functools.partial(collections.defaultdict, dict))
# One Task object per task id, created on first access.
61 self.tasks = collections.defaultdict(Task)
63 # We won't bother recommending new runtime constraints if the
64 # constraints given when running the job are known to us and
65 # are already suitable. If applicable, the subclass
66 # constructor will overwrite this with something useful.
67 self.existing_constraints = {}
69 logger.debug("%s: logdata %s", self.label, logdata)
# Part of Summarizer.run(); its def line is not visible in this excerpt.
# It opens the log reader as a context manager. The body presumably hands
# the opened reader to _run() -- TODO confirm.
72 logger.debug("%s: parsing logdata %s", self.label, self._logdata)
73 with self._logdata as logdata:
# Parse crunchstat data from logdata into task_stats, stats_max, tasks and
# starttime/finishtime, then sum per-task values into job_tot.
# NOTE(review): the loop over logdata and several if/else/continue lines
# are not visible in this excerpt. `line` is presumably each log line in
# turn, and `m` is presumably checked for a match before it is used.
76 def _run(self, logdata):
77 self.detected_crunch1 = False
# A job uuid infix ('-8i9sb-') anywhere in the log marks it as a crunch1
# job log. Once set, the crunch1-only parsing below stays enabled.
79 if not self.detected_crunch1 and '-8i9sb-' in line:
80 self.detected_crunch1 = True
82 if self.detected_crunch1:
# crunch1 "job_task" lines map a task sequence number to its task uuid.
# self.seq_to_uuid is initialized on a line not visible here.
83 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
85 seq = int(m.group('seq'))
86 uuid = m.group('task_uuid')
87 self.seq_to_uuid[seq] = uuid
88 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
# A task's "success in N seconds" or permanent-failure line gives its
# elapsed time. Record it per task and keep the longest in stats_max.
91 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
93 task_id = self.seq_to_uuid[int(m.group('seq'))]
94 elapsed = int(m.group('elapsed'))
95 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
96 if elapsed > self.stats_max['time']['elapsed']:
97 self.stats_max['time']['elapsed'] = elapsed
# "Queued job <uuid>" names a child job. Unless skip_child_jobs is set,
# the child's stats are parsed into this summarizer's shared dicts
# (stats_max, task_stats, tasks), using this summarizer's starttime.
# NOTE(review): ProcessSummarizer.__init__ calls process.get(...) and
# process['uuid'] on its argument. Passing a bare uuid string here would
# likely raise AttributeError -- verify; the job record may need to be
# fetched first.
100 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
102 uuid = m.group('uuid')
103 if self._skip_child_jobs:
104 logger.warning('%s: omitting stats from child job %s'
105 ' because --skip-child-jobs flag is on',
108 logger.debug('%s: follow %s', self.label, uuid)
109 child_summarizer = ProcessSummarizer(uuid)
110 child_summarizer.stats_max = self.stats_max
111 child_summarizer.task_stats = self.task_stats
112 child_summarizer.tasks = self.tasks
113 child_summarizer.starttime = self.starttime
114 child_summarizer.run()
115 logger.debug('%s: done %s', self.label, uuid)
# Two crunchstat sample formats:
#   crunch1: "<timestamp>[.frac] <job uuid> <n> <seq> stderr crunchstat: ..."
#   other:   "<timestamp> <category> ..."
# Which pattern applies is decided by a branch not visible here.
118 m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
123 m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
# Default the label from the log: the job uuid (crunch1) or 'container'.
127 if self.label is None:
129 self.label = m.group('job_uuid')
131 self.label = 'container'
# Some "categories" are really the first word of a notice or error
# message, not a stats category. These lines are presumably skipped; the
# branch bodies are not visible here.
132 if m.group('category').endswith(':'):
133 # "stderr crunchstat: notice: ..."
135 elif m.group('category') in ('error', 'caught'):
137 elif m.group('category') in ('read', 'open', 'cgroup', 'CID', 'Running'):
138 # "stderr crunchstat: read /proc/1234/net/dev: ..."
139 # (old logs are less careful with unprefixed error messages)
# crunch1 stats belong to the task with this sequence number. Otherwise
# all stats go to a single pseudo-task named 'container'.
142 if self.detected_crunch1:
143 task_id = self.seq_to_uuid[int(m.group('seq'))]
145 task_id = 'container'
146 task = self.tasks[task_id]
148 # Use the first and last crunchstat timestamps as
149 # approximations of starttime and finishtime.
150 timestamp = m.group('timestamp')
# Accept "YYYY-MM-DD_HH:MM:SS" or ISO-8601 "YYYY-MM-DDTHH:MM:SS...". For
# ISO input only the first 19 characters are parsed, so fractional
# seconds and any zone suffix are ignored.
151 if timestamp[10:11] == '_':
152 timestamp = datetime.datetime.strptime(
153 timestamp, '%Y-%m-%d_%H:%M:%S')
154 elif timestamp[10:11] == 'T':
155 timestamp = datetime.datetime.strptime(
156 timestamp[:19], '%Y-%m-%dT%H:%M:%S')
158 raise ValueError("Cannot parse timestamp {!r}".format(
161 if not task.starttime:
162 task.starttime = timestamp
163 logger.debug('%s: task %s starttime %s',
164 self.label, task_id, timestamp)
165 task.finishtime = timestamp
167 if not self.starttime:
168 self.starttime = timestamp
169 self.finishtime = timestamp
# A line has a "current" part and an optional " -- interval ..." part.
# Each is a space-separated list of "<value> <stat>" pairs.
171 this_interval_s = None
172 for group in ['current', 'interval']:
173 if not m.group(group):
175 category = m.group('category')
176 words = m.group(group).split(' ')
179 for val, stat in zip(words[::2], words[1::2]):
181 stats[stat] = float(val)
183 stats[stat] = int(val)
184 except ValueError as e:
186 'Error parsing value %r (stat %r, category %r): %r',
187 val, stat, category, e)
188 logger.warning('%s', line)
# Derived stats: total CPU time (user+sys) and total traffic (tx+rx).
190 if 'user' in stats or 'sys' in stats:
191 stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
192 if 'tx' in stats or 'rx' in stats:
193 stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
# Interval values are divided by the interval's "seconds" stat to give
# per-second "<stat>__rate" values. Only the CPU and network rates are
# appended to chart series in this branch.
# NOTE(review): if `stats` is a plain dict, Python 2 iterates it in
# arbitrary order. "seconds" may then come after other interval stats,
# which would hit the "BUG?" branch -- verify.
194 for stat, val in stats.iteritems():
195 if group == 'interval':
196 if stat == 'seconds':
197 this_interval_s = val
199 elif not (this_interval_s > 0):
201 "BUG? interval stat given with duration {!r}".
202 format(this_interval_s))
205 stat = stat + '__rate'
206 val = val / this_interval_s
207 if stat in ['user+sys__rate', 'tx+rx__rate']:
208 task.series[category, stat].append(
209 (timestamp - self.starttime, val))
212 task.series[category, stat].append(
213 (timestamp - self.starttime, val))
# Record this task's value (replacing earlier samples) and raise the
# running maximum across all tasks.
214 self.task_stats[task_id][category][stat] = val
215 if val > self.stats_max[category][stat]:
216 self.stats_max[category][stat] = val
217 logger.debug('%s: done parsing', self.label)
# Job totals: sum each task's last value per stat, skipping stats that do
# not add up meaningfully across tasks.
219 self.job_tot = collections.defaultdict(
220 functools.partial(collections.defaultdict, int))
221 for task_id, task_stat in self.task_stats.iteritems():
222 for category, stat_last in task_stat.iteritems():
223 for stat, val in stat_last.iteritems():
224 if stat in ['cpus', 'cache', 'swap', 'rss']:
225 # meaningless stats like 16 cpu cores x 5 tasks = 80
227 self.job_tot[category][stat] += val
228 logger.debug('%s: done totals', self.label)
# Return the report label with two additions:
#   - the process uuid, when this summarizer has a process record whose
#     uuid is not already in the label;
#   - the elapsed time between the first and last crunchstat samples, as
#     day/hour/minute/second components.
# NOTE(review): the initialization of `label`, the guard around the
# elapsed-time part, and the guards on the d/h/m parts are on lines not
# visible here. starttime and finishtime stay None until a sample is seen.
230 def long_label(self):
232 if hasattr(self, 'process') and self.process['uuid'] not in label:
233 label = '{} ({})'.format(label, self.process['uuid'])
235 label += ' -- elapsed time '
236 s = (self.finishtime - self.starttime).total_seconds()
238 label += '{}d'.format(int(s/86400))
240 label += '{}h'.format(int(s/3600) % 24)
242 label += '{}m'.format(int(s/60) % 60)
243 label += '{}s'.format(int(s) % 60)
# Return the plain-text report: the stats table and summary lines from
# _text_report_gen(), followed by recommendations from _recommend_gen().
# The lines are newline-joined and the result is newline-terminated.
# A placeholder is returned instead under a condition on a hidden line
# (presumably when no data was collected).
246 def text_report(self):
248 return "(no report generated)\n"
249 return "\n".join(itertools.chain(
250 self._text_report_gen(),
251 self._recommend_gen())) + "\n"
# Return the HTML chart page for this summarizer alone. WEBCHART_CLASS is
# given the label and a one-element list containing this summarizer.
253 def html_report(self):
254 return WEBCHART_CLASS(self.label, [self]).html()
# Yield the text report lines in three parts:
#   - a tab-separated header;
#   - one row per stat. A stat's "__rate" maximum appears in its
#     task_max_rate column; "__rate" stats are presumably skipped as rows
#     of their own (the skip line is hidden). Missing values print as '-';
#   - "# "-prefixed summary lines.
256 def _text_report_gen(self):
257 yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
258 for category, stat_max in sorted(self.stats_max.iteritems()):
259 for stat, val in sorted(stat_max.iteritems()):
260 if stat.endswith('__rate'):
262 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
263 val = self._format(val)
264 tot = self._format(self.job_tot[category].get(stat, '-'))
265 yield "\t".join([category, stat, str(val), max_rate, tot])
# Summary lines. Each entry is a (format string, value, transform) tuple.
# The lines that apply the transform are not visible here.
267 ('Number of tasks: {}',
270 ('Max CPU time spent by a single task: {}s',
271 self.stats_max['cpu']['user+sys'],
273 ('Max CPU usage in a single interval: {}%',
274 self.stats_max['cpu']['user+sys__rate'],
276 ('Overall CPU usage: {}%',
277 self.job_tot['cpu']['user+sys'] /
278 self.job_tot['time']['elapsed']
279 if self.job_tot['time']['elapsed'] > 0 else 0,
281 ('Max memory used by a single task: {}GB',
282 self.stats_max['mem']['rss'],
284 ('Max network traffic in a single task: {}GB',
285 self.stats_max['net:eth0']['tx+rx'] +
286 self.stats_max['net:keep0']['tx+rx'],
288 ('Max network speed in a single interval: {}MB/s',
289 self.stats_max['net:eth0']['tx+rx__rate'] +
290 self.stats_max['net:keep0']['tx+rx__rate'],
292 ('Keep cache miss rate {}%',
293 (float(self.job_tot['keepcache']['miss']) /
294 float(self.job_tot['keepcalls']['get']))
295 if self.job_tot['keepcalls']['get'] > 0 else 0,
296 lambda x: x * 100.0),
297 ('Keep cache utilization {}%',
298 (float(self.job_tot['blkio:0:0']['read']) /
299 float(self.job_tot['net:keep0']['rx']))
300 if self.job_tot['net:keep0']['rx'] > 0 else 0,
301 lambda x: x * 100.0)):
302 format_string, val, transform = args
# NOTE(review): stats_max fills missing entries with 0 (lambda: 0 in
# __init__), so this -Inf check looks unreachable for values read from
# stats_max. Verify how "no data" is meant to be detected.
303 if val == float('-Inf'):
307 yield "# "+format_string.format(self._format(val))
# Return all recommendation lines, in order: CPU, then RAM, then Keep
# cache. Each helper's result is chained lazily.
309 def _recommend_gen(self):
310 return itertools.chain(
311 self._recommend_cpu(),
312 self._recommend_ram(),
313 self._recommend_keep_cache())
315 def _recommend_cpu(self):
316 """Recommend asking for 4 cores if max CPU usage was 333%"""
# user+sys__rate is CPU-seconds per second over an interval, i.e. cores
# in use. Multiplying by 100 gives the percentage shown in the message.
318 constraint_key = self._map_runtime_constraint('vcpus')
319 cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
# NOTE(review): stats_max fills missing entries with 0, so this -Inf check
# looks unreachable. With no CPU data the code below would see 0.
320 if cpu_max_rate == float('-Inf'):
321 logger.warning('%s: no CPU usage data', self.label)
# Round up to whole cores (at least 1). Recommend only when no constraint
# was given, or when the process asked for more cores than it used.
323 used_cores = max(1, int(math.ceil(cpu_max_rate)))
324 asked_cores = self.existing_constraints.get(constraint_key)
325 if asked_cores is None or used_cores < asked_cores:
327 '#!! {} max CPU usage was {}% -- '
328 'try runtime_constraints "{}":{}'
331 int(math.ceil(cpu_max_rate*100)),
335 def _recommend_ram(self):
336 """Recommend an economical RAM constraint for this job.
338 Nodes that are advertised as "8 gibibytes" actually have what
339 we might call "8 nearlygibs" of memory available for jobs.
340 Here, we calculate a whole number of nearlygibs that would
341 have sufficed to run the job, then recommend requesting a node
342 with that number of nearlygibs (expressed as mebibytes).
344 Requesting a node with "nearly 8 gibibytes" is our best hope
345 of getting a node that actually has nearly 8 gibibytes
346 available. If the node manager is smart enough to account for
347 the discrepancy itself when choosing/creating a node, we'll
348 get an 8 GiB node with nearly 8 GiB available. Otherwise, the
349 advertised size of the next-size-smaller node (say, 6 GiB)
350 will be too low to satisfy our request, so we will effectively
351 get rounded up to 8 GiB.
353 For example, if we need 7500 MiB, we can ask for 7500 MiB, and
354 we will generally get a node that is advertised as "8 GiB" and
355 has at least 7500 MiB available. However, asking for 8192 MiB
356 would either result in an unnecessarily expensive 12 GiB node
357 (if node manager knows about the discrepancy), or an 8 GiB
358 node which has less than 8192 MiB available and is therefore
359 considered by crunch-dispatch to be too small to meet our
362 When node manager learns how to predict the available memory
363 for each node type such that crunch-dispatch always agrees
364 that a node is big enough to run the job it was brought up
365 for, all this will be unnecessary. We'll just ask for exactly
366 the memory we want -- even if that happens to be 8192 MiB.
# How the recommendation is computed:
#   1. Peak RSS in bytes is rounded up to MiB.
#   2. That is rounded up to a whole number of "nearlygibs", where one
#      nearlygib is AVAILABLE_RAM_RATIO * 1 GiB.
#   3. The recommendation is that many nearlygibs, converted to bytes and
#      then to the constraint's own unit.
369 constraint_key = self._map_runtime_constraint('ram')
370 used_bytes = self.stats_max['mem']['rss']
# NOTE(review): stats_max fills missing entries with 0, so this -Inf check
# looks unreachable.
371 if used_bytes == float('-Inf'):
372 logger.warning('%s: no memory usage data', self.label)
374 used_mib = math.ceil(float(used_bytes) / 1048576)
# NOTE(review): asked_mib is compared below as MiB. However, existing
# constraint values are in the constraint's own unit: the result is
# divided by _runtime_constraint_mem_unit(), and that unit is 1 (bytes)
# for ContainerSummarizer. Confirm this comparison for containers.
375 asked_mib = self.existing_constraints.get(constraint_key)
377 nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
378 if asked_mib is None or (
379 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
381 '#!! {} max RSS was {} MiB -- '
382 'try runtime_constraints "{}":{}'
387 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(2**20)/self._runtime_constraint_mem_unit()))
389 def _recommend_keep_cache(self):
390 """Recommend increasing keep cache if utilization < 80%"""
# utilization is the 'blkio:0:0' read total divided by the 'net:keep0' rx
# total. When no Keep traffic was recorded, the method presumably returns
# nothing (the line after the zero check is hidden). Below 0.8, it
# suggests doubling the current keep cache setting; the default of 256 is
# treated as MiB.
391 constraint_key = self._map_runtime_constraint('keep_cache_ram')
392 if self.job_tot['net:keep0']['rx'] == 0:
394 utilization = (float(self.job_tot['blkio:0:0']['read']) /
395 float(self.job_tot['net:keep0']['rx']))
# NOTE(review): the existing setting is read in the constraint's own unit
# but later multiplied by 2**20 as if it were MiB. For ContainerSummarizer
# (unit 1) an existing byte value would be inflated -- confirm.
396 asked_mib = self.existing_constraints.get(constraint_key, 256)
398 if utilization < 0.8:
400 '#!! {} Keep cache utilization was {:.2f}% -- '
401 'try runtime_constraints "{}":{} (or more)'
406 asked_mib*2*(2**20)/self._runtime_constraint_mem_unit())
409 def _format(self, val):
410 """Return a string representation of a stat.
412 {:.2f} for floats, default format for everything else."""
413 if isinstance(val, float):
414 return '{:.2f}'.format(val)
# Ints and the '-' placeholder used for missing values go through plain
# str.format.
416 return '{}'.format(val)
# Return the number of bytes per unit of memory-valued runtime
# constraints. A subclass's own runtime_constraint_mem_unit is used when
# present (JobSummarizer, ContainerSummarizer). Otherwise, e.g. for a
# CollectionSummarizer, the unit is chosen by whether _run() detected a
# crunch1 log.
418 def _runtime_constraint_mem_unit(self):
419 if hasattr(self, 'runtime_constraint_mem_unit'):
420 return self.runtime_constraint_mem_unit
421 elif self.detected_crunch1:
422 return JobSummarizer.runtime_constraint_mem_unit
424 return ContainerSummarizer.runtime_constraint_mem_unit
# Translate a generic constraint name ('vcpus', 'ram', 'keep_cache_ram')
# into the runtime_constraints key used by this kind of process. The
# final fallback is on lines not visible here. ContainerSummarizer
# defines no map, so it presumably uses `key` unchanged -- TODO confirm.
426 def _map_runtime_constraint(self, key):
427 if hasattr(self, 'map_runtime_constraint'):
428 return self.map_runtime_constraint[key]
429 elif self.detected_crunch1:
430 return JobSummarizer.map_runtime_constraint[key]
class CollectionSummarizer(Summarizer):
    """Summarize crunchstat data from log files stored in a collection.

    The collection id is used as the report label, replacing whatever
    label the base class initializer stored.
    """

    def __init__(self, collection_id, **kwargs):
        log_reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(log_reader, **kwargs)
        # Always report under the collection id.
        self.label = collection_id
def NewSummarizer(process_or_uuid, **kwargs):
    """Construct the appropriate summarizer for a uuid or an API record.

    process_or_uuid is either an API record (a dict with a 'uuid' key)
    or a bare uuid string. For a bare uuid, the record is fetched from
    the API server; the resource type is chosen from the uuid's type
    infix.

    Returns:
      - a ContainerTreeSummarizer for containers ('-dz642-') and
        container requests ('-xvhdp-');
      - a JobTreeSummarizer for jobs ('-8i9sb-');
      - a PipelineSummarizer for pipeline instances ('-d1hrv-');
      - a CollectionSummarizer for collections ('-4zz18-'). In this case
        kwargs are not passed on.

    Raises ValueError if the uuid's type infix is not recognized.
    """
    if isinstance(process_or_uuid, dict):
        process = process_or_uuid
        uuid = process['uuid']
    else:
        uuid = process_or_uuid
        # The record is fetched below, once the uuid tells us which API
        # resource to query.
        process = None
        arv = arvados.api('v1', model=OrderedJsonModel())

    if '-dz642-' in uuid:
        if process is None:
            process = arv.containers().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-xvhdp-' in uuid:
        if process is None:
            process = arv.container_requests().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-8i9sb-' in uuid:
        if process is None:
            process = arv.jobs().get(uuid=uuid).execute()
        klass = JobTreeSummarizer
    elif '-d1hrv-' in uuid:
        if process is None:
            process = arv.pipeline_instances().get(uuid=uuid).execute()
        klass = PipelineSummarizer
    elif '-4zz18-' in uuid:
        return CollectionSummarizer(collection_id=uuid)
    else:
        # Use a builtin exception: ArgumentError is neither a builtin nor
        # imported here. Format the uuid into the message itself.
        raise ValueError("Unrecognized uuid %s" % uuid)
    return klass(process, uuid=uuid, **kwargs)
# Summarizer for one API process record (a dict read with .get() and
# ['uuid']).
# - Log source: the log collection named by process['log'] is preferred.
#   If that collection is not found (and presumably when there is no
#   log), the live event-log reader is used and the label gets a
#   " (partial)" suffix.
# - Label: defaults to the record's name, else its uuid. The guard line
#   is not visible here.
# - Recommendations are compared against the record's
#   runtime_constraints.
476 class ProcessSummarizer(Summarizer):
477 """Process is a job, pipeline, container, or container request."""
479 def __init__(self, process, label=None, **kwargs):
481 self.process = process
483 label = self.process.get('name', self.process['uuid'])
484 if self.process.get('log'):
486 rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
487 except arvados.errors.NotFoundError as e:
488 logger.warning("Trying event logs after failing to read "
489 "log collection %s: %s", self.process['log'], e)
491 rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
492 label = label + ' (partial)'
493 super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
494 self.existing_constraints = self.process.get('runtime_constraints', {})
class JobSummarizer(ProcessSummarizer):
    """Summarizer for a crunch1 job record.

    Crunch1 memory constraints are expressed in MiB and use their own
    key names. The generic names used by the recommendation code are
    translated through map_runtime_constraint.
    """
    # Bytes per unit of memory-valued runtime constraints (1 MiB).
    runtime_constraint_mem_unit = 2 ** 20
    # Generic constraint name -> crunch1 runtime_constraints key.
    map_runtime_constraint = dict(
        keep_cache_ram='keep_cache_mb_per_task',
        ram='min_ram_mb_per_node',
        vcpus='min_cores_per_node',
    )
class ContainerSummarizer(ProcessSummarizer):
    """Summarizer for a container record.

    Memory-valued runtime constraints are expressed in bytes, so the
    unit size is 1.
    """
    runtime_constraint_mem_unit = 1
# Runs and reports on a set of child summarizers. Children may be
# Summarizer or nested MultiSummarizer objects. At most `threads` children
# run concurrently, limited by a semaphore.
510 class MultiSummarizer(object):
# NOTE(review): `children={}` is a mutable default shared by every
# instance that omits the argument. The visible code only stores and
# iterates it, but `children=None` plus a fresh dict would be safer.
511 def __init__(self, children={}, label=None, threads=1, **kwargs):
512 self.throttle = threading.Semaphore(threads)
513 self.children = children
# run_and_release: call target and then release one throttle slot. The
# release follows a return, so it is presumably in a finally clause
# (hidden).
516 def run_and_release(self, target, *args, **kwargs):
518 return target(*args, **kwargs)
520 self.throttle.release()
# Part of run() (def line hidden): for each child, acquire a throttle slot,
# then start a thread that runs child.run and releases the slot. The
# thread start/join lines are not visible here.
524 for child in self.children.itervalues():
525 self.throttle.acquire()
526 t = threading.Thread(target=self.run_and_release, args=(child.run, ))
# Concatenate the text reports of all descendants. A "### Summary for"
# heading is added per child under a condition on a hidden line.
533 def text_report(self):
535 d = self._descendants()
536 for child in d.itervalues():
538 txt += '### Summary for {} ({})\n'.format(
539 child.label, child.process['uuid'])
540 txt += child.text_report()
544 def _descendants(self):
545 """Dict of self and all descendants.
547 Nodes with nothing of their own to report (like
548 MultiSummarizers) are omitted.
# NOTE(review): nothing visible adds self to d. Only Summarizer children
# and, recursively, the descendants of MultiSummarizer children appear,
# keyed as in self.children.
550 d = collections.OrderedDict()
551 for key, child in self.children.iteritems():
552 if isinstance(child, Summarizer):
554 if isinstance(child, MultiSummarizer):
555 d.update(child._descendants())
# Return one chart page covering every descendant summarizer.
558 def html_report(self):
559 return WEBCHART_CLASS(self.label, self._descendants().itervalues()).html()
562 class JobTreeSummarizer(MultiSummarizer):
563 """Summarizes a job and all children listed in its components field."""
# Children, in order:
#   1. A JobSummarizer for the job itself.
#   2. A nested JobTreeSummarizer per component, in sorted component-name
#      order, labelled with the component name.
# Component job records are fetched with one index() call. A per-uuid
# get() is the fallback for any record the index did not return. The
# `preloaded` dict is initialized on a hidden line.
564 def __init__(self, job, label=None, **kwargs):
565 arv = arvados.api('v1', model=OrderedJsonModel())
566 label = label or job.get('name', job['uuid'])
567 children = collections.OrderedDict()
568 children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
569 if job.get('components', None):
571 for j in arv.jobs().index(
572 limit=len(job['components']),
573 filters=[['uuid','in',job['components'].values()]]).execute()['items']:
574 preloaded[j['uuid']] = j
575 for cname in sorted(job['components'].keys()):
576 child_uuid = job['components'][cname]
577 j = (preloaded.get(child_uuid) or
578 arv.jobs().get(uuid=child_uuid).execute())
579 children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
581 super(JobTreeSummarizer, self).__init__(
# Summarizes a pipeline instance. Each component that has a job becomes a
# JobTreeSummarizer labelled "<component name> <job uuid>". Components
# without a job are logged and presumably skipped (the skip line is
# hidden). The overall label is the pipeline instance uuid.
587 class PipelineSummarizer(MultiSummarizer):
588 def __init__(self, instance, **kwargs):
589 children = collections.OrderedDict()
590 for cname, component in instance['components'].iteritems():
591 if 'job' not in component:
593 "%s: skipping component with no job assigned", cname)
596 "%s: job %s", cname, component['job']['uuid'])
597 summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
598 summarizer.label = '{} {}'.format(
599 cname, component['job']['uuid'])
600 children[cname] = summarizer
601 super(PipelineSummarizer, self).__init__(
603 label=instance['uuid'],
# Summarizes a container (or container request) and its descendants.
# - Traversal: breadth-first from root, using `todo` as a FIFO queue.
# - Each visited container request is resolved to its container record,
#   and each container gets one ContainerSummarizer.
# - Child requests are those whose requesting_container_uuid is the
#   current container. They are fetched a page at a time, continuing
#   after the last uuid seen, and are presumably appended to `todo` on a
#   hidden line.
# - Children are finally ordered by created_at.
# This definition continues past the end of the visible excerpt.
607 class ContainerTreeSummarizer(MultiSummarizer):
608 def __init__(self, root, skip_child_jobs=False, **kwargs):
609 arv = arvados.api('v1', model=OrderedJsonModel())
611 label = kwargs.pop('label', None) or root.get('name') or root['uuid']
614 children = collections.OrderedDict()
615 todo = collections.deque((root, ))
# NOTE(review): `label` is rebound for every node visited below (the loop
# header is hidden), so code after the loop sees the last node's name
# rather than the root label computed above. Confirm this is intended.
617 current = todo.popleft()
618 label = current['name']
619 sort_key = current['created_at']
620 if current['uuid'].find('-xvhdp-') > 0:
621 current = arv.containers().get(uuid=current['container_uuid']).execute()
623 summer = ContainerSummarizer(current, label=label, **kwargs)
624 summer.sort_key = sort_key
625 children[current['uuid']] = summer
629 child_crs = arv.container_requests().index(
631 filters=page_filters+[
632 ['requesting_container_uuid', '=', current['uuid']]],
634 if not child_crs['items']:
636 elif skip_child_jobs:
637 logger.warning('%s: omitting stats from %d child containers'
638 ' because --skip-child-jobs flag is on',
639 label, child_crs['items_available'])
641 page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
642 for cr in child_crs['items']:
643 if cr['container_uuid']:
644 logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
645 cr['name'] = cr.get('name') or cr['uuid']
647 sorted_children = collections.OrderedDict()
648 for uuid in sorted(children.keys(), key=lambda uuid: children[uuid].sort_key):
649 sorted_children[uuid] = children[uuid]
650 super(ContainerTreeSummarizer, self).__init__(
651 children=sorted_children,