1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import print_function
9 import crunchstat_summary.chartjs
10 import crunchstat_summary.reader
20 from arvados.api import OrderedJsonModel
21 from crunchstat_summary import logger
# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95


# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads. https://bugs.python.org/issue7980
# (PipelineSummarizer.run() below parses timestamps from worker threads.)
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
        # NOTE(review): fragment of the Task class's constructor; the
        # class/def lines are elided from this view.
        # series: {(category, stat): [(timedelta-since-start, value), ...]}
        # -- per-stat time series appended to by the parsing loop below.
        self.series = collections.defaultdict(list)
class Summarizer(object):
    def __init__(self, logdata, label=None, skip_child_jobs=False):
        """Accumulate resource-usage stats from crunchstat log lines.

        :logdata: iterable of raw log lines (e.g. a reader from
            crunchstat_summary.reader).
        :label: name used in log messages and reports; subclasses may
            overwrite it after construction.
        :skip_child_jobs: if True, stats from spawned child jobs are
            omitted instead of being merged in.

        NOTE(review): several attribute initializations (self.label,
        self.starttime, self.seq_to_uuid, ...) are elided from this view
        of the file -- confirm against the full source.
        """
        self._logdata = logdata
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs
        # stats_max: {category: {stat: val}} -- maximum value seen for
        # each stat across all tasks; missing entries default to 0.
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}} -- last-seen
        # value of each stat, per task.
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))
        # tasks: {task_id: Task} -- per-task time series and timestamps.
        self.tasks = collections.defaultdict(Task)
        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable.  If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}
        logger.debug("%s: logdata %s", self.label, logdata)
        # NOTE(review): this is the interior of the log-parsing method;
        # its "def" line and several guard lines ("if m:", "continue",
        # try/except scaffolding, logger.error/info wrappers) are elided
        # from this view, so the structure below is a best-effort
        # reconstruction -- confirm against the full source.
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        for line in self._logdata:
            # Case 1: "... job_task <uuid>" -- maps a crunchstat sequence
            # number to the task UUID it belongs to.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
            seq = int(m.group('seq'))
            uuid = m.group('task_uuid')
            self.seq_to_uuid[seq] = uuid
            logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
            # Case 2: task completion line -- record the task's elapsed
            # wall-clock seconds and track the maximum across tasks.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
            task_id = self.seq_to_uuid[int(m.group('seq'))]
            elapsed = int(m.group('elapsed'))
            self.task_stats[task_id]['time'] = {'elapsed': elapsed}
            if elapsed > self.stats_max['time']['elapsed']:
                self.stats_max['time']['elapsed'] = elapsed
            # Case 3: "stderr Queued job <uuid>" -- a child job was queued
            # by this job.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
            uuid = m.group('uuid')
            if self._skip_child_jobs:
                logger.warning('%s: omitting stats from child job %s'
                               ' because --skip-child-jobs flag is on',
            # Otherwise follow the child job: run a JobSummarizer over it,
            # sharing this summarizer's accumulators so the child's stats
            # merge into this job's maxima and totals.
            logger.debug('%s: follow %s', self.label, uuid)
            child_summarizer = JobSummarizer(uuid)
            child_summarizer.stats_max = self.stats_max
            child_summarizer.task_stats = self.task_stats
            child_summarizer.tasks = self.tasks
            child_summarizer.starttime = self.starttime
            child_summarizer.run()
            logger.debug('%s: done %s', self.label, uuid)
            # Case 4: a crunchstat measurement line -- timestamp, job
            # uuid, seq, category, current values, and (optionally)
            # "-- interval" deltas since the previous measurement.
            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
            if self.label is None:
                # First stat line seen: adopt the job uuid as our label.
                self.label = m.group('job_uuid')
                logger.debug('%s: using job uuid as label', self.label)
            if m.group('category').endswith(':'):
                # "stderr crunchstat: notice: ..."
            elif m.group('category') in ('error', 'caught'):
            elif m.group('category') == 'read':
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (crunchstat formatting fixed, but old logs still say this)
            task_id = self.seq_to_uuid[int(m.group('seq'))]
            task = self.tasks[task_id]
            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = datetime.datetime.strptime(
                m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
            if not task.starttime:
                task.starttime = timestamp
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            task.finishtime = timestamp
            if not self.starttime:
                self.starttime = timestamp
            self.finishtime = timestamp
            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                category = m.group('category')
                # Stats arrive as alternating "<value> <name>" words.
                words = m.group(group).split(' ')
                for val, stat in zip(words[::2], words[1::2]):
                    stats[stat] = float(val)
                    stats[stat] = int(val)
                    except ValueError as e:
                        'Error parsing {} stat: {!r}'.format(
                # Derived stats: combined CPU time and combined net traffic.
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                for stat, val in stats.iteritems():
                    if group == 'interval':
                        if stat == 'seconds':
                            this_interval_s = val
                        elif not (this_interval_s > 0):
                            "BUG? interval stat given with duration {!r}".
                            format(this_interval_s))
                        # Convert an interval delta to a per-second rate.
                        stat = stat + '__rate'
                        val = val / this_interval_s
                        if stat in ['user+sys__rate', 'tx+rx__rate']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                    task.series[category, stat].append(
                        (timestamp - self.starttime, val))
                    self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        except Exception as e:
            # Best-effort parsing: a malformed line is logged, not fatal.
            logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
        logger.debug('%s: done parsing', self.label)
        # Compute job-wide totals by summing each task's last-seen values.
        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.iteritems():
            for category, stat_last in task_stat.iteritems():
                for stat, val in stat_last.iteritems():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)
    def long_label(self):
        """Return self.label extended with the job's elapsed time, e.g.
        "<label> -- elapsed time 1d2h3m4s".

        NOTE(review): the label initialization, the guard for missing
        start/finish times, the per-unit threshold conditionals, and the
        return statement are elided from this view.
        """
        label += ' -- elapsed time '
        s = (self.finishtime - self.starttime).total_seconds()
        # Break total seconds down into days / hours / minutes / seconds.
        label += '{}d'.format(int(s/86400))
        label += '{}h'.format(int(s/3600) % 24)
        label += '{}m'.format(int(s/60) % 60)
        label += '{}s'.format(int(s) % 60)
    def text_report(self):
        """Return the full plain-text report: the TSV stats table followed
        by the "#!!" runtime-constraint recommendations.

        NOTE(review): the guard condition before the early return is
        elided from this view (presumably "no tasks were parsed").
        """
        return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"
219 def html_report(self):
220 return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
    def _text_report_gen(self):
        """Yield the lines of the text report: a tab-separated stats table,
        then "# ..." human-readable summary lines.

        NOTE(review): several lines are elided from this view -- a
        "continue" for rate rows, the "for args in (" header, tuple-closing
        and transform-lambda lines, and the val-transform application.
        """
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                # Rate stats appear in the task_max_rate column of their
                # base stat rather than as rows of their own.
                if stat.endswith('__rate'):
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        # Summary lines: (format string, value, display transform) tuples.
        ('Number of tasks: {}',
        ('Max CPU time spent by a single task: {}s',
         self.stats_max['cpu']['user+sys'],
        ('Max CPU usage in a single interval: {}%',
         self.stats_max['cpu']['user+sys__rate'],
        ('Overall CPU usage: {}%',
         self.job_tot['cpu']['user+sys'] /
         self.job_tot['time']['elapsed']
         if self.job_tot['time']['elapsed'] > 0 else 0,
        ('Max memory used by a single task: {}GB',
         self.stats_max['mem']['rss'],
        ('Max network traffic in a single task: {}GB',
         self.stats_max['net:eth0']['tx+rx'] +
         self.stats_max['net:keep0']['tx+rx'],
        ('Max network speed in a single interval: {}MB/s',
         self.stats_max['net:eth0']['tx+rx__rate'] +
         self.stats_max['net:keep0']['tx+rx__rate'],
        ('Keep cache miss rate {}%',
         (float(self.job_tot['keepcache']['miss']) /
          float(self.job_tot['keepcalls']['get']))
         if self.job_tot['keepcalls']['get'] > 0 else 0,
         lambda x: x * 100.0),
        ('Keep cache utilization {}%',
         (float(self.job_tot['blkio:0:0']['read']) /
          float(self.job_tot['net:keep0']['rx']))
         if self.job_tot['net:keep0']['rx'] > 0 else 0,
         lambda x: x * 100.0)):
            format_string, val, transform = args
            # Skip stats that never received a value.
            if val == float('-Inf'):
            yield "# "+format_string.format(self._format(val))
275 def _recommend_gen(self):
276 return itertools.chain(
277 self._recommend_cpu(),
278 self._recommend_ram(),
279 self._recommend_keep_cache())
    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%

        Yields a "#!! ... min_cores_per_node" suggestion when the job used
        fewer cores than it asked for (or asked for none).
        NOTE(review): the return after the no-data warning and the yield/
        format continuation lines are elided from this view.
        """
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf'):
            # No CPU usage stats were ever parsed; nothing to recommend.
            logger.warning('%s: no CPU usage data', self.label)
        # Round the peak usage up to a whole number of cores, minimum 1.
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get('min_cores_per_node')
        if asked_cores is None or used_cores < asked_cores:
            '#!! {} max CPU usage was {}% -- '
            'try runtime_constraints "min_cores_per_node":{}'
            int(math.ceil(cpu_max_rate*100)),
    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available. If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available. Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available. However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary. We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.

        NOTE(review): the return after the no-data warning and the
        yield/format continuation lines are elided from this view.
        """
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            # No memory stats were ever parsed; nothing to recommend.
            logger.warning('%s: no memory usage data', self.label)
        used_mib = math.ceil(float(used_bytes) / 1048576)
        asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
        # MiB -> "nearlygibs": GiB scaled by the AVAILABLE_RAM_RATIO
        # fudge factor described above.
        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        if asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
            '#!! {} max RSS was {} MiB -- '
            'try runtime_constraints "min_ram_mb_per_node":{}'
            int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%

        NOTE(review): the early-return body, a line between utilization
        and the threshold check, and the yield/format continuation lines
        are elided from this view.
        """
        # No Keep traffic at all: utilization is undefined.
        if self.job_tot['net:keep0']['rx'] == 0:
        # Fraction of bytes fetched from Keep that the job actually read
        # (block-device reads / Keep bytes received).
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
        if utilization < 0.8:
            '#!! {} Keep cache utilization was {:.2f}% -- '
            'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
369 def _format(self, val):
370 """Return a string representation of a stat.
372 {:.2f} for floats, default format for everything else."""
373 if isinstance(val, float):
374 return '{:.2f}'.format(val)
376 return '{}'.format(val)
class CollectionSummarizer(Summarizer):
    """Summarizer that reads its log data out of a Keep collection."""

    def __init__(self, collection_id, **kwargs):
        reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(reader, **kwargs)
        # Label reports with the collection we summarized.
        self.label = collection_id
class JobSummarizer(Summarizer):
    def __init__(self, job, **kwargs):
        """Summarize one job, given either its UUID or an API job record.

        Prefers the job's finished log collection; falls back to the live
        event log (yielding a "(partial)" label) when the collection is
        unavailable.  NOTE(review): several lines are elided from this
        view -- the else branch storing a passed-in job record, the try
        around the collection reader, the fallback reader setup, and the
        label keyword passed to super().
        """
        arv = arvados.api('v1')
        if isinstance(job, basestring):
            # Given a UUID string: fetch the full job record from the API.
            self.job = arv.jobs().get(uuid=job).execute()
        if self.job.get('log'):
            rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.job['log'], e)
            label = self.job['uuid']
            rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
            label = self.job['uuid'] + ' (partial)'
        super(JobSummarizer, self).__init__(rdr, **kwargs)
        # Let the recommenders compare against the job's actual
        # runtime_constraints instead of the empty default.
        self.existing_constraints = self.job.get('runtime_constraints', {})
class PipelineSummarizer(object):
    def __init__(self, pipeline_instance_uuid, **kwargs):
        """Build one JobSummarizer per component of a pipeline instance.

        Components are kept in the pipeline's own order (hence
        OrderedJsonModel and OrderedDict).  NOTE(review): a "continue"
        for job-less components and the logger call wrappers are elided
        from this view.
        """
        arv = arvados.api('v1', model=OrderedJsonModel())
        instance = arv.pipeline_instances().get(
            uuid=pipeline_instance_uuid).execute()
        self.summarizers = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            if 'job' not in component:
                # This component never got a job; nothing to summarize.
                "%s: skipping component with no job assigned", cname)
                "%s: job %s", cname, component['job']['uuid'])
            summarizer = JobSummarizer(component['job'], **kwargs)
            # Label each component's report "<component name> <job uuid>".
            summarizer.label = '{} {}'.format(
                cname, component['job']['uuid'])
            self.summarizers[cname] = summarizer
        self.label = pipeline_instance_uuid
        # NOTE(review): interior of PipelineSummarizer.run(); its "def"
        # line and the thread start/join lines are elided from this view.
        # Each component's summarizer runs in its own thread (the module-
        # level strptime() call works around its thread-safety bug).
        for summarizer in self.summarizers.itervalues():
            t = threading.Thread(target=summarizer.run)
    def text_report(self):
        """Concatenate every component's text report, each under a
        "### Summary for <component> (<job uuid>)" heading.

        NOTE(review): the initialization of txt and the return statement
        are elided from this view.
        """
        for cname, summarizer in self.summarizers.iteritems():
            txt += '### Summary for {} ({})\n'.format(
                cname, summarizer.job['uuid'])
            txt += summarizer.text_report()
448 def html_report(self):
449 return crunchstat_summary.chartjs.ChartJS(
450 self.label, self.summarizers.itervalues()).html()