# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0
from __future__ import print_function

import arvados
import collections
import datetime
import functools
import itertools
import math
import re
import threading

import crunchstat_summary.dygraphs
import crunchstat_summary.reader

from arvados.api import OrderedJsonModel
from crunchstat_summary import logger
23 # Recommend memory constraints that are this multiple of an integral
24 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
25 # that have amounts like 7.5 GiB according to the kernel.)
26 AVAILABLE_RAM_RATIO = 0.95
29 # Workaround datetime.datetime.strptime() thread-safety bug by calling
30 # it once before starting threads. https://bugs.python.org/issue7980
31 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
34 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
40 self.series = collections.defaultdict(list)
43 class Summarizer(object):
44 def __init__(self, logdata, label=None, skip_child_jobs=False):
45 self._logdata = logdata
49 self.finishtime = None
50 self._skip_child_jobs = skip_child_jobs
52 # stats_max: {category: {stat: val}}
53 self.stats_max = collections.defaultdict(
54 functools.partial(collections.defaultdict, lambda: 0))
55 # task_stats: {task_id: {category: {stat: val}}}
56 self.task_stats = collections.defaultdict(
57 functools.partial(collections.defaultdict, dict))
60 self.tasks = collections.defaultdict(Task)
62 # We won't bother recommending new runtime constraints if the
63 # constraints given when running the job are known to us and
64 # are already suitable. If applicable, the subclass
65 # constructor will overwrite this with something useful.
66 self.existing_constraints = {}
68 logger.debug("%s: logdata %s", self.label, logdata)
71 logger.debug("%s: parsing logdata %s", self.label, self._logdata)
72 for line in self._logdata:
73 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
75 seq = int(m.group('seq'))
76 uuid = m.group('task_uuid')
77 self.seq_to_uuid[seq] = uuid
78 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
81 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
83 task_id = self.seq_to_uuid[int(m.group('seq'))]
84 elapsed = int(m.group('elapsed'))
85 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
86 if elapsed > self.stats_max['time']['elapsed']:
87 self.stats_max['time']['elapsed'] = elapsed
90 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
92 uuid = m.group('uuid')
93 if self._skip_child_jobs:
94 logger.warning('%s: omitting stats from child job %s'
95 ' because --skip-child-jobs flag is on',
98 logger.debug('%s: follow %s', self.label, uuid)
99 child_summarizer = JobSummarizer(uuid)
100 child_summarizer.stats_max = self.stats_max
101 child_summarizer.task_stats = self.task_stats
102 child_summarizer.tasks = self.tasks
103 child_summarizer.starttime = self.starttime
104 child_summarizer.run()
105 logger.debug('%s: done %s', self.label, uuid)
108 m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
113 if self.label is None:
114 self.label = m.group('job_uuid')
115 logger.debug('%s: using job uuid as label', self.label)
116 if m.group('category').endswith(':'):
117 # "stderr crunchstat: notice: ..."
119 elif m.group('category') in ('error', 'caught'):
121 elif m.group('category') == 'read':
122 # "stderr crunchstat: read /proc/1234/net/dev: ..."
123 # (crunchstat formatting fixed, but old logs still say this)
125 task_id = self.seq_to_uuid[int(m.group('seq'))]
126 task = self.tasks[task_id]
128 # Use the first and last crunchstat timestamps as
129 # approximations of starttime and finishtime.
130 timestamp = datetime.datetime.strptime(
131 m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
132 if not task.starttime:
133 task.starttime = timestamp
134 logger.debug('%s: task %s starttime %s',
135 self.label, task_id, timestamp)
136 task.finishtime = timestamp
138 if not self.starttime:
139 self.starttime = timestamp
140 self.finishtime = timestamp
142 this_interval_s = None
143 for group in ['current', 'interval']:
144 if not m.group(group):
146 category = m.group('category')
147 words = m.group(group).split(' ')
149 for val, stat in zip(words[::2], words[1::2]):
152 stats[stat] = float(val)
154 stats[stat] = int(val)
155 except ValueError as e:
157 'Error parsing {} stat: {!r}'.format(
159 if 'user' in stats or 'sys' in stats:
160 stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
161 if 'tx' in stats or 'rx' in stats:
162 stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
163 for stat, val in stats.iteritems():
164 if group == 'interval':
165 if stat == 'seconds':
166 this_interval_s = val
168 elif not (this_interval_s > 0):
170 "BUG? interval stat given with duration {!r}".
171 format(this_interval_s))
174 stat = stat + '__rate'
175 val = val / this_interval_s
176 if stat in ['user+sys__rate', 'tx+rx__rate']:
177 task.series[category, stat].append(
178 (timestamp - self.starttime, val))
181 task.series[category, stat].append(
182 (timestamp - self.starttime, val))
183 self.task_stats[task_id][category][stat] = val
184 if val > self.stats_max[category][stat]:
185 self.stats_max[category][stat] = val
186 except Exception as e:
187 logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
188 logger.debug('%s: done parsing', self.label)
190 self.job_tot = collections.defaultdict(
191 functools.partial(collections.defaultdict, int))
192 for task_id, task_stat in self.task_stats.iteritems():
193 for category, stat_last in task_stat.iteritems():
194 for stat, val in stat_last.iteritems():
195 if stat in ['cpus', 'cache', 'swap', 'rss']:
196 # meaningless stats like 16 cpu cores x 5 tasks = 80
198 self.job_tot[category][stat] += val
199 logger.debug('%s: done totals', self.label)
201 def long_label(self):
204 label += ' -- elapsed time '
205 s = (self.finishtime - self.starttime).total_seconds()
207 label += '{}d'.format(int(s/86400))
209 label += '{}h'.format(int(s/3600) % 24)
211 label += '{}m'.format(int(s/60) % 60)
212 label += '{}s'.format(int(s) % 60)
215 def text_report(self):
217 return "(no report generated)\n"
218 return "\n".join(itertools.chain(
219 self._text_report_gen(),
220 self._recommend_gen())) + "\n"
222 def html_report(self):
223 return WEBCHART_CLASS(self.label, [self]).html()
225 def _text_report_gen(self):
226 yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
227 for category, stat_max in sorted(self.stats_max.iteritems()):
228 for stat, val in sorted(stat_max.iteritems()):
229 if stat.endswith('__rate'):
231 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
232 val = self._format(val)
233 tot = self._format(self.job_tot[category].get(stat, '-'))
234 yield "\t".join([category, stat, str(val), max_rate, tot])
236 ('Number of tasks: {}',
239 ('Max CPU time spent by a single task: {}s',
240 self.stats_max['cpu']['user+sys'],
242 ('Max CPU usage in a single interval: {}%',
243 self.stats_max['cpu']['user+sys__rate'],
245 ('Overall CPU usage: {}%',
246 self.job_tot['cpu']['user+sys'] /
247 self.job_tot['time']['elapsed']
248 if self.job_tot['time']['elapsed'] > 0 else 0,
250 ('Max memory used by a single task: {}GB',
251 self.stats_max['mem']['rss'],
253 ('Max network traffic in a single task: {}GB',
254 self.stats_max['net:eth0']['tx+rx'] +
255 self.stats_max['net:keep0']['tx+rx'],
257 ('Max network speed in a single interval: {}MB/s',
258 self.stats_max['net:eth0']['tx+rx__rate'] +
259 self.stats_max['net:keep0']['tx+rx__rate'],
261 ('Keep cache miss rate {}%',
262 (float(self.job_tot['keepcache']['miss']) /
263 float(self.job_tot['keepcalls']['get']))
264 if self.job_tot['keepcalls']['get'] > 0 else 0,
265 lambda x: x * 100.0),
266 ('Keep cache utilization {}%',
267 (float(self.job_tot['blkio:0:0']['read']) /
268 float(self.job_tot['net:keep0']['rx']))
269 if self.job_tot['net:keep0']['rx'] > 0 else 0,
270 lambda x: x * 100.0)):
271 format_string, val, transform = args
272 if val == float('-Inf'):
276 yield "# "+format_string.format(self._format(val))
278 def _recommend_gen(self):
279 return itertools.chain(
280 self._recommend_cpu(),
281 self._recommend_ram(),
282 self._recommend_keep_cache())
284 def _recommend_cpu(self):
285 """Recommend asking for 4 cores if max CPU usage was 333%"""
287 cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
288 if cpu_max_rate == float('-Inf'):
289 logger.warning('%s: no CPU usage data', self.label)
291 used_cores = max(1, int(math.ceil(cpu_max_rate)))
292 asked_cores = self.existing_constraints.get('min_cores_per_node')
293 if asked_cores is None or used_cores < asked_cores:
295 '#!! {} max CPU usage was {}% -- '
296 'try runtime_constraints "min_cores_per_node":{}'
299 int(math.ceil(cpu_max_rate*100)),
302 def _recommend_ram(self):
303 """Recommend an economical RAM constraint for this job.
305 Nodes that are advertised as "8 gibibytes" actually have what
306 we might call "8 nearlygibs" of memory available for jobs.
307 Here, we calculate a whole number of nearlygibs that would
308 have sufficed to run the job, then recommend requesting a node
309 with that number of nearlygibs (expressed as mebibytes).
311 Requesting a node with "nearly 8 gibibytes" is our best hope
312 of getting a node that actually has nearly 8 gibibytes
313 available. If the node manager is smart enough to account for
314 the discrepancy itself when choosing/creating a node, we'll
315 get an 8 GiB node with nearly 8 GiB available. Otherwise, the
316 advertised size of the next-size-smaller node (say, 6 GiB)
317 will be too low to satisfy our request, so we will effectively
318 get rounded up to 8 GiB.
320 For example, if we need 7500 MiB, we can ask for 7500 MiB, and
321 we will generally get a node that is advertised as "8 GiB" and
322 has at least 7500 MiB available. However, asking for 8192 MiB
323 would either result in an unnecessarily expensive 12 GiB node
324 (if node manager knows about the discrepancy), or an 8 GiB
325 node which has less than 8192 MiB available and is therefore
326 considered by crunch-dispatch to be too small to meet our
329 When node manager learns how to predict the available memory
330 for each node type such that crunch-dispatch always agrees
331 that a node is big enough to run the job it was brought up
332 for, all this will be unnecessary. We'll just ask for exactly
333 the memory we want -- even if that happens to be 8192 MiB.
336 used_bytes = self.stats_max['mem']['rss']
337 if used_bytes == float('-Inf'):
338 logger.warning('%s: no memory usage data', self.label)
340 used_mib = math.ceil(float(used_bytes) / 1048576)
341 asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
343 nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
344 if asked_mib is None or (
345 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
347 '#!! {} max RSS was {} MiB -- '
348 'try runtime_constraints "min_ram_mb_per_node":{}'
352 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
354 def _recommend_keep_cache(self):
355 """Recommend increasing keep cache if utilization < 80%"""
356 if self.job_tot['net:keep0']['rx'] == 0:
358 utilization = (float(self.job_tot['blkio:0:0']['read']) /
359 float(self.job_tot['net:keep0']['rx']))
360 asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
362 if utilization < 0.8:
364 '#!! {} Keep cache utilization was {:.2f}% -- '
365 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
372 def _format(self, val):
373 """Return a string representation of a stat.
375 {:.2f} for floats, default format for everything else."""
376 if isinstance(val, float):
377 return '{:.2f}'.format(val)
379 return '{}'.format(val)
382 class CollectionSummarizer(Summarizer):
383 def __init__(self, collection_id, **kwargs):
384 super(CollectionSummarizer, self).__init__(
385 crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
386 self.label = collection_id
389 class JobSummarizer(Summarizer):
390 def __init__(self, job, **kwargs):
391 arv = arvados.api('v1')
392 if isinstance(job, basestring):
393 self.job = arv.jobs().get(uuid=job).execute()
397 if self.job.get('log'):
399 rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
400 except arvados.errors.NotFoundError as e:
401 logger.warning("Trying event logs after failing to read "
402 "log collection %s: %s", self.job['log'], e)
404 label = self.job['uuid']
406 rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
407 label = self.job['uuid'] + ' (partial)'
408 super(JobSummarizer, self).__init__(rdr, **kwargs)
410 self.existing_constraints = self.job.get('runtime_constraints', {})
413 class PipelineSummarizer(object):
414 def __init__(self, pipeline_instance_uuid, **kwargs):
415 arv = arvados.api('v1', model=OrderedJsonModel())
416 instance = arv.pipeline_instances().get(
417 uuid=pipeline_instance_uuid).execute()
418 self.summarizers = collections.OrderedDict()
419 for cname, component in instance['components'].iteritems():
420 if 'job' not in component:
422 "%s: skipping component with no job assigned", cname)
425 "%s: job %s", cname, component['job']['uuid'])
426 summarizer = JobSummarizer(component['job'], **kwargs)
427 summarizer.label = '{} {}'.format(
428 cname, component['job']['uuid'])
429 self.summarizers[cname] = summarizer
430 self.label = pipeline_instance_uuid
434 for summarizer in self.summarizers.itervalues():
435 t = threading.Thread(target=summarizer.run)
442 def text_report(self):
444 for cname, summarizer in self.summarizers.iteritems():
445 txt += '### Summary for {} ({})\n'.format(
446 cname, summarizer.job['uuid'])
447 txt += summarizer.text_report()
451 def html_report(self):
452 return WEBCHART_CLASS(self.label, self.summarizers.itervalues()).html()