1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import print_function
9 import crunchstat_summary.dygraphs
10 import crunchstat_summary.reader
20 from arvados.api import OrderedJsonModel
21 from crunchstat_summary import logger
# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
# With the value below, recommendations are sized in units of 0.95 GiB.
AVAILABLE_RAM_RATIO = 0.95


# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads. https://bugs.python.org/issue7980
# The parsed value is discarded; only the side effect of the call matters.
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')


# Chart class used by the html_report() methods in this module, which
# call WEBCHART_CLASS(label, summarizers).html().
WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
40 self.series = collections.defaultdict(list)
43 class Summarizer(object):
44 def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
45 self._logdata = logdata
50 self.finishtime = None
51 self._skip_child_jobs = skip_child_jobs
53 # stats_max: {category: {stat: val}}
54 self.stats_max = collections.defaultdict(
55 functools.partial(collections.defaultdict, lambda: 0))
56 # task_stats: {task_id: {category: {stat: val}}}
57 self.task_stats = collections.defaultdict(
58 functools.partial(collections.defaultdict, dict))
61 self.tasks = collections.defaultdict(Task)
63 # We won't bother recommending new runtime constraints if the
64 # constraints given when running the job are known to us and
65 # are already suitable. If applicable, the subclass
66 # constructor will overwrite this with something useful.
67 self.existing_constraints = {}
69 logger.debug("%s: logdata %s", self.label, logdata)
72 logger.debug("%s: parsing logdata %s", self.label, self._logdata)
73 detected_crunch1 = False
74 for line in self._logdata:
75 if not detected_crunch1 and '-8i9sb-' in line:
76 detected_crunch1 = True
79 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
81 seq = int(m.group('seq'))
82 uuid = m.group('task_uuid')
83 self.seq_to_uuid[seq] = uuid
84 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
87 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
89 task_id = self.seq_to_uuid[int(m.group('seq'))]
90 elapsed = int(m.group('elapsed'))
91 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
92 if elapsed > self.stats_max['time']['elapsed']:
93 self.stats_max['time']['elapsed'] = elapsed
96 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
98 uuid = m.group('uuid')
99 if self._skip_child_jobs:
100 logger.warning('%s: omitting stats from child job %s'
101 ' because --skip-child-jobs flag is on',
104 logger.debug('%s: follow %s', self.label, uuid)
105 child_summarizer = ProcessSummarizer(uuid)
106 child_summarizer.stats_max = self.stats_max
107 child_summarizer.task_stats = self.task_stats
108 child_summarizer.tasks = self.tasks
109 child_summarizer.starttime = self.starttime
110 child_summarizer.run()
111 logger.debug('%s: done %s', self.label, uuid)
114 m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
119 m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
123 if self.label is None:
125 self.label = m.group('job_uuid')
127 self.label = 'container'
128 if m.group('category').endswith(':'):
129 # "stderr crunchstat: notice: ..."
131 elif m.group('category') in ('error', 'caught'):
133 elif m.group('category') in ['read', 'open', 'cgroup', 'CID']:
134 # "stderr crunchstat: read /proc/1234/net/dev: ..."
135 # (old logs are less careful with unprefixed error messages)
139 task_id = self.seq_to_uuid[int(m.group('seq'))]
141 task_id = 'container'
142 task = self.tasks[task_id]
144 # Use the first and last crunchstat timestamps as
145 # approximations of starttime and finishtime.
146 timestamp = m.group('timestamp')
147 if timestamp[10:11] == '_':
148 timestamp = datetime.datetime.strptime(
149 timestamp, '%Y-%m-%d_%H:%M:%S')
150 elif timestamp[10:11] == 'T':
151 timestamp = datetime.datetime.strptime(
152 timestamp[:19], '%Y-%m-%dT%H:%M:%S')
154 raise ValueError("Cannot parse timestamp {!r}".format(
157 if not task.starttime:
158 task.starttime = timestamp
159 logger.debug('%s: task %s starttime %s',
160 self.label, task_id, timestamp)
161 task.finishtime = timestamp
163 if not self.starttime:
164 self.starttime = timestamp
165 self.finishtime = timestamp
167 this_interval_s = None
168 for group in ['current', 'interval']:
169 if not m.group(group):
171 category = m.group('category')
172 words = m.group(group).split(' ')
175 for val, stat in zip(words[::2], words[1::2]):
177 stats[stat] = float(val)
179 stats[stat] = int(val)
180 except ValueError as e:
181 logger.warning('Error parsing {} stat: {!r}'.format(
184 if 'user' in stats or 'sys' in stats:
185 stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
186 if 'tx' in stats or 'rx' in stats:
187 stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
188 for stat, val in stats.iteritems():
189 if group == 'interval':
190 if stat == 'seconds':
191 this_interval_s = val
193 elif not (this_interval_s > 0):
195 "BUG? interval stat given with duration {!r}".
196 format(this_interval_s))
199 stat = stat + '__rate'
200 val = val / this_interval_s
201 if stat in ['user+sys__rate', 'tx+rx__rate']:
202 task.series[category, stat].append(
203 (timestamp - self.starttime, val))
206 task.series[category, stat].append(
207 (timestamp - self.starttime, val))
208 self.task_stats[task_id][category][stat] = val
209 if val > self.stats_max[category][stat]:
210 self.stats_max[category][stat] = val
211 logger.debug('%s: done parsing', self.label)
213 self.job_tot = collections.defaultdict(
214 functools.partial(collections.defaultdict, int))
215 for task_id, task_stat in self.task_stats.iteritems():
216 for category, stat_last in task_stat.iteritems():
217 for stat, val in stat_last.iteritems():
218 if stat in ['cpus', 'cache', 'swap', 'rss']:
219 # meaningless stats like 16 cpu cores x 5 tasks = 80
221 self.job_tot[category][stat] += val
222 logger.debug('%s: done totals', self.label)
224 def long_label(self):
227 label += ' -- elapsed time '
228 s = (self.finishtime - self.starttime).total_seconds()
230 label += '{}d'.format(int(s/86400))
232 label += '{}h'.format(int(s/3600) % 24)
234 label += '{}m'.format(int(s/60) % 60)
235 label += '{}s'.format(int(s) % 60)
238 def text_report(self):
240 return "(no report generated)\n"
241 return "\n".join(itertools.chain(
242 self._text_report_gen(),
243 self._recommend_gen())) + "\n"
def html_report(self):
    """Return the HTML produced by WEBCHART_CLASS for this summarizer.

    The chart is built from this summarizer's label and a one-element
    list containing the summarizer itself.
    """
    chart = WEBCHART_CLASS(self.label, [self])
    return chart.html()
248 def _text_report_gen(self):
249 yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
250 for category, stat_max in sorted(self.stats_max.iteritems()):
251 for stat, val in sorted(stat_max.iteritems()):
252 if stat.endswith('__rate'):
254 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
255 val = self._format(val)
256 tot = self._format(self.job_tot[category].get(stat, '-'))
257 yield "\t".join([category, stat, str(val), max_rate, tot])
259 ('Number of tasks: {}',
262 ('Max CPU time spent by a single task: {}s',
263 self.stats_max['cpu']['user+sys'],
265 ('Max CPU usage in a single interval: {}%',
266 self.stats_max['cpu']['user+sys__rate'],
268 ('Overall CPU usage: {}%',
269 self.job_tot['cpu']['user+sys'] /
270 self.job_tot['time']['elapsed']
271 if self.job_tot['time']['elapsed'] > 0 else 0,
273 ('Max memory used by a single task: {}GB',
274 self.stats_max['mem']['rss'],
276 ('Max network traffic in a single task: {}GB',
277 self.stats_max['net:eth0']['tx+rx'] +
278 self.stats_max['net:keep0']['tx+rx'],
280 ('Max network speed in a single interval: {}MB/s',
281 self.stats_max['net:eth0']['tx+rx__rate'] +
282 self.stats_max['net:keep0']['tx+rx__rate'],
284 ('Keep cache miss rate {}%',
285 (float(self.job_tot['keepcache']['miss']) /
286 float(self.job_tot['keepcalls']['get']))
287 if self.job_tot['keepcalls']['get'] > 0 else 0,
288 lambda x: x * 100.0),
289 ('Keep cache utilization {}%',
290 (float(self.job_tot['blkio:0:0']['read']) /
291 float(self.job_tot['net:keep0']['rx']))
292 if self.job_tot['net:keep0']['rx'] > 0 else 0,
293 lambda x: x * 100.0)):
294 format_string, val, transform = args
295 if val == float('-Inf'):
299 yield "# "+format_string.format(self._format(val))
def _recommend_gen(self):
    """Return one iterator over all recommendation output.

    The CPU, RAM and Keep cache recommenders are each called once, in
    that order, and their results are chained in that same order.
    """
    # Build the tuple eagerly so the three recommenders are invoked at
    # call time, exactly as when they are passed to chain() directly.
    parts = (
        self._recommend_cpu(),
        self._recommend_ram(),
        self._recommend_keep_cache(),
    )
    return itertools.chain(*parts)
307 def _recommend_cpu(self):
308 """Recommend asking for 4 cores if max CPU usage was 333%"""
310 cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
311 if cpu_max_rate == float('-Inf'):
312 logger.warning('%s: no CPU usage data', self.label)
314 used_cores = max(1, int(math.ceil(cpu_max_rate)))
315 asked_cores = self.existing_constraints.get('min_cores_per_node')
316 if asked_cores is None or used_cores < asked_cores:
318 '#!! {} max CPU usage was {}% -- '
319 'try runtime_constraints "min_cores_per_node":{}'
322 int(math.ceil(cpu_max_rate*100)),
325 def _recommend_ram(self):
326 """Recommend an economical RAM constraint for this job.
328 Nodes that are advertised as "8 gibibytes" actually have what
329 we might call "8 nearlygibs" of memory available for jobs.
330 Here, we calculate a whole number of nearlygibs that would
331 have sufficed to run the job, then recommend requesting a node
332 with that number of nearlygibs (expressed as mebibytes).
334 Requesting a node with "nearly 8 gibibytes" is our best hope
335 of getting a node that actually has nearly 8 gibibytes
336 available. If the node manager is smart enough to account for
337 the discrepancy itself when choosing/creating a node, we'll
338 get an 8 GiB node with nearly 8 GiB available. Otherwise, the
339 advertised size of the next-size-smaller node (say, 6 GiB)
340 will be too low to satisfy our request, so we will effectively
341 get rounded up to 8 GiB.
343 For example, if we need 7500 MiB, we can ask for 7500 MiB, and
344 we will generally get a node that is advertised as "8 GiB" and
345 has at least 7500 MiB available. However, asking for 8192 MiB
346 would either result in an unnecessarily expensive 12 GiB node
347 (if node manager knows about the discrepancy), or an 8 GiB
348 node which has less than 8192 MiB available and is therefore
349 considered by crunch-dispatch to be too small to meet our
352 When node manager learns how to predict the available memory
353 for each node type such that crunch-dispatch always agrees
354 that a node is big enough to run the job it was brought up
355 for, all this will be unnecessary. We'll just ask for exactly
356 the memory we want -- even if that happens to be 8192 MiB.
359 used_bytes = self.stats_max['mem']['rss']
360 if used_bytes == float('-Inf'):
361 logger.warning('%s: no memory usage data', self.label)
363 used_mib = math.ceil(float(used_bytes) / 1048576)
364 asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
366 nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
367 if asked_mib is None or (
368 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
370 '#!! {} max RSS was {} MiB -- '
371 'try runtime_constraints "min_ram_mb_per_node":{}'
375 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
377 def _recommend_keep_cache(self):
378 """Recommend increasing keep cache if utilization < 80%"""
379 if self.job_tot['net:keep0']['rx'] == 0:
381 utilization = (float(self.job_tot['blkio:0:0']['read']) /
382 float(self.job_tot['net:keep0']['rx']))
383 asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
385 if utilization < 0.8:
387 '#!! {} Keep cache utilization was {:.2f}% -- '
388 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
def _format(self, val):
    """Render a stat value as text.

    Floats are shown with two decimal places; every other value uses
    its default string formatting.
    """
    template = '{:.2f}' if isinstance(val, float) else '{}'
    return template.format(val)
class CollectionSummarizer(Summarizer):
    """Summarizer whose log data is read from a collection."""

    def __init__(self, collection_id, **kwargs):
        """Read logs from collection_id and use it as the label.

        Any extra keyword arguments are passed through unchanged to
        the Summarizer constructor.
        """
        log_reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(log_reader, **kwargs)
        # Assigned after the base constructor runs, so the collection
        # identifier wins over any label supplied through kwargs.
        self.label = collection_id
412 def NewSummarizer(process, **kwargs):
413 """Construct with the appropriate subclass for this uuid/object."""
415 if not isinstance(process, dict):
418 arv = arvados.api('v1', model=OrderedJsonModel())
420 if '-dz642-' in uuid:
422 process = arv.containers().get(uuid=uuid).execute()
423 klass = ContainerTreeSummarizer
424 elif '-xvhdp-' in uuid:
426 process = arv.container_requests().get(uuid=uuid).execute()
427 klass = ContainerTreeSummarizer
428 elif '-8i9sb-' in uuid:
430 process = arv.jobs().get(uuid=uuid).execute()
431 klass = JobSummarizer
432 elif '-d1hrv-' in uuid:
434 process = arv.pipeline_instances().get(uuid=uuid).execute()
435 klass = PipelineSummarizer
436 elif '-4zz18-' in uuid:
437 return CollectionSummarizer(collection_id=uuid)
439 raise ArgumentError("Unrecognized uuid %s", uuid)
440 return klass(process, uuid=uuid, **kwargs)
443 class ProcessSummarizer(Summarizer):
444 """Process is a job, pipeline, container, or container request."""
446 def __init__(self, process, label=None, **kwargs):
448 self.process = process
450 label = self.process.get('name', self.process['uuid'])
451 if self.process.get('log'):
453 rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
454 except arvados.errors.NotFoundError as e:
455 logger.warning("Trying event logs after failing to read "
456 "log collection %s: %s", self.process['log'], e)
458 rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
459 label = label + ' (partial)'
460 super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
461 self.existing_constraints = self.process.get('runtime_constraints', {})
464 class JobSummarizer(ProcessSummarizer):
468 class ContainerSummarizer(ProcessSummarizer):
472 class MultiSummarizer(object):
def __init__(self, children=None, label=None, threads=1, **kwargs):
    """Set up a summarizer that aggregates several child summarizers.

    children is a mapping of name to child summarizer. When omitted, a
    new empty dict is created for this instance, so instances never
    share (and accidentally mutate) one default mapping.

    label is stored as self.label, which html_report() passes to the
    chart class.

    threads is the initial count of the semaphore self.throttle.

    Additional keyword arguments are accepted and ignored.
    """
    self.throttle = threading.Semaphore(threads)
    # Identity of a caller-supplied mapping is preserved, even if empty.
    self.children = {} if children is None else children
    self.label = label
def run_and_release(self, target, *args, **kwargs):
    """Call target(*args, **kwargs), then release self.throttle.

    Returns whatever target returns. The semaphore is released whether
    target returns normally or raises, so a failing call cannot leave
    the throttle permanently held; the exception still propagates.
    """
    try:
        return target(*args, **kwargs)
    finally:
        self.throttle.release()
486 for child in self.children.itervalues():
487 self.throttle.acquire()
488 t = threading.Thread(target=self.run_and_release, args=(child.run, ))
495 def text_report(self):
497 for cname, child in self.children.iteritems():
498 if len(self.children) > 1:
499 txt += '### Summary for {} ({})\n'.format(
500 cname, child.process['uuid'])
501 txt += child.text_report()
def html_report(self):
    """Return the HTML produced by WEBCHART_CLASS for all children.

    The chart is built from this object's label and an iterator over
    the values of self.children.
    """
    child_summarizers = self.children.itervalues()
    chart = WEBCHART_CLASS(self.label, child_summarizers)
    return chart.html()
509 class PipelineSummarizer(MultiSummarizer):
510 def __init__(self, instance, **kwargs):
511 children = collections.OrderedDict()
512 for cname, component in instance['components'].iteritems():
513 if 'job' not in component:
515 "%s: skipping component with no job assigned", cname)
518 "%s: job %s", cname, component['job']['uuid'])
519 summarizer = JobSummarizer(component['job'], **kwargs)
520 summarizer.label = '{} {}'.format(
521 cname, component['job']['uuid'])
522 children[cname] = summarizer
523 super(PipelineSummarizer, self).__init__(
525 label=instance['uuid'],
529 class ContainerTreeSummarizer(MultiSummarizer):
530 def __init__(self, root, **kwargs):
531 arv = arvados.api('v1', model=OrderedJsonModel())
533 label = kwargs.pop('label', None) or root.get('name') or root['uuid']
536 children = collections.OrderedDict()
537 todo = collections.deque((root, ))
539 current = todo.popleft()
540 label = current['name']
541 if current['uuid'].find('-xvhdp-') > 0:
542 current = arv.containers().get(uuid=current['container_uuid']).execute()
543 children[current['uuid']] = ContainerSummarizer(
544 current, label=label, **kwargs)
547 items = arv.container_requests().index(
549 filters=page_filters+[
550 ['requesting_container_uuid', '=', current['uuid']]],
554 page_filters = [['uuid', '>', items[-1]['uuid']]]
556 if cr['container_uuid']:
557 logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
558 cr['name'] = cr.get('name') or cr['uuid']
560 super(ContainerTreeSummarizer, self).__init__(