1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
7 import crunchstat_summary.dygraphs
8 import crunchstat_summary.reader
18 from arvados.api import OrderedJsonModel
19 from crunchstat_summary import logger
# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95

# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads. https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')

# Chart implementation used by the html_report() methods below.
WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
38 self.finishtime = None
39 self.series = collections.defaultdict(list)
class Summarizer(object):
    # Base class: parses crunchstat log lines and accumulates per-task and
    # whole-job statistics (maxima, totals, and time series for charting).
    # NOTE(review): several lines of this class are elided from this view;
    # comments describe only the code that is visible.
    def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
        # logdata: a context manager yielding an iterable of raw log lines.
        self._logdata = logdata
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs
        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))
        self.tasks = collections.defaultdict(Task)
        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable. If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}
        logger.debug("%s: logdata %s", self.label, logdata)

        # NOTE(review): the enclosing "def run(self):" line is elided from
        # this view; the two lines below belong to its body.
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        with self._logdata as logdata:
    def _run(self, logdata):
        # Parse raw log lines, populating self.tasks, self.task_stats,
        # self.stats_max and finally self.job_tot.
        # NOTE(review): several lines of this method (for-loop header,
        # "if m:" guards, continues, else branches) are elided from this
        # view; comments describe only the code that is visible.
        self.detected_crunch1 = False
            # Crunch1 job logs contain '-8i9sb-' (job) UUIDs; switch into
            # crunch1 parsing mode as soon as one is seen.
            if not self.detected_crunch1 and '-8i9sb-' in line:
                self.detected_crunch1 = True

            if self.detected_crunch1:
                # "... job_task <uuid>": remember which task sequence
                # number maps to which task UUID.
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
                    seq = int(m.group('seq'))
                    uuid = m.group('task_uuid')
                    self.seq_to_uuid[seq] = uuid
                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)

                # Task-completion line: record elapsed wall-clock seconds.
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
                    task_id = self.seq_to_uuid[int(m.group('seq'))]
                    elapsed = int(m.group('elapsed'))
                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                    if elapsed > self.stats_max['time']['elapsed']:
                        self.stats_max['time']['elapsed'] = elapsed

                # A child job was queued: recursively summarize it, sharing
                # our accumulator dicts, unless --skip-child-jobs is on.
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
                    uuid = m.group('uuid')
                    if self._skip_child_jobs:
                        logger.warning('%s: omitting stats from child job %s'
                                       ' because --skip-child-jobs flag is on',
                    logger.debug('%s: follow %s', self.label, uuid)
                    child_summarizer = ProcessSummarizer(uuid)
                    # Share the accumulators so the child's stats fold into ours.
                    child_summarizer.stats_max = self.stats_max
                    child_summarizer.task_stats = self.task_stats
                    child_summarizer.tasks = self.tasks
                    child_summarizer.starttime = self.starttime
                    child_summarizer.run()
                    logger.debug('%s: done %s', self.label, uuid)

                # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
                m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)

            # The first matching line also determines the summary label.
            if self.label is None:
                    self.label = m.group('job_uuid')
                    self.label = 'label #1'
            category = m.group('category')
            if category.endswith(':'):
                # "stderr crunchstat: notice: ..."
            elif category in ('error', 'caught'):
            elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (old logs are less careful with unprefixed error messages)
            if self.detected_crunch1:
                task_id = self.seq_to_uuid[int(m.group('seq'))]
                task_id = 'container'
            task = self.tasks[task_id]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = m.group('timestamp')
            if timestamp[10:11] == '_':
                # crunch1-style timestamp, e.g. 2017-12-02_17:15:08
                timestamp = datetime.datetime.strptime(
                    timestamp, '%Y-%m-%d_%H:%M:%S')
            elif timestamp[10:11] == 'T':
                # ISO 8601 timestamp; fractional seconds are dropped.
                timestamp = datetime.datetime.strptime(
                    timestamp[:19], '%Y-%m-%dT%H:%M:%S')
                raise ValueError("Cannot parse timestamp {!r}".format(

            if task.starttime is None:
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            if task.starttime is None or timestamp < task.starttime:
                task.starttime = timestamp
            if task.finishtime is None or timestamp > task.finishtime:
                task.finishtime = timestamp

            if self.starttime is None or timestamp < self.starttime:
                self.starttime = timestamp
            if self.finishtime is None or timestamp > self.finishtime:
                self.finishtime = timestamp

            # Crunch2 has no per-task completion lines, so derive elapsed
            # time from the observed timestamp range instead.
            if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
                elapsed = (task.finishtime - task.starttime).seconds
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed

            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                category = m.group('category')
                # Stats arrive as alternating "<value> <name>" tokens.
                words = m.group(group).split(' ')
                    for val, stat in zip(words[::2], words[1::2]):
                            stats[stat] = float(val)
                            stats[stat] = int(val)
                except ValueError as e:
                    # If the line doesn't start with 'crunchstat:' we
                    # might have mistaken an error message for a
                    # structured crunchstat line.
                    if m.group("crunchstat") is None or m.group("category") == "crunchstat":
                        logger.warning("%s: log contains message\n %s", self.label, line)
                            '%s: Error parsing value %r (stat %r, category %r): %r',
                            self.label, val, stat, category, e)
                        logger.warning('%s', line)

                # Synthesize combined CPU and network stats.
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                if group == 'interval':
                    if 'seconds' in stats:
                        this_interval_s = stats.get('seconds',0)
                        # A non-positive duration would make rates nonsensical.
                        if this_interval_s <= 0:
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                        logger.error('BUG? interval stat missing duration')
                for stat, val in stats.items():
                    if group == 'interval' and this_interval_s:
                        # Convert interval deltas into per-second "__rate" stats.
                        stat = stat + '__rate'
                        val = val / this_interval_s
                        if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        if stat in ['rss','used','total']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        # job_tot: {category: {stat: total across all tasks}}
        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.items():
            for category, stat_last in task_stat.items():
                for stat, val in stat_last.items():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)
    def long_label(self):
        # Return a human-readable label: the short label, plus the process
        # UUID (if known) and a d/h/m/s elapsed-time suffix.
        # NOTE(review): some lines of this method are elided from this view.
        if hasattr(self, 'process') and self.process['uuid'] not in label:
            label = '{} ({})'.format(label, self.process['uuid'])
            label += ' -- elapsed time '
            s = (self.finishtime - self.starttime).total_seconds()
                label += '{}d'.format(int(s/86400))
                label += '{}h'.format(int(s/3600) % 24)
                label += '{}m'.format(int(s/60) % 60)
            label += '{}s'.format(int(s) % 60)
    def text_report(self):
        # Return the full plain-text report: tabular stats followed by
        # "#!!" recommendation lines, newline-terminated.
        # NOTE(review): the guard preceding the first return is elided here.
            return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"
271 def html_report(self):
272 return WEBCHART_CLASS(self.label, [self]).html()
    def _text_report_gen(self):
        # Yield the tab-separated stats table, then "# ..." summary lines.
        # NOTE(review): parts of this method (the enclosing for-statement
        # over the tuples below, continues, transforms) are elided here.
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.items()):
            for stat, val in sorted(stat_max.items()):
                # "__rate" stats are reported in the task_max_rate column
                # of their base stat, not as rows of their own.
                if stat.endswith('__rate'):
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
                # Each entry below is (format_string, value, transform).
                ('Number of tasks: {}',
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                ('Overall CPU usage: {}%',
                 float(self.job_tot['cpu']['user+sys']) /
                 self.job_tot['time']['elapsed']
                 if self.job_tot['time']['elapsed'] > 0 else 0,
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'] +
                 self.stats_max['net:keep0']['tx+rx'],
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'] +
                 self.stats_max['net:keep0']['tx+rx__rate'],
                ('Keep cache miss rate {}%',
                 (float(self.job_tot['keepcache']['miss']) /
                  float(self.job_tot['keepcalls']['get']))
                 if self.job_tot['keepcalls']['get'] > 0 else 0,
                 lambda x: x * 100.0),
                ('Keep cache utilization {}%',
                 (float(self.job_tot['blkio:0:0']['read']) /
                  float(self.job_tot['net:keep0']['rx']))
                 if self.job_tot['net:keep0']['rx'] > 0 else 0,
                 lambda x: x * 100.0),
                ('Temp disk utilization {}%',
                 (float(self.job_tot['statfs']['used']) /
                  float(self.job_tot['statfs']['total']))
                 if self.job_tot['statfs']['total'] > 0 else 0,
                 lambda x: x * 100.0),
            format_string, val, transform = args
            # Skip stats for which no data was ever recorded.
            if val == float('-Inf'):
            yield "# "+format_string.format(self._format(val))
    def _recommend_gen(self):
        # Concatenate all "#!!" recommendation generators into one stream.
        # TODO recommend fixing job granularity if elapsed time is too short
        return itertools.chain(
            self._recommend_cpu(),
            self._recommend_ram(),
            self._recommend_keep_cache(),
            self._recommend_temp_disk(),
    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""
        # Generator: yields at most one "#!!" recommendation string.
        # NOTE(review): returns/format arguments are partly elided here.
        constraint_key = self._map_runtime_constraint('vcpus')
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
            logger.warning('%s: no CPU usage data', self.label)
        # TODO Don't necessarily want to recommend on isolated max peak
        # take average CPU usage into account as well or % time at max
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get(constraint_key)
        if asked_cores is None:
        # TODO: This should be more nuanced in cases where max >> avg
        if used_cores < asked_cores:
                '#!! {} max CPU usage was {}% -- '
                'try reducing runtime_constraints to "{}":{}'
                math.ceil(cpu_max_rate*100),
    # FIXME: This needs to be updated to account for current nodemanager algorithms
    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available. If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available. Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available. However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary. We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        constraint_key = self._map_runtime_constraint('ram')
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
        used_mib = math.ceil(float(used_bytes) / MB)
        asked_mib = self.existing_constraints.get(constraint_key)

        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        # Recommend only when a whole-nearlygib request would be smaller
        # than what was actually asked for.
        if used_mib > 0 and (asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
                '#!! {} max RSS was {} MiB -- '
                'try reducing runtime_constraints to "{}":{}'
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))
    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%"""
        # Generator: yields at most one "#!!" recommendation string.
        # NOTE(review): some lines (early return, format args) are elided here.
        constraint_key = self._map_runtime_constraint('keep_cache_ram')
        if self.job_tot['net:keep0']['rx'] == 0:
        # Utilization: bytes actually read by the job vs. bytes fetched
        # from Keep (a low ratio suggests cache thrashing).
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        # FIXME: the default on this get won't work correctly
        asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()

        if utilization < 0.8:
                '#!! {} Keep cache utilization was {:.2f}% -- '
                'try doubling runtime_constraints to "{}":{} (or more)'
                math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
443 def _recommend_temp_disk(self):
444 """Recommend decreasing temp disk if utilization < 50%"""
445 total = float(self.job_tot['statfs']['total'])
446 utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0
448 if utilization < 50.8 and total > 0:
450 '#!! {} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
451 'consider reducing "tmpdirMin" and/or "outdirMin"'
458 def _format(self, val):
459 """Return a string representation of a stat.
461 {:.2f} for floats, default format for everything else."""
462 if isinstance(val, float):
463 return '{:.2f}'.format(val)
465 return '{}'.format(val)
    def _runtime_constraint_mem_unit(self):
        # Unit (in bytes) used by this process type's memory constraints:
        # instance/class override wins, else crunch1 jobs use MiB and the
        # remaining (container) case falls through below.
        # NOTE(review): the final "else:" line is elided from this view.
        if hasattr(self, 'runtime_constraint_mem_unit'):
            return self.runtime_constraint_mem_unit
        elif self.detected_crunch1:
            return JobSummarizer.runtime_constraint_mem_unit
            return ContainerSummarizer.runtime_constraint_mem_unit
    def _map_runtime_constraint(self, key):
        # Translate a generic constraint name ('vcpus', 'ram',
        # 'keep_cache_ram') to the key used in this process type's
        # runtime_constraints dict.
        # NOTE(review): the fallthrough branch is elided from this view.
        if hasattr(self, 'map_runtime_constraint'):
            return self.map_runtime_constraint[key]
        elif self.detected_crunch1:
            return JobSummarizer.map_runtime_constraint[key]
class CollectionSummarizer(Summarizer):
    """Summarizer that reads its log data from a Keep collection."""

    def __init__(self, collection_id, **kwargs):
        reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(reader, **kwargs)
        self.label = collection_id
def NewSummarizer(process_or_uuid, **kwargs):
    """Construct with the appropriate subclass for this uuid/object.

    process_or_uuid may be an API record (a dict with a 'uuid' key) or a
    bare uuid string; in the latter case the record is fetched from the
    Arvados API.  Raises ValueError for an unrecognized uuid type.
    """
    if isinstance(process_or_uuid, dict):
        process = process_or_uuid
        uuid = process['uuid']
    else:
        uuid = process_or_uuid
        process = None
    arv = arvados.api('v1', model=OrderedJsonModel())

    # Dispatch on the infix of the uuid, fetching the record when only a
    # uuid string was given.
    if '-dz642-' in uuid:
        if process is None:
            process = arv.containers().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-xvhdp-' in uuid:
        if process is None:
            process = arv.container_requests().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-8i9sb-' in uuid:
        if process is None:
            process = arv.jobs().get(uuid=uuid).execute()
        klass = JobTreeSummarizer
    elif '-d1hrv-' in uuid:
        if process is None:
            process = arv.pipeline_instances().get(uuid=uuid).execute()
        klass = PipelineSummarizer
    elif '-4zz18-' in uuid:
        return CollectionSummarizer(collection_id=uuid)
    else:
        # Fix: this previously raised the undefined name "ArgumentError",
        # which surfaced as a NameError instead of a useful message.
        raise ValueError("Unrecognized uuid %s" % uuid)
    return klass(process, uuid=uuid, **kwargs)
class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, container, or container request."""
    # NOTE(review): some lines of __init__ (the try statement, label guard,
    # rdr fallback condition) are elided from this view.

    def __init__(self, process, label=None, **kwargs):
        self.process = process
            label = self.process.get('name', self.process['uuid'])
        # Prefer the finished log collection; fall back to live event logs
        # (marking the summary as partial) when it cannot be read.
        if self.process.get('log'):
                rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.process['log'], e)
            rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
            label = label + ' (partial)'
        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
        self.existing_constraints = self.process.get('runtime_constraints', {})
class JobSummarizer(ProcessSummarizer):
    # Crunch1 jobs express memory constraints in MiB.
    runtime_constraint_mem_unit = MB
    # Map generic constraint names to crunch1 runtime_constraints keys.
    map_runtime_constraint = {
        'keep_cache_ram': 'keep_cache_mb_per_task',
        'ram': 'min_ram_mb_per_node',
        'vcpus': 'min_cores_per_node',
class ContainerSummarizer(ProcessSummarizer):
    # Container runtime constraints are expressed in bytes.
    runtime_constraint_mem_unit = 1
class MultiSummarizer(object):
    # Aggregates several child summarizers, running them with a bounded
    # degree of parallelism.
    # NOTE(review): several lines of this class (thread start/join, label
    # assignment, try/finally) are elided from this view.
    # NOTE(review): `children={}` is a mutable default argument; it appears
    # to be only stored here, but `children=None` would be safer — confirm.
    def __init__(self, children={}, label=None, threads=1, **kwargs):
        # Semaphore bounds how many children run concurrently.
        self.throttle = threading.Semaphore(threads)
        self.children = children

    def run_and_release(self, target, *args, **kwargs):
        # Run target, then release the throttle so the next child can start.
            return target(*args, **kwargs)
            self.throttle.release()

        for child in self.children.values():
            self.throttle.acquire()
            t = threading.Thread(target=self.run_and_release, args=(child.run, ))

    def text_report(self):
        # Concatenate each descendant's report under a "### Summary" header.
        d = self._descendants()
        for child in d.values():
            txt += '### Summary for {} ({})\n'.format(
                child.label, child.process['uuid'])
            txt += child.text_report()

    def _descendants(self):
        """Dict of self and all descendants.

        Nodes with nothing of their own to report (like
        MultiSummarizers) are omitted.
        d = collections.OrderedDict()
        for key, child in self.children.items():
            if isinstance(child, Summarizer):
            if isinstance(child, MultiSummarizer):
                d.update(child._descendants())

    def html_report(self):
        # Chart all reporting descendants on one page.
        return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()
class JobTreeSummarizer(MultiSummarizer):
    """Summarizes a job and all children listed in its components field."""
    # NOTE(review): a few lines of __init__ (the `preloaded` dict creation,
    # super() arguments) are elided from this view.
    def __init__(self, job, label=None, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())
        label = label or job.get('name', job['uuid'])
        children = collections.OrderedDict()
        children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
        if job.get('components', None):
            # Batch-fetch all component jobs in one index call, keyed by
            # uuid, to avoid one API round trip per component.
            for j in arv.jobs().index(
                    limit=len(job['components']),
                    filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
                preloaded[j['uuid']] = j
            for cname in sorted(job['components'].keys()):
                child_uuid = job['components'][cname]
                j = (preloaded.get(child_uuid) or
                     arv.jobs().get(uuid=child_uuid).execute())
                children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)

        super(JobTreeSummarizer, self).__init__(
class PipelineSummarizer(MultiSummarizer):
    # Summarizes every component job of a pipeline instance.
    # NOTE(review): some lines of __init__ (continue, logger calls' first
    # lines, super() arguments) are elided from this view.
    def __init__(self, instance, **kwargs):
        children = collections.OrderedDict()
        for cname, component in instance['components'].items():
            if 'job' not in component:
                    "%s: skipping component with no job assigned", cname)
                    "%s: job %s", cname, component['job']['uuid'])
                summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
                summarizer.label = '{} {}'.format(
                    cname, component['job']['uuid'])
                children[cname] = summarizer
        super(PipelineSummarizer, self).__init__(
            label=instance['uuid'],
class ContainerTreeSummarizer(MultiSummarizer):
    # Walks a container (request) tree breadth-first, summarizing the root
    # and every descendant container.
    # NOTE(review): several lines (while-loop header, paging loop, limit
    # argument, todo.append calls, super() arguments) are elided here.
    def __init__(self, root, skip_child_jobs=False, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())

        label = kwargs.pop('label', None) or root.get('name') or root['uuid']

        children = collections.OrderedDict()
        todo = collections.deque((root, ))
            current = todo.popleft()
            label = current['name']
            sort_key = current['created_at']
            # A container request uuid ('-xvhdp-') is resolved to its
            # underlying container record.
            if current['uuid'].find('-xvhdp-') > 0:
                current = arv.containers().get(uuid=current['container_uuid']).execute()

            summer = ContainerSummarizer(current, label=label, **kwargs)
            summer.sort_key = sort_key
            children[current['uuid']] = summer

                # Page through this container's child requests.
                child_crs = arv.container_requests().index(
                    filters=page_filters+[
                        ['requesting_container_uuid', '=', current['uuid']]],
                if not child_crs['items']:
                elif skip_child_jobs:
                    logger.warning('%s: omitting stats from %d child containers'
                                   ' because --skip-child-jobs flag is on',
                                   label, child_crs['items_available'])
                page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
                for cr in child_crs['items']:
                    if cr['container_uuid']:
                        logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
                        cr['name'] = cr.get('name') or cr['uuid']

        # Present children in creation order regardless of traversal order.
        sorted_children = collections.OrderedDict()
        for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
            sorted_children[uuid] = children[uuid]
        super(ContainerTreeSummarizer, self).__init__(
            children=sorted_children,