1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
7 import crunchstat_summary.dygraphs
8 import crunchstat_summary.reader
18 from arvados.api import OrderedJsonModel
19 from crunchstat_summary import logger
21 # Recommend memory constraints that are this multiple of an integral
22 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
23 # that have amounts like 7.5 GiB according to the kernel.)
24 AVAILABLE_RAM_RATIO = 0.95
27 # Workaround datetime.datetime.strptime() thread-safety bug by calling
28 # it once before starting threads. https://bugs.python.org/issue7980
29 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
32 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
38 self.finishtime = None
39 self.series = collections.defaultdict(list)
42 class Summarizer(object):
43 def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
44 self._logdata = logdata
49 self.finishtime = None
50 self._skip_child_jobs = skip_child_jobs
52 # stats_max: {category: {stat: val}}
53 self.stats_max = collections.defaultdict(
54 functools.partial(collections.defaultdict, lambda: 0))
55 # task_stats: {task_id: {category: {stat: val}}}
56 self.task_stats = collections.defaultdict(
57 functools.partial(collections.defaultdict, dict))
60 self.tasks = collections.defaultdict(Task)
62 # We won't bother recommending new runtime constraints if the
63 # constraints given when running the job are known to us and
64 # are already suitable. If applicable, the subclass
65 # constructor will overwrite this with something useful.
66 self.existing_constraints = {}
68 logger.debug("%s: logdata %s", self.label, logdata)
71 logger.debug("%s: parsing logdata %s", self.label, self._logdata)
72 with self._logdata as logdata:
def _run(self, logdata):
    """Parse crunchstat log lines and accumulate statistics.

    Iterates over ``logdata`` line by line.  The input is treated as a
    crunch1 job log from the first line containing '-8i9sb-' onward,
    and as a crunch2 container log otherwise.  Matching lines update
    self.tasks, self.task_stats, self.stats_max, self.starttime and
    self.finishtime.  After the last line, per-task stats are summed
    into self.job_tot.

    Raises ValueError when a matched crunchstat line has a timestamp
    that is in neither the '_'-separated nor the 'T'-separated form.
    """
    self.detected_crunch1 = False
    for line in logdata:
        if not self.detected_crunch1 and '-8i9sb-' in line:
            self.detected_crunch1 = True

        if self.detected_crunch1:
            # Task start: remember which task uuid a sequence number refers to.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
            if m:
                seq = int(m.group('seq'))
                uuid = m.group('task_uuid')
                self.seq_to_uuid[seq] = uuid
                logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
                continue

            # Task end: the log line itself reports the elapsed seconds.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
            if m:
                task_id = self.seq_to_uuid[int(m.group('seq'))]
                elapsed = int(m.group('elapsed'))
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed
                continue

            # Child job: fold its stats into this summarizer's containers.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
            if m:
                uuid = m.group('uuid')
                if self._skip_child_jobs:
                    logger.warning('%s: omitting stats from child job %s'
                                   ' because --skip-child-jobs flag is on',
                                   self.label, uuid)
                    continue
                logger.debug('%s: follow %s', self.label, uuid)
                # NOTE(review): ProcessSummarizer.__init__ calls .get() on
                # its first argument, but a uuid string is passed here --
                # confirm this path works as intended.
                child_summarizer = ProcessSummarizer(uuid)
                child_summarizer.stats_max = self.stats_max
                child_summarizer.task_stats = self.task_stats
                child_summarizer.tasks = self.tasks
                child_summarizer.starttime = self.starttime
                child_summarizer.run()
                logger.debug('%s: done %s', self.label, uuid)
                continue

            # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
            if not m:
                continue
        else:
            # crunch2
            # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
            m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
            if not m:
                continue

        if self.label is None:
            try:
                self.label = m.group('job_uuid')
            except IndexError:
                # The crunch2 pattern has no job_uuid group.
                self.label = 'label #1'
        category = m.group('category')
        if category.endswith(':'):
            # "stderr crunchstat: notice: ..."
            continue
        elif category in ('error', 'caught'):
            continue
        elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
            # "stderr crunchstat: read /proc/1234/net/dev: ..."
            # (old logs are less careful with unprefixed error messages)
            continue

        if self.detected_crunch1:
            task_id = self.seq_to_uuid[int(m.group('seq'))]
        else:
            task_id = 'container'
        task = self.tasks[task_id]

        # Use the first and last crunchstat timestamps as
        # approximations of starttime and finishtime.
        timestamp = m.group('timestamp')
        if timestamp[10:11] == '_':
            timestamp = datetime.datetime.strptime(
                timestamp, '%Y-%m-%d_%H:%M:%S')
        elif timestamp[10:11] == 'T':
            # Only the first 19 characters (whole seconds) are parsed.
            timestamp = datetime.datetime.strptime(
                timestamp[:19], '%Y-%m-%dT%H:%M:%S')
        else:
            raise ValueError("Cannot parse timestamp {!r}".format(
                timestamp))

        if task.starttime is None:
            logger.debug('%s: task %s starttime %s',
                         self.label, task_id, timestamp)
        if task.starttime is None or timestamp < task.starttime:
            task.starttime = timestamp
        if task.finishtime is None or timestamp > task.finishtime:
            task.finishtime = timestamp

        if self.starttime is None or timestamp < self.starttime:
            self.starttime = timestamp
        if self.finishtime is None or timestamp > self.finishtime:
            self.finishtime = timestamp

        if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
            # total_seconds() rather than .seconds: the latter discards
            # whole days, under-reporting runs longer than 24 hours.
            # Timestamps have one-second resolution, so int() is exact.
            elapsed = int((task.finishtime - task.starttime).total_seconds())
            self.task_stats[task_id]['time'] = {'elapsed': elapsed}
            if elapsed > self.stats_max['time']['elapsed']:
                self.stats_max['time']['elapsed'] = elapsed

        this_interval_s = None
        for group in ['current', 'interval']:
            if not m.group(group):
                continue
            category = m.group('category')
            # Each group is a flat "value name value name ..." list.
            words = m.group(group).split(' ')
            stats = {}
            try:
                for val, stat in zip(words[::2], words[1::2]):
                    if '.' in val:
                        stats[stat] = float(val)
                    else:
                        stats[stat] = int(val)
            except ValueError as e:
                # If the line doesn't start with 'crunchstat:' we
                # might have mistaken an error message for a
                # structured crunchstat line.
                if m.group("crunchstat") is None or m.group("category") == "crunchstat":
                    logger.warning("%s: log contains message\n %s", self.label, line)
                else:
                    logger.warning(
                        '%s: Error parsing value %r (stat %r, category %r): %r',
                        self.label, val, stat, category, e)
                    logger.warning('%s', line)
                continue
            if 'user' in stats or 'sys' in stats:
                stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
            if 'tx' in stats or 'rx' in stats:
                stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
            if group == 'interval':
                if 'seconds' in stats:
                    this_interval_s = stats.get('seconds',0)
                    # The duration is the divisor, not a stat of its own.
                    del stats['seconds']
                    if this_interval_s <= 0:
                        logger.error(
                            "BUG? interval stat given with duration {!r}".
                            format(this_interval_s))
                else:
                    logger.error('BUG? interval stat missing duration')
            for stat, val in stats.items():
                if group == 'interval' and this_interval_s:
                    # Interval values become per-second rates.
                    stat = stat + '__rate'
                    val = val / this_interval_s
                    if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
                        task.series[category, stat].append(
                            (timestamp - self.starttime, val))
                else:
                    if stat in ['rss','used','total']:
                        task.series[category, stat].append(
                            (timestamp - self.starttime, val))
                    self.task_stats[task_id][category][stat] = val
                if val > self.stats_max[category][stat]:
                    self.stats_max[category][stat] = val
    logger.debug('%s: done parsing', self.label)

    self.job_tot = collections.defaultdict(
        functools.partial(collections.defaultdict, int))
    for task_id, task_stat in self.task_stats.items():
        for category, stat_last in task_stat.items():
            for stat, val in stat_last.items():
                if stat in ['cpus', 'cache', 'swap', 'rss']:
                    # meaningless stats like 16 cpu cores x 5 tasks = 80
                    continue
                self.job_tot[category][stat] += val
    logger.debug('%s: done totals', self.label)
248 def long_label(self):
250 if hasattr(self, 'process') and self.process['uuid'] not in label:
251 label = '{} ({})'.format(label, self.process['uuid'])
253 label += ' -- elapsed time '
254 s = (self.finishtime - self.starttime).total_seconds()
256 label += '{}d'.format(int(s/86400))
258 label += '{}h'.format(int(s/3600) % 24)
260 label += '{}m'.format(int(s/60) % 60)
261 label += '{}s'.format(int(s) % 60)
264 def text_report(self):
266 return "(no report generated)\n"
267 return "\n".join(itertools.chain(
268 self._text_report_gen(),
269 self._recommend_gen())) + "\n"
def html_report(self):
    """Return the result of WEBCHART_CLASS(...).html() for this summarizer alone."""
    # The chart class takes a label plus a list of summarizers.
    chart = WEBCHART_CLASS(self.label, [self])
    return chart.html()
274 def _text_report_gen(self):
275 yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
276 for category, stat_max in sorted(self.stats_max.items()):
277 for stat, val in sorted(stat_max.items()):
278 if stat.endswith('__rate'):
280 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
281 val = self._format(val)
282 tot = self._format(self.job_tot[category].get(stat, '-'))
283 yield "\t".join([category, stat, str(val), max_rate, tot])
285 ('Number of tasks: {}',
288 ('Max CPU time spent by a single task: {}s',
289 self.stats_max['cpu']['user+sys'],
291 ('Max CPU usage in a single interval: {}%',
292 self.stats_max['cpu']['user+sys__rate'],
294 ('Overall CPU usage: {}%',
295 float(self.job_tot['cpu']['user+sys']) /
296 self.job_tot['time']['elapsed']
297 if self.job_tot['time']['elapsed'] > 0 else 0,
299 ('Max memory used by a single task: {}GB',
300 self.stats_max['mem']['rss'],
302 ('Max network traffic in a single task: {}GB',
303 self.stats_max['net:eth0']['tx+rx'] +
304 self.stats_max['net:keep0']['tx+rx'],
306 ('Max network speed in a single interval: {}MB/s',
307 self.stats_max['net:eth0']['tx+rx__rate'] +
308 self.stats_max['net:keep0']['tx+rx__rate'],
310 ('Keep cache miss rate {}%',
311 (float(self.job_tot['keepcache']['miss']) /
312 float(self.job_tot['keepcalls']['get']))
313 if self.job_tot['keepcalls']['get'] > 0 else 0,
314 lambda x: x * 100.0),
315 ('Keep cache utilization {}%',
316 (float(self.job_tot['blkio:0:0']['read']) /
317 float(self.job_tot['net:keep0']['rx']))
318 if self.job_tot['net:keep0']['rx'] > 0 else 0,
319 lambda x: x * 100.0),
320 ('Temp disk utilization {}%',
321 (float(self.job_tot['statfs']['used']) /
322 float(self.job_tot['statfs']['total']))
323 if self.job_tot['statfs']['total'] > 0 else 0,
324 lambda x: x * 100.0),
326 format_string, val, transform = args
327 if val == float('-Inf'):
331 yield "# "+format_string.format(self._format(val))
def _recommend_gen(self):
    """Return one iterator over the CPU, RAM and Keep cache recommendations, in that order."""
    # TODO recommend fixing job granularity if elapsed time is too short
    advisors = (
        self._recommend_cpu,
        self._recommend_ram,
        self._recommend_keep_cache,
    )
    return itertools.chain(*[advise() for advise in advisors])
340 def _recommend_cpu(self):
341 """Recommend asking for 4 cores if max CPU usage was 333%"""
343 constraint_key = self._map_runtime_constraint('vcpus')
344 cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
345 if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
346 logger.warning('%s: no CPU usage data', self.label)
348 # TODO Don't necessarily want to recommend on isolated max peak
349 # take average CPU usage into account as well or % time at max
350 used_cores = max(1, int(math.ceil(cpu_max_rate)))
351 asked_cores = self.existing_constraints.get(constraint_key)
352 if asked_cores is None:
354 # TODO: This should be more nuanced in cases where max >> avg
355 if used_cores < asked_cores:
357 '#!! {} max CPU usage was {}% -- '
358 'try reducing runtime_constraints to "{}":{}'
361 math.ceil(cpu_max_rate*100),
365 # FIXME: This needs to be updated to account for current nodemanager algorithms
366 def _recommend_ram(self):
367 """Recommend an economical RAM constraint for this job.
369 Nodes that are advertised as "8 gibibytes" actually have what
370 we might call "8 nearlygibs" of memory available for jobs.
371 Here, we calculate a whole number of nearlygibs that would
372 have sufficed to run the job, then recommend requesting a node
373 with that number of nearlygibs (expressed as mebibytes).
375 Requesting a node with "nearly 8 gibibytes" is our best hope
376 of getting a node that actually has nearly 8 gibibytes
377 available. If the node manager is smart enough to account for
378 the discrepancy itself when choosing/creating a node, we'll
379 get an 8 GiB node with nearly 8 GiB available. Otherwise, the
380 advertised size of the next-size-smaller node (say, 6 GiB)
381 will be too low to satisfy our request, so we will effectively
382 get rounded up to 8 GiB.
384 For example, if we need 7500 MiB, we can ask for 7500 MiB, and
385 we will generally get a node that is advertised as "8 GiB" and
386 has at least 7500 MiB available. However, asking for 8192 MiB
387 would either result in an unnecessarily expensive 12 GiB node
388 (if node manager knows about the discrepancy), or an 8 GiB
389 node which has less than 8192 MiB available and is therefore
390 considered by crunch-dispatch to be too small to meet our
393 When node manager learns how to predict the available memory
394 for each node type such that crunch-dispatch always agrees
395 that a node is big enough to run the job it was brought up
396 for, all this will be unnecessary. We'll just ask for exactly
397 the memory we want -- even if that happens to be 8192 MiB.
400 constraint_key = self._map_runtime_constraint('ram')
401 used_bytes = self.stats_max['mem']['rss']
402 if used_bytes == float('-Inf'):
403 logger.warning('%s: no memory usage data', self.label)
405 used_mib = math.ceil(float(used_bytes) / MB)
406 asked_mib = self.existing_constraints.get(constraint_key)
408 nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
409 if used_mib > 0 and (asked_mib is None or (
410 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
412 '#!! {} max RSS was {} MiB -- '
413 'try reducing runtime_constraints to "{}":{}'
418 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))
420 def _recommend_keep_cache(self):
421 """Recommend increasing keep cache if utilization < 80%"""
422 constraint_key = self._map_runtime_constraint('keep_cache_ram')
423 if self.job_tot['net:keep0']['rx'] == 0:
425 utilization = (float(self.job_tot['blkio:0:0']['read']) /
426 float(self.job_tot['net:keep0']['rx']))
427 # FIXME: the default on this get won't work correctly
428 asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
430 if utilization < 0.8:
432 '#!! {} Keep cache utilization was {:.2f}% -- '
433 'try doubling runtime_constraints to "{}":{} (or more)'
438 math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
def _format(self, val):
    """Render a stat value as text.

    Floats are shown with two decimal places; every other value uses
    its default formatting.
    """
    template = '{:.2f}' if isinstance(val, float) else '{}'
    return template.format(val)
def _runtime_constraint_mem_unit(self):
    """Return the unit multiplier for memory runtime constraints.

    Uses this object's own runtime_constraint_mem_unit when it has
    one; otherwise falls back to JobSummarizer's value for crunch1
    logs and ContainerSummarizer's value for everything else.
    """
    if hasattr(self, 'runtime_constraint_mem_unit'):
        return self.runtime_constraint_mem_unit
    fallback = JobSummarizer if self.detected_crunch1 else ContainerSummarizer
    return fallback.runtime_constraint_mem_unit
458 def _map_runtime_constraint(self, key):
459 if hasattr(self, 'map_runtime_constraint'):
460 return self.map_runtime_constraint[key]
461 elif self.detected_crunch1:
462 return JobSummarizer.map_runtime_constraint[key]
class CollectionSummarizer(Summarizer):
    """Summarizer whose log data comes from a CollectionReader for one collection."""

    def __init__(self, collection_id, **kwargs):
        reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(reader, **kwargs)
        # Assigned after the base constructor, so the collection id
        # replaces any label supplied through kwargs.
        self.label = collection_id
def NewSummarizer(process_or_uuid, **kwargs):
    """Construct with the appropriate subclass for this uuid/object.

    process_or_uuid is either a dict with a 'uuid' key (an already
    fetched record) or a uuid string.  For a string, the record is
    fetched through the Arvados API before the summarizer is built.
    The summarizer class is chosen from the type infix of the uuid.

    Raises ValueError if the uuid matches none of the known infixes.
    """
    if isinstance(process_or_uuid, dict):
        process = process_or_uuid
        uuid = process['uuid']
    else:
        uuid = process_or_uuid
        process = None
        arv = arvados.api('v1', model=OrderedJsonModel())

    if '-dz642-' in uuid:
        if process is None:
            process = arv.containers().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-xvhdp-' in uuid:
        if process is None:
            process = arv.container_requests().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-8i9sb-' in uuid:
        if process is None:
            process = arv.jobs().get(uuid=uuid).execute()
        klass = JobTreeSummarizer
    elif '-d1hrv-' in uuid:
        if process is None:
            process = arv.pipeline_instances().get(uuid=uuid).execute()
        klass = PipelineSummarizer
    elif '-4zz18-' in uuid:
        # NOTE(review): kwargs are not forwarded on this path -- confirm
        # that is intentional.
        return CollectionSummarizer(collection_id=uuid)
    else:
        # A built-in exception with an interpolated message; the name
        # previously raised here is not defined in this module, and its
        # arguments were never formatted into the message.
        raise ValueError("Unrecognized uuid %s" % (uuid,))
    return klass(process, uuid=uuid, **kwargs)
class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, container, or container request."""

    def __init__(self, process, label=None, **kwargs):
        """Pick a log reader for ``process`` and initialize the summarizer.

        The label defaults to the process name, or its uuid when the
        record has no 'name' key.  The log collection is preferred;
        if the record has none, or it cannot be found, a LiveLogReader
        is used instead and ' (partial)' is appended to the label.
        """
        self.process = process
        if label is None:
            label = self.process.get('name', self.process['uuid'])

        rdr = None
        if self.process.get('log'):
            log_id = self.process['log']
            try:
                rdr = crunchstat_summary.reader.CollectionReader(log_id)
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.process['log'], e)

        if rdr is None:
            label = label + ' (partial)'
            rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])

        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
        # Set after the base constructor, which resets this attribute
        # to an empty dict.
        self.existing_constraints = self.process.get('runtime_constraints', {})
class JobSummarizer(ProcessSummarizer):
    """ProcessSummarizer variant carrying job-specific constraint names and memory unit."""

    # Multiplier used when converting memory constraint values.
    runtime_constraint_mem_unit = MB

    # Generic constraint name -> name used in a job's runtime_constraints.
    map_runtime_constraint = dict([
        ('keep_cache_ram', 'keep_cache_mb_per_task'),
        ('ram', 'min_ram_mb_per_node'),
        ('vcpus', 'min_cores_per_node'),
    ])
class ContainerSummarizer(ProcessSummarizer):
    # Multiplier that Summarizer's recommendation code applies to
    # memory constraint values; 1 leaves container values unchanged.
    runtime_constraint_mem_unit = 1
class MultiSummarizer(object):
    """Runs a set of child summarizers and combines their reports.

    children maps a key to a child object that has a run() method.
    At most ``threads`` children run at the same time.
    """

    def __init__(self, children=None, label=None, threads=1, **kwargs):
        self.throttle = threading.Semaphore(threads)
        # A fresh dict per instance: a literal {} default would be one
        # object shared by every instance created without children.
        self.children = {} if children is None else children
        self.label = label

    def run_and_release(self, target, *args, **kwargs):
        """Call target(*args, **kwargs), always releasing one throttle slot afterwards."""
        try:
            return target(*args, **kwargs)
        finally:
            self.throttle.release()

    def run(self):
        """Run every child in its own thread and wait for all of them."""
        threads = []
        for child in self.children.values():
            # Blocks until a slot is free; run_and_release gives it back.
            self.throttle.acquire()
            t = threading.Thread(target=self.run_and_release, args=(child.run, ))
            t.daemon = True
            t.start()
            threads.append(t)
        for t in threads:
            t.join()

    def text_report(self):
        """Return the concatenated text reports of all descendants.

        Each report is followed by a newline.  A '### Summary for'
        header precedes each report when there is more than one.
        """
        d = self._descendants()
        parts = []
        for child in d.values():
            if len(d) > 1:
                parts.append('### Summary for {} ({})\n'.format(
                    child.label, child.process['uuid']))
            parts.append(child.text_report())
            parts.append('\n')
        return ''.join(parts)

    def _descendants(self):
        """Dict of self and all descendants.

        Nodes with nothing of their own to report (like
        MultiSummarizers) are omitted.
        """
        d = collections.OrderedDict()
        for key, child in self.children.items():
            if isinstance(child, Summarizer):
                d[key] = child
            if isinstance(child, MultiSummarizer):
                d.update(child._descendants())
        return d

    def html_report(self):
        """Return the result of WEBCHART_CLASS(...).html() over all descendants."""
        return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()
594 class JobTreeSummarizer(MultiSummarizer):
595 """Summarizes a job and all children listed in its components field."""
596 def __init__(self, job, label=None, **kwargs):
597 arv = arvados.api('v1', model=OrderedJsonModel())
598 label = label or job.get('name', job['uuid'])
599 children = collections.OrderedDict()
600 children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
601 if job.get('components', None):
603 for j in arv.jobs().index(
604 limit=len(job['components']),
605 filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
606 preloaded[j['uuid']] = j
607 for cname in sorted(job['components'].keys()):
608 child_uuid = job['components'][cname]
609 j = (preloaded.get(child_uuid) or
610 arv.jobs().get(uuid=child_uuid).execute())
611 children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
613 super(JobTreeSummarizer, self).__init__(
619 class PipelineSummarizer(MultiSummarizer):
620 def __init__(self, instance, **kwargs):
621 children = collections.OrderedDict()
622 for cname, component in instance['components'].items():
623 if 'job' not in component:
625 "%s: skipping component with no job assigned", cname)
628 "%s: job %s", cname, component['job']['uuid'])
629 summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
630 summarizer.label = '{} {}'.format(
631 cname, component['job']['uuid'])
632 children[cname] = summarizer
633 super(PipelineSummarizer, self).__init__(
635 label=instance['uuid'],
639 class ContainerTreeSummarizer(MultiSummarizer):
640 def __init__(self, root, skip_child_jobs=False, **kwargs):
641 arv = arvados.api('v1', model=OrderedJsonModel())
643 label = kwargs.pop('label', None) or root.get('name') or root['uuid']
646 children = collections.OrderedDict()
647 todo = collections.deque((root, ))
649 current = todo.popleft()
650 label = current['name']
651 sort_key = current['created_at']
652 if current['uuid'].find('-xvhdp-') > 0:
653 current = arv.containers().get(uuid=current['container_uuid']).execute()
655 summer = ContainerSummarizer(current, label=label, **kwargs)
656 summer.sort_key = sort_key
657 children[current['uuid']] = summer
661 child_crs = arv.container_requests().index(
663 filters=page_filters+[
664 ['requesting_container_uuid', '=', current['uuid']]],
666 if not child_crs['items']:
668 elif skip_child_jobs:
669 logger.warning('%s: omitting stats from %d child containers'
670 ' because --skip-child-jobs flag is on',
671 label, child_crs['items_available'])
673 page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
674 for cr in child_crs['items']:
675 if cr['container_uuid']:
676 logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
677 cr['name'] = cr.get('name') or cr['uuid']
679 sorted_children = collections.OrderedDict()
680 for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
681 sorted_children[uuid] = children[uuid]
682 super(ContainerTreeSummarizer, self).__init__(
683 children=sorted_children,