1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import print_function
9 import crunchstat_summary.dygraphs
10 import crunchstat_summary.reader
20 from arvados.api import OrderedJsonModel
21 from crunchstat_summary import logger
# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95


# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads. https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')


# Chart implementation used by the html_report() methods below.
WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
40 self.series = collections.defaultdict(list)
class Summarizer(object):
    """Parse crunchstat log lines and accumulate per-task and per-job stats.

    NOTE(review): this is an elided excerpt -- a number of statements
    (guards, ``continue``/``return`` lines, one method header) are missing
    from the visible text.  ``[elided: ...]`` comments below mark the gaps
    rather than guessing at the missing code.
    """

    def __init__(self, logdata, label=None, skip_child_jobs=False):
        # logdata: iterable of raw log lines (a crunchstat_summary.reader
        # object, judging by the subclasses below).
        # skip_child_jobs: when True, stats from queued child jobs seen in
        # the log are omitted instead of followed.
        self._logdata = logdata
        # [elided: self.label / self.starttime initialization]
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        # [elided: self.seq_to_uuid initialization -- maps crunch1
        # sequence numbers to task uuids; used throughout run()]
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable. If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.debug("%s: logdata %s", self.label, logdata)

    # [elided: "def run(self):" method header -- the parsing pass below
    # is its body: it scans self._logdata once and fills in the stats
    # dicts initialized above]
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        for line in self._logdata:
            # crunch1 "job_task" line: associates a sequence number with
            # a task uuid for later stderr lines.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
            # [elided: "if m:" guard around the next four statements]
                seq = int(m.group('seq'))
                uuid = m.group('task_uuid')
                self.seq_to_uuid[seq] = uuid
                logger.debug('%s: seq %d is task %s', self.label, seq, uuid)

            # crunch1 task completion line: record wall-clock time.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
            # [elided: "if m:" guard]
                task_id = self.seq_to_uuid[int(m.group('seq'))]
                elapsed = int(m.group('elapsed'))
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed

            # Child job queued by this job: fold its stats into ours by
            # running a child summarizer that shares our accumulators.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
            # [elided: "if m:" guard]
                uuid = m.group('uuid')
                if self._skip_child_jobs:
                    logger.warning('%s: omitting stats from child job %s'
                                   ' because --skip-child-jobs flag is on',
                    # [elided: remaining logger args and a "continue"]
                logger.debug('%s: follow %s', self.label, uuid)
                # NOTE(review): ProcessSummarizer.__init__ (below) calls
                # .get() on its argument, i.e. expects a dict-like record,
                # but here it is handed a bare uuid string -- verify.
                child_summarizer = ProcessSummarizer(uuid)
                # Share accumulators so child stats land in our totals.
                child_summarizer.stats_max = self.stats_max
                child_summarizer.task_stats = self.task_stats
                child_summarizer.tasks = self.tasks
                child_summarizer.starttime = self.starttime
                child_summarizer.run()
                logger.debug('%s: done %s', self.label, uuid)

            # crunchstat sample line.  Two shapes: crunch1 (with job uuid
            # and seq number) and crunch2 (bare timestamp, one container).
            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
            # [elided: "if m:" guard -- crunch1 branch]
                task_id = self.seq_to_uuid[int(m.group('seq'))]
            # [elided: "else:" -- try the crunch2 format instead]
                m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                # [elided: "if m:" guard]
                    # crunch2 container (seq/task don't apply)
                    task_id = 'container'
                # [elided: "else:" branch]
                    # not a crunchstat log
                    # [elided: "continue"]
            task = self.tasks[task_id]

            # [elided: "try:" matching the "except Exception" below]
                if self.label is None:
                    # First crunch1 sample seen: adopt the job uuid as label.
                    self.label = m.group('job_uuid')
                    logger.debug('%s: using job uuid as label', self.label)
                # Skip non-stat crunchstat chatter:
                if m.group('category').endswith(':'):
                    # "stderr crunchstat: notice: ..."
                    # [elided: "continue"]
                elif m.group('category') in ('error', 'caught'):
                    # [elided: "continue"]
                elif m.group('category') == 'read':
                    # "stderr crunchstat: read /proc/1234/net/dev: ..."
                    # (crunchstat formatting fixed, but old logs still say this)
                    # [elided: "continue"]

                # Use the first and last crunchstat timestamps as
                # approximations of starttime and finishtime.
                timestamp = m.group('timestamp')
                if timestamp[10:11] == '_':
                    # legacy "YYYY-MM-DD_HH:MM:SS" format
                    timestamp = datetime.datetime.strptime(
                        timestamp, '%Y-%m-%d_%H:%M:%S')
                elif timestamp[10:11] == 'T':
                    # ISO 8601; [:19] drops fractional seconds / zone suffix
                    timestamp = datetime.datetime.strptime(
                        timestamp[:19], '%Y-%m-%dT%H:%M:%S')
                # [elided: "else:"]
                    raise ValueError("Cannot parse timestamp {!r}".format(
                    # [elided: format argument and closing parens]

                if not task.starttime:
                    task.starttime = timestamp
                    logger.debug('%s: task %s starttime %s',
                                 self.label, task_id, timestamp)
                task.finishtime = timestamp

                if not self.starttime:
                    self.starttime = timestamp
                self.finishtime = timestamp

                # Each group is a "<val> <stat> <val> <stat> ..." list;
                # 'interval' values get converted to per-second rates.
                this_interval_s = None
                for group in ['current', 'interval']:
                    if not m.group(group):
                        # [elided: "continue"]
                    category = m.group('category')
                    words = m.group(group).split(' ')
                    # [elided: stats dict initialization]
                    for val, stat in zip(words[::2], words[1::2]):
                        # [elided: "try:" and a float-vs-int guard,
                        # presumably "if '.' in val:" -- verify]
                            stats[stat] = float(val)
                        # [elided: "else:"]
                            stats[stat] = int(val)
                        except ValueError as e:
                            # [elided: re-raise wrapper, e.g. "raise ValueError("]
                                'Error parsing {} stat: {!r}'.format(
                                # [elided: format args / closing parens]
                    # Synthesized combined stats used by the reports below.
                    if 'user' in stats or 'sys' in stats:
                        stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                    if 'tx' in stats or 'rx' in stats:
                        stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                    for stat, val in stats.iteritems():
                        if group == 'interval':
                            if stat == 'seconds':
                                this_interval_s = val
                                # [elided: "continue"]
                            elif not (this_interval_s > 0):
                                # [elided: "logger.error(" -- an interval
                                # stat arrived before its duration]
                                    "BUG? interval stat given with duration {!r}".
                                    format(this_interval_s))
                                # [elided: "continue" then "else:"]
                                # Convert interval total to a per-second rate.
                                stat = stat + '__rate'
                                val = val / this_interval_s
                                if stat in ['user+sys__rate', 'tx+rx__rate']:
                                    task.series[category, stat].append(
                                        (timestamp - self.starttime, val))
                        # [elided: "else:" -- current-value group; a guard
                        # selects which stats are charted as series]
                                task.series[category, stat].append(
                                    (timestamp - self.starttime, val))
                            self.task_stats[task_id][category][stat] = val
                        if val > self.stats_max[category][stat]:
                            self.stats_max[category][stat] = val
            except Exception as e:
                # Malformed lines are logged and skipped, never fatal.
                logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
        logger.debug('%s: done parsing', self.label)

        # Fold each task's last-seen values into whole-job totals,
        # skipping gauges that are meaningless when summed across tasks.
        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.iteritems():
            for category, stat_last in task_stat.iteritems():
                for stat, val in stat_last.iteritems():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                        # [elided: "continue"]
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

    def long_label(self):
        """Return the label plus an elapsed-time suffix (e.g. "-- elapsed time 1d2h3m4s")."""
        # [elided: label initialization and a finishtime guard]
        label += ' -- elapsed time '
        s = (self.finishtime - self.starttime).total_seconds()
        # [elided: magnitude guards around the d/h/m components]
        label += '{}d'.format(int(s/86400))
        label += '{}h'.format(int(s/3600) % 24)
        label += '{}m'.format(int(s/60) % 60)
        label += '{}s'.format(int(s) % 60)
        # [elided: "return label"]

    def text_report(self):
        """Render the plain-text report: TSV stats table plus recommendations."""
        # [elided: guard (no parsed data) protecting the next line]
        return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"

    def html_report(self):
        """Render a single-summarizer HTML page with charts."""
        return WEBCHART_CLASS(self.label, [self]).html()

    def _text_report_gen(self):
        """Yield report lines: a TSV table, then "# ..." summary lines."""
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                if stat.endswith('__rate'):
                    # rates are reported in their base stat's row instead
                    # [elided: "continue"]
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        # [elided: "for args in (" -- each row below is a
        # (format_string, value, transform-or-None) tuple rendered as one
        # "# ..." summary line]
                ('Number of tasks: {}',
                 # [elided: value and transform]
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                 # [elided: transform]
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                 # [elided: transform]
                ('Overall CPU usage: {}%',
                 self.job_tot['cpu']['user+sys'] /
                 self.job_tot['time']['elapsed']
                 if self.job_tot['time']['elapsed'] > 0 else 0,
                 # [elided: transform]
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                 # [elided: transform]
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'] +
                 self.stats_max['net:keep0']['tx+rx'],
                 # [elided: transform]
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'] +
                 self.stats_max['net:keep0']['tx+rx__rate'],
                 # [elided: transform]
                ('Keep cache miss rate {}%',
                 (float(self.job_tot['keepcache']['miss']) /
                  float(self.job_tot['keepcalls']['get']))
                 if self.job_tot['keepcalls']['get'] > 0 else 0,
                 lambda x: x * 100.0),
                ('Keep cache utilization {}%',
                 (float(self.job_tot['blkio:0:0']['read']) /
                  float(self.job_tot['net:keep0']['rx']))
                 if self.job_tot['net:keep0']['rx'] > 0 else 0,
                 lambda x: x * 100.0)):
            format_string, val, transform = args
            if val == float('-Inf'):
                # stat never observed
                # [elided: "continue"; then transform is applied when set]
            yield "# "+format_string.format(self._format(val))

    def _recommend_gen(self):
        """Chain all runtime-constraint recommendation generators."""
        return itertools.chain(
            self._recommend_cpu(),
            self._recommend_ram(),
            self._recommend_keep_cache())

    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""

        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf'):
            logger.warning('%s: no CPU usage data', self.label)
            # [elided: "return"]
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get('min_cores_per_node')
        if asked_cores is None or used_cores < asked_cores:
            # [elided: "yield (" emitting the recommendation line]
                '#!! {} max CPU usage was {}% -- '
                'try runtime_constraints "min_cores_per_node":{}'
            # [elided: ".format(" with self.label]
                int(math.ceil(cpu_max_rate*100)),
            # [elided: used_cores argument and closing parens]

    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available. If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available. Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available. However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        constraint.

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary. We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """

        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
            # [elided: "return"]
        used_mib = math.ceil(float(used_bytes) / 1048576)
        asked_mib = self.existing_constraints.get('min_ram_mb_per_node')

        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        if asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
            # [elided: "yield (" emitting the recommendation line]
                '#!! {} max RSS was {} MiB -- '
                'try runtime_constraints "min_ram_mb_per_node":{}'
            # [elided: ".format(" with self.label / int(used_mib)]
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))

    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%"""
        if self.job_tot['net:keep0']['rx'] == 0:
            # no Keep traffic: nothing to recommend
            # [elided: "return"]
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)

        if utilization < 0.8:
            # [elided: "yield (" emitting the recommendation line]
                '#!! {} Keep cache utilization was {:.2f}% -- '
                'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
            # [elided: ".format(" and its arguments]

    def _format(self, val):
        """Return a string representation of a stat.

        {:.2f} for floats, default format for everything else."""
        if isinstance(val, float):
            return '{:.2f}'.format(val)
        # [elided: "else:" -- behavior is unchanged either way]
        return '{}'.format(val)
class CollectionSummarizer(Summarizer):
    """Summarizer that reads its crunchstat log from a Keep collection."""

    def __init__(self, collection_id, **kwargs):
        # Build the log reader first, then hand it to the base class,
        # which sets up all the stats bookkeeping around it.
        reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(reader, **kwargs)
        # Label reports with the collection id (overriding whatever the
        # base constructor chose).
        self.label = collection_id
def NewSummarizer(process, **kwargs):
    """Construct with the appropriate subclass for this uuid/object."""
    # NOTE(review): elided excerpt -- assignment of `uuid` (and guards
    # that skip the API fetch when a full record was passed in) are
    # missing from the visible text; [elided] comments mark the gaps.

    if not isinstance(process, dict):
        # [elided: presumably `process` is a uuid string here -- verify]
        arv = arvados.api('v1', model=OrderedJsonModel())

    # Dispatch on the type portion of the uuid.
    if re.search('-dz642-', uuid):
        # container
        process = arv.containers().get(uuid=uuid).execute()
        return ContainerSummarizer(process, **kwargs)
    elif re.search('-xvhdp-', uuid):
        # container request: resolve to the underlying container record
        ctrReq = arv.container_requests().get(uuid=uuid).execute()
        ctrUUID = ctrReq['container_uuid']
        process = arv.containers().get(uuid=ctrUUID).execute()
        return ContainerSummarizer(process, **kwargs)
    elif re.search('-8i9sb-', uuid):
        # crunch1 job
        process = arv.jobs().get(uuid=uuid).execute()
        return JobSummarizer(process, **kwargs)
    elif re.search('-d1hrv-', uuid):
        # pipeline instance
        process = arv.pipeline_instances().get(uuid=uuid).execute()
        return PipelineSummarizer(process, **kwargs)
    # [elided: "else:"]
        # NOTE(review): ArgumentError is not defined in the visible file;
        # this line would raise NameError at runtime -- verify.
        raise ArgumentError("Unrecognized uuid %s", uuid)
class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, container, or container request."""
    # NOTE(review): elided excerpt -- the try/else scaffolding and the
    # `rdr`/`label` initialization lines are missing from the visible
    # text; [elided] comments mark the gaps.

    def __init__(self, process, **kwargs):
        # [elided: rdr / label initialization]
        self.process = process
        if self.process.get('log'):
            # [elided: "try:"]
                rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
            except arvados.errors.NotFoundError as e:
                # Log collection unreadable: fall back to live event logs.
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.process['log'], e)
            # [elided: success-path label assignment]
            label = self.process['uuid']
        # [elided: fallback guard, presumably "if rdr is None:" -- verify]
            rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
            label = self.process['uuid'] + ' (partial)'
        super(ProcessSummarizer, self).__init__(rdr, **kwargs)
        # [elided: self.label assignment from `label`]
        # Real constraints from the record let the recommenders skip
        # advice that is already satisfied.
        self.existing_constraints = self.process.get('runtime_constraints', {})
class JobSummarizer(ProcessSummarizer):
    """ProcessSummarizer for a crunch1 job record (see NewSummarizer)."""
class ContainerSummarizer(ProcessSummarizer):
    """ProcessSummarizer for a crunch2 container record (see NewSummarizer)."""
class PipelineSummarizer(object):
    """Aggregate a JobSummarizer for each component of a pipeline instance.

    NOTE(review): elided excerpt -- the "else:" branch, the run() method
    header and its thread start/join bookkeeping, and text_report()'s
    accumulator init/return are missing from the visible text; [elided]
    comments mark the gaps.
    """

    def __init__(self, instance, **kwargs):
        # Ordered so reports come out in component order.
        self.summarizers = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            if 'job' not in component:
                # [elided: "logger.warning(" wrapper]
                    "%s: skipping component with no job assigned", cname)
            # [elided: "else:" with a "logger.info(" wrapper]
                    "%s: job %s", cname, component['job']['uuid'])
                summarizer = JobSummarizer(component['job'], **kwargs)
                summarizer.label = '{} {}'.format(
                    cname, component['job']['uuid'])
                self.summarizers[cname] = summarizer
        self.label = instance['uuid']

    # [elided: "def run(self):" header -- runs every component
    # summarizer, apparently on its own thread]
        for summarizer in self.summarizers.itervalues():
            t = threading.Thread(target=summarizer.run)
            # [elided: thread start and join bookkeeping]

    def text_report(self):
        """Concatenate each component's text report under a '###' heading."""
        # [elided: txt accumulator initialization]
        for cname, summarizer in self.summarizers.iteritems():
            txt += '### Summary for {} ({})\n'.format(
                cname, summarizer.process['uuid'])
            txt += summarizer.text_report()
        # [elided: "return txt"]

    def html_report(self):
        """Render all component summaries into one HTML page."""
        return WEBCHART_CLASS(self.label, self.summarizers.itervalues()).html()