1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import print_function
9 import crunchstat_summary.dygraphs
10 import crunchstat_summary.reader
20 from arvados.api import OrderedJsonModel
21 from crunchstat_summary import logger
23 # Recommend memory constraints that are this multiple of an integral
24 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
25 # that have amounts like 7.5 GiB according to the kernel.)
26 AVAILABLE_RAM_RATIO = 0.95
29 # Workaround datetime.datetime.strptime() thread-safety bug by calling
30 # it once before starting threads. https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')

# Chart implementation used by html_report(); dygraphs-based renderer.
WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
        # Body of Task.__init__ (the class and def lines are not visible in
        # this view): per-task time series used for charting, keyed by
        # (category, stat) -> list of (elapsed-time, value) points.
        self.series = collections.defaultdict(list)
class Summarizer(object):
    """Parse crunchstat log data and accumulate max/total resource stats."""

    def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
        # NOTE(review): several initializations referenced by other methods
        # (e.g. self.label, self.starttime, self.seq_to_uuid) are not
        # visible in this view of the file.
        self._logdata = logdata

        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        # NOTE(review): the default value here is 0, but several methods
        # compare stats_max entries against float('-Inf') (see
        # _recommend_cpu, _recommend_ram, _text_report_gen); with a 0
        # default those sentinel checks can never match.  Confirm whether
        # the default should be `lambda: float('-Inf')`.
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        # tasks: {task_id: Task} -- auto-creates a Task per new task_id.
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable. If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.debug("%s: logdata %s", self.label, logdata)
    # NOTE(review): this is the body of Summarizer.run(); the `def run(self):`
    # line and many interior lines (`if m:` guards, `continue` statements,
    # `else:` branches, some statement fragments) are not visible in this
    # view.  Code below is reproduced exactly as seen.
    logger.debug("%s: parsing logdata %s", self.label, self._logdata)
    self.detected_crunch1 = False
    for line in self._logdata:
        # A crunch1 job UUID ('-8i9sb-' infix) switches parsing to the
        # old crunch1 log format for the rest of the stream.
        if not self.detected_crunch1 and '-8i9sb-' in line:
            self.detected_crunch1 = True

        if self.detected_crunch1:
            # Record the mapping from task sequence number to task UUID.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
            seq = int(m.group('seq'))
            uuid = m.group('task_uuid')
            self.seq_to_uuid[seq] = uuid
            logger.debug('%s: seq %d is task %s', self.label, seq, uuid)

            # Task completion line: record elapsed seconds, track the max.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
            task_id = self.seq_to_uuid[int(m.group('seq'))]
            elapsed = int(m.group('elapsed'))
            self.task_stats[task_id]['time'] = {'elapsed': elapsed}
            if elapsed > self.stats_max['time']['elapsed']:
                self.stats_max['time']['elapsed'] = elapsed

            # Child job queued by this job: optionally follow it and merge
            # its stats into ours (unless --skip-child-jobs).
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
            uuid = m.group('uuid')
            if self._skip_child_jobs:
                logger.warning('%s: omitting stats from child job %s'
                               ' because --skip-child-jobs flag is on',
            logger.debug('%s: follow %s', self.label, uuid)
            child_summarizer = ProcessSummarizer(uuid)
            # Share accumulators so the child's stats merge into this job's.
            child_summarizer.stats_max = self.stats_max
            child_summarizer.task_stats = self.task_stats
            child_summarizer.tasks = self.tasks
            child_summarizer.starttime = self.starttime
            child_summarizer.run()
            logger.debug('%s: done %s', self.label, uuid)

            # crunchstat line, crunch1 format (includes job uuid and seq).
            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
        # crunchstat line, container format (timestamp, category, data).
        m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)

        if self.label is None:
            self.label = m.group('job_uuid')
            self.label = 'container'
        if m.group('category').endswith(':'):
            # "stderr crunchstat: notice: ..."
        elif m.group('category') in ('error', 'caught', 'Running'):
        elif m.group('category') in ['read', 'open', 'cgroup', 'CID']:
            # "stderr crunchstat: read /proc/1234/net/dev: ..."
            # (old logs are less careful with unprefixed error messages)

        if self.detected_crunch1:
            task_id = self.seq_to_uuid[int(m.group('seq'))]
            task_id = 'container'
        task = self.tasks[task_id]

        # Use the first and last crunchstat timestamps as
        # approximations of starttime and finishtime.
        timestamp = m.group('timestamp')
        # Two timestamp formats appear: '_'-separated (crunch1) and
        # ISO-8601 'T'-separated (containers); detect by character 10.
        if timestamp[10:11] == '_':
            timestamp = datetime.datetime.strptime(
                timestamp, '%Y-%m-%d_%H:%M:%S')
        elif timestamp[10:11] == 'T':
            timestamp = datetime.datetime.strptime(
                timestamp[:19], '%Y-%m-%dT%H:%M:%S')
        raise ValueError("Cannot parse timestamp {!r}".format(

        if not task.starttime:
            task.starttime = timestamp
            logger.debug('%s: task %s starttime %s',
                         self.label, task_id, timestamp)
        task.finishtime = timestamp

        if not self.starttime:
            self.starttime = timestamp
        self.finishtime = timestamp

        this_interval_s = None
        for group in ['current', 'interval']:
            if not m.group(group):
            category = m.group('category')
            words = m.group(group).split(' ')
            # Stats arrive as alternating "<value> <name>" word pairs.
            for val, stat in zip(words[::2], words[1::2]):
                stats[stat] = float(val)
                stats[stat] = int(val)
            except ValueError as e:
                logger.warning('Error parsing {} stat: {!r}'.format(
            # Derived combined stats.
            if 'user' in stats or 'sys' in stats:
                stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
            if 'tx' in stats or 'rx' in stats:
                stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
            for stat, val in stats.iteritems():
                if group == 'interval':
                    if stat == 'seconds':
                        this_interval_s = val
                    elif not (this_interval_s > 0):
                        "BUG? interval stat given with duration {!r}".
                        format(this_interval_s))
                        # Convert interval stats to per-second rates.
                        stat = stat + '__rate'
                        val = val / this_interval_s
                        if stat in ['user+sys__rate', 'tx+rx__rate']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                task.series[category, stat].append(
                    (timestamp - self.starttime, val))
                self.task_stats[task_id][category][stat] = val
                if val > self.stats_max[category][stat]:
                    self.stats_max[category][stat] = val
    logger.debug('%s: done parsing', self.label)

    # Compute job totals by summing each task's last value per stat.
    self.job_tot = collections.defaultdict(
        functools.partial(collections.defaultdict, int))
    for task_id, task_stat in self.task_stats.iteritems():
        for category, stat_last in task_stat.iteritems():
            for stat, val in stat_last.iteritems():
                if stat in ['cpus', 'cache', 'swap', 'rss']:
                    # meaningless stats like 16 cpu cores x 5 tasks = 80
                self.job_tot[category][stat] += val
    logger.debug('%s: done totals', self.label)
    def long_label(self):
        # Build a human-readable label including elapsed time broken down
        # into days/hours/minutes/seconds.
        # NOTE(review): the initial `label = ...` assignment, the
        # `if self.finishtime:` guard, per-unit threshold checks, and the
        # final return are not visible in this view.
        label += ' -- elapsed time '
        s = (self.finishtime - self.starttime).total_seconds()
        label += '{}d'.format(int(s/86400))
        label += '{}h'.format(int(s/3600) % 24)
        label += '{}m'.format(int(s/60) % 60)
        label += '{}s'.format(int(s) % 60)
    def text_report(self):
        """Return the full plain-text report: stats table plus '#!!'
        runtime-constraint recommendations."""
        # NOTE(review): the guard condition before this early return is not
        # visible in this view (the two return statements cannot both be
        # unconditional).
        return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"
245 def html_report(self):
246 return WEBCHART_CLASS(self.label, [self]).html()
    def _text_report_gen(self):
        """Yield the tab-separated per-category stats table, followed by
        '# ...' human-readable summary lines.

        NOTE(review): the enclosing `for args in (...)` line, several
        `continue`s, and some tuple elements are not visible in this view;
        code is reproduced as seen.
        """
        # Header row for the stats table.
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                # Rates appear in the task_max_rate column, not as rows.
                if stat.endswith('__rate'):
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        # Summary lines: tuples of (format string, value, transform).
        ('Number of tasks: {}',
        ('Max CPU time spent by a single task: {}s',
         self.stats_max['cpu']['user+sys'],
        ('Max CPU usage in a single interval: {}%',
         self.stats_max['cpu']['user+sys__rate'],
        ('Overall CPU usage: {}%',
         self.job_tot['cpu']['user+sys'] /
         self.job_tot['time']['elapsed']
         if self.job_tot['time']['elapsed'] > 0 else 0,
        ('Max memory used by a single task: {}GB',
         self.stats_max['mem']['rss'],
        ('Max network traffic in a single task: {}GB',
         self.stats_max['net:eth0']['tx+rx'] +
         self.stats_max['net:keep0']['tx+rx'],
        ('Max network speed in a single interval: {}MB/s',
         self.stats_max['net:eth0']['tx+rx__rate'] +
         self.stats_max['net:keep0']['tx+rx__rate'],
        ('Keep cache miss rate {}%',
         (float(self.job_tot['keepcache']['miss']) /
          float(self.job_tot['keepcalls']['get']))
         if self.job_tot['keepcalls']['get'] > 0 else 0,
         lambda x: x * 100.0),
        ('Keep cache utilization {}%',
         (float(self.job_tot['blkio:0:0']['read']) /
          float(self.job_tot['net:keep0']['rx']))
         if self.job_tot['net:keep0']['rx'] > 0 else 0,
         lambda x: x * 100.0)):
        format_string, val, transform = args
        # Skip stats that never received data (-Inf sentinel); see the
        # NOTE in __init__ about the default actually being 0.
        if val == float('-Inf'):
        yield "# "+format_string.format(self._format(val))
301 def _recommend_gen(self):
302 return itertools.chain(
303 self._recommend_cpu(),
304 self._recommend_ram(),
305 self._recommend_keep_cache())
    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""
        # NOTE(review): a blank line, the no-data early return, and some
        # logger.info() arguments are not visible in this view.
        constraint_key = self._map_runtime_constraint('vcpus')
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        # NOTE(review): stats_max defaults to 0 as seen in __init__, so this
        # -Inf sentinel check may be unreachable -- confirm intended default.
        if cpu_max_rate == float('-Inf'):
            logger.warning('%s: no CPU usage data', self.label)
        # Round peak usage up to a whole number of cores, minimum 1.
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get(constraint_key)
        if asked_cores is None or used_cores < asked_cores:
            '#!! {} max CPU usage was {}% -- '
            'try runtime_constraints "{}":{}'
            int(math.ceil(cpu_max_rate*100)),
    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available.  If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available.  However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary.  We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """
        # NOTE(review): the no-data early return and the logger.info() call
        # with some of its arguments are not visible in this view.
        constraint_key = self._map_runtime_constraint('ram')
        used_bytes = self.stats_max['mem']['rss']
        # See the NOTE in __init__: with a 0 default this sentinel check
        # may be unreachable.
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
        used_mib = math.ceil(float(used_bytes) / 1048576)
        asked_mib = self.existing_constraints.get(constraint_key)

        # Convert MiB to "nearlygibs" (see docstring above).
        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        if asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
            '#!! {} max RSS was {} MiB -- '
            'try runtime_constraints "{}":{}'
            int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(2**20)/self._runtime_constraint_mem_unit()))
    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%"""
        # NOTE(review): the early return for the no-data case and some
        # logger.info() arguments are not visible in this view.
        constraint_key = self._map_runtime_constraint('keep_cache_ram')
        if self.job_tot['net:keep0']['rx'] == 0:
        # Fraction of bytes fetched from Keep that were actually read.
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        # Default existing cache size: 256 (MiB, per the *2 doubling below).
        asked_mib = self.existing_constraints.get(constraint_key, 256)

        if utilization < 0.8:
            '#!! {} Keep cache utilization was {:.2f}% -- '
            'try runtime_constraints "{}":{} (or more)'
            asked_mib*2*(2**20)/self._runtime_constraint_mem_unit())
401 def _format(self, val):
402 """Return a string representation of a stat.
404 {:.2f} for floats, default format for everything else."""
405 if isinstance(val, float):
406 return '{:.2f}'.format(val)
408 return '{}'.format(val)
410 def _runtime_constraint_mem_unit(self):
411 if hasattr(self, 'runtime_constraint_mem_unit'):
412 return self.runtime_constraint_mem_unit
413 elif self.detected_crunch1:
414 return JobSummarizer.runtime_constraint_mem_unit
416 return ContainerSummarizer.runtime_constraint_mem_unit
    def _map_runtime_constraint(self, key):
        # Translate a generic constraint key ('vcpus', 'ram',
        # 'keep_cache_ram') into this process type's field name.
        # NOTE(review): the final fallback branch (for container-style
        # constraints) is not visible in this view; as written this falls
        # through to an implicit None.
        if hasattr(self, 'map_runtime_constraint'):
            return self.map_runtime_constraint[key]
        elif self.detected_crunch1:
            return JobSummarizer.map_runtime_constraint[key]
class CollectionSummarizer(Summarizer):
    """Summarizer that reads crunchstat logs out of a Keep collection."""

    def __init__(self, collection_id, **kwargs):
        reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(reader, **kwargs)
        # Label the report with the collection we read from.
        self.label = collection_id
def NewSummarizer(process_or_uuid, **kwargs):
    """Construct with the appropriate subclass for this uuid/object."""
    # NOTE(review): a few `else:` branch lines are not visible in this view;
    # code is reproduced as seen.
    if isinstance(process_or_uuid, dict):
        process = process_or_uuid
        uuid = process['uuid']
        uuid = process_or_uuid  # (line from the unseen `else:` branch)
    arv = arvados.api('v1', model=OrderedJsonModel())
    # Dispatch on the UUID's type infix.
    if '-dz642-' in uuid:
        # Container.
        process = arv.containers().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-xvhdp-' in uuid:
        # Container request.
        process = arv.container_requests().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-8i9sb-' in uuid:
        # Crunch1 job.
        process = arv.jobs().get(uuid=uuid).execute()
        klass = JobSummarizer
    elif '-d1hrv-' in uuid:
        # Pipeline instance.
        process = arv.pipeline_instances().get(uuid=uuid).execute()
        klass = PipelineSummarizer
    elif '-4zz18-' in uuid:
        # A plain log collection.
        return CollectionSummarizer(collection_id=uuid)
    raise ArgumentError("Unrecognized uuid %s", uuid)
    return klass(process, uuid=uuid, **kwargs)
class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, container, or container request."""

    def __init__(self, process, label=None, **kwargs):
        # NOTE(review): guard lines (`if label is None:`, the `try:` before
        # CollectionReader, and the fallback condition before LiveLogReader)
        # are not visible in this view; code is reproduced as seen.
        self.process = process
        # Default the label to the process name, falling back to its UUID.
        label = self.process.get('name', self.process['uuid'])
        if self.process.get('log'):
            rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
        except arvados.errors.NotFoundError as e:
            logger.warning("Trying event logs after failing to read "
                           "log collection %s: %s", self.process['log'], e)
        # Fall back to live event logs; mark the report as partial.
        rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
        label = label + ' (partial)'
        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
        self.existing_constraints = self.process.get('runtime_constraints', {})
class JobSummarizer(ProcessSummarizer):
    """ProcessSummarizer for a crunch1 job.

    Crunch1 jobs express memory constraints in MiB, using the
    job-API field names mapped below.
    """
    # 1 MiB in bytes: job runtime constraints are denominated in MiB.
    runtime_constraint_mem_unit = 1048576
    # Generic constraint name -> crunch1 job API field name.
    # Fix: the dict literal was unterminated in the source as seen;
    # restored the closing brace.
    map_runtime_constraint = {
        'keep_cache_ram': 'keep_cache_mb_per_task',
        'ram': 'min_ram_mb_per_node',
        'vcpus': 'min_cores_per_node',
    }
class ContainerSummarizer(ProcessSummarizer):
    # Container runtime constraints are denominated in bytes, so the
    # memory unit multiplier is 1.
    runtime_constraint_mem_unit = 1
class MultiSummarizer(object):
    """Combine several child summarizers into one aggregate report.

    NOTE(review): run(), the try/finally bodies, and several assignments
    are only partially visible in this view; code is reproduced as seen.
    """
    def __init__(self, children={}, label=None, threads=1, **kwargs):
        # NOTE(review): mutable default `children={}` is shared across
        # calls; safe only if never mutated in place -- verify callers.
        # Semaphore bounds how many children run concurrently.
        self.throttle = threading.Semaphore(threads)
        self.children = children

    def run_and_release(self, target, *args, **kwargs):
        # Run target, then release the throttle slot (the `try:`/`finally:`
        # lines are not visible in this view).
        return target(*args, **kwargs)
        self.throttle.release()

        # NOTE(review): body of the unseen `def run(self):` -- starts one
        # thread per child, throttled by the semaphore.
        for child in self.children.itervalues():
            self.throttle.acquire()
            t = threading.Thread(target=self.run_and_release, args=(child.run, ))

    def text_report(self):
        # Concatenate each child's report, with a heading when there is
        # more than one child.  (The `txt = ''` initialization and final
        # return are not visible in this view.)
        for cname, child in self.children.iteritems():
            if len(self.children) > 1:
                txt += '### Summary for {} ({})\n'.format(
                    cname, child.process['uuid'])
            txt += child.text_report()

    def html_report(self):
        # Render all children on a single HTML chart page.
        return WEBCHART_CLASS(self.label, self.children.itervalues()).html()
class PipelineSummarizer(MultiSummarizer):
    """MultiSummarizer over every job of a pipeline instance."""

    def __init__(self, instance, **kwargs):
        children = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            if 'job' not in component:
                # NOTE(review): the logger call lines around these string
                # fragments are not visible in this view.
                "%s: skipping component with no job assigned", cname)
                "%s: job %s", cname, component['job']['uuid'])
            summarizer = JobSummarizer(component['job'], **kwargs)
            # Label each child with its component name and job UUID.
            summarizer.label = '{} {}'.format(
                cname, component['job']['uuid'])
            children[cname] = summarizer
        super(PipelineSummarizer, self).__init__(
            label=instance['uuid'],
559 class ContainerTreeSummarizer(MultiSummarizer):
560 def __init__(self, root, **kwargs):
561 arv = arvados.api('v1', model=OrderedJsonModel())
563 label = kwargs.pop('label', None) or root.get('name') or root['uuid']
566 children = collections.OrderedDict()
567 todo = collections.deque((root, ))
569 current = todo.popleft()
570 label = current['name']
571 sort_key = current['created_at']
572 if current['uuid'].find('-xvhdp-') > 0:
573 current = arv.containers().get(uuid=current['container_uuid']).execute()
575 summer = ContainerSummarizer(current, label=label, **kwargs)
576 summer.sort_key = sort_key
577 children[current['uuid']] = summer
581 items = arv.container_requests().index(
583 filters=page_filters+[
584 ['requesting_container_uuid', '=', current['uuid']]],
588 page_filters = [['uuid', '>', items[-1]['uuid']]]
590 if cr['container_uuid']:
591 logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
592 cr['name'] = cr.get('name') or cr['uuid']
594 sorted_children = collections.OrderedDict()
595 for uuid in sorted(children.keys(), key=lambda uuid: children[uuid].sort_key):
596 sorted_children[uuid] = children[uuid]
597 super(ContainerTreeSummarizer, self).__init__(
598 children=sorted_children,