1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import print_function
9 import crunchstat_summary.dygraphs
10 import crunchstat_summary.reader
20 from arvados.api import OrderedJsonModel
21 from crunchstat_summary import logger
# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95


# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads. https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')


# Chart implementation used by the html_report() methods below.
WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
40 self.series = collections.defaultdict(list)
43 class Summarizer(object):
    def __init__(self, logdata, label=None, skip_child_jobs=False):
        """Build a Summarizer over one process's crunchstat log stream.

        :logdata: iterable of raw log lines, parsed later by run().
        :label: optional display label.  NOTE(review): the assignment of
            label/starttime and the seq_to_uuid map initialization are
            elided from this listing.
        :skip_child_jobs: when True, stats from child jobs found in the
            log are omitted instead of being followed and merged in.
        """
        self._logdata = logdata

        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}} -- maximum value of each
        # stat seen in any single task.
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        # Task objects keyed by task id; created on first access.
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable. If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.debug("%s: logdata %s", self.label, logdata)
        # NOTE(review): the enclosing "def run(self):" line and a number of
        # statements (guards such as "if m:", "else:", "continue") are
        # elided from this listing; the gaps are marked below.  Visible
        # code lines are reproduced as-is.
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        for line in self._logdata:
            # crunch1 task assignment: map the log's sequence number to a
            # task UUID for later lines.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
            # [elided guard -- presumably "if m:"]
            seq = int(m.group('seq'))
            uuid = m.group('task_uuid')
            self.seq_to_uuid[seq] = uuid
            logger.debug('%s: seq %d is task %s', self.label, seq, uuid)

            # Task completion line: record the elapsed wall time per task
            # and track the per-job maximum.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
            # [elided guard -- presumably "if m:"]
            task_id = self.seq_to_uuid[int(m.group('seq'))]
            elapsed = int(m.group('elapsed'))
            self.task_stats[task_id]['time'] = {'elapsed': elapsed}
            if elapsed > self.stats_max['time']['elapsed']:
                self.stats_max['time']['elapsed'] = elapsed

            # A child job was queued by this job: recursively summarize
            # it, sharing our accumulators so its stats merge into ours.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
            # [elided guard -- presumably "if m:"]
            uuid = m.group('uuid')
            if self._skip_child_jobs:
                logger.warning('%s: omitting stats from child job %s'
                               ' because --skip-child-jobs flag is on',
                # [elided: remaining warning arguments and "continue"]
            logger.debug('%s: follow %s', self.label, uuid)
            child_summarizer = ProcessSummarizer(uuid)
            child_summarizer.stats_max = self.stats_max
            child_summarizer.task_stats = self.task_stats
            child_summarizer.tasks = self.tasks
            child_summarizer.starttime = self.starttime
            child_summarizer.run()
            logger.debug('%s: done %s', self.label, uuid)

            # crunch1-style crunchstat line (carries job uuid and seq).
            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
            # [elided guard -- presumably "if m:"]
            task_id = self.seq_to_uuid[int(m.group('seq'))]
            # [elided -- presumably "else:" falling through to crunch2 format]
            m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
            # [elided guard -- presumably "if m:"]
            # crunch2 container (seq/task don't apply)
            task_id = 'container'
            # [elided -- presumably "else:" + "continue"]
            # not a crunchstat log
            task = self.tasks[task_id]

            if self.label is None:
                # [elided guard -- presumably testing m.group('job_uuid')]
                self.label = m.group('job_uuid')
                # [elided -- presumably "else:"]
                self.label = 'container'
            if m.group('category').endswith(':'):
                # "stderr crunchstat: notice: ..."
                # [elided -- presumably "continue"]
            elif m.group('category') in ('error', 'caught'):
                # [elided -- presumably "continue"]
            elif m.group('category') == 'read':
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (crunchstat formatting fixed, but old logs still say this)
                # [elided -- presumably "continue"]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = m.group('timestamp')
            if timestamp[10:11] == '_':
                # crunch1-style timestamp: YYYY-MM-DD_HH:MM:SS
                timestamp = datetime.datetime.strptime(
                    timestamp, '%Y-%m-%d_%H:%M:%S')
            elif timestamp[10:11] == 'T':
                # ISO-8601-style timestamp; fractional seconds dropped.
                timestamp = datetime.datetime.strptime(
                    timestamp[:19], '%Y-%m-%dT%H:%M:%S')
            # [elided -- presumably "else:"]
                raise ValueError("Cannot parse timestamp {!r}".format(
                # [elided: format argument and closing parens]

            if not task.starttime:
                task.starttime = timestamp
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            task.finishtime = timestamp

            if not self.starttime:
                self.starttime = timestamp
            self.finishtime = timestamp

            # Parse "<val> <stat> ..." pairs from the current reading and
            # (when present) from the "-- interval" section.
            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                # [elided -- presumably "continue"]
                category = m.group('category')
                words = m.group(group).split(' ')
                # [elided -- presumably "stats = {}" initialization]
                for val, stat in zip(words[::2], words[1::2]):
                    # [elided -- presumably try/float-vs-int dispatch]
                    stats[stat] = float(val)
                    stats[stat] = int(val)
                    except ValueError as e:
                        logger.warning('Error parsing {} stat: {!r}'.format(
                        # [elided: format arguments / "continue"]
                # Derived aggregate stats.
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                for stat, val in stats.iteritems():
                    # NOTE(review): dict.iteritems() is Python 2 only.
                    if group == 'interval':
                        if stat == 'seconds':
                            this_interval_s = val
                            # [elided -- presumably "continue"]
                        elif not (this_interval_s > 0):
                            # [elided -- presumably logger.error( opening]
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                        # [elided -- presumably "else:"]
                            # Convert interval totals into per-second rates.
                            stat = stat + '__rate'
                            val = val / this_interval_s
                            if stat in ['user+sys__rate', 'tx+rx__rate']:
                                task.series[category, stat].append(
                                    (timestamp - self.starttime, val))
                    # [elided -- presumably "else:"]
                        task.series[category, stat].append(
                            (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)
        # NOTE(review): the enclosing method/section header is elided from
        # this listing -- presumably the tail of run() (or a helper) that
        # computes per-job totals after parsing completes.
        # job_tot: {category: {stat: total across all tasks}}
        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.iteritems():
            for category, stat_last in task_stat.iteritems():
                for stat, val in stat_last.iteritems():
                    # NOTE(review): iteritems() above is Python 2 only.
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                        # [elided -- presumably "continue"]
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)
    def long_label(self):
        """Return the label plus the elapsed time formatted as d/h/m/s.

        NOTE(review): several lines are elided from this listing -- the
        initialization of ``label`` (presumably from self.label), the
        guard that both start/finish times are known, and the threshold
        guards that suppress zero d/h/m components.
        """
        label += ' -- elapsed time '
        s = (self.finishtime - self.starttime).total_seconds()
        label += '{}d'.format(int(s/86400))
        label += '{}h'.format(int(s/3600) % 24)
        label += '{}m'.format(int(s/60) % 60)
        label += '{}s'.format(int(s) % 60)
    def text_report(self):
        """Return the plain-text report: stats table plus recommendations."""
        # [elided guard -- presumably "if not self.tasks:"]
        return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"
238 def html_report(self):
239 return WEBCHART_CLASS(self.label, [self]).html()
    def _text_report_gen(self):
        """Yield tab-separated stat rows, then '# ...' summary lines.

        NOTE(review): the "for args in (" line introducing the summary
        tuples, some tuple members, and several guards are elided from
        this listing; gaps are marked below.
        """
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                # NOTE(review): iteritems() above is Python 2 only.
                if stat.endswith('__rate'):
                    # Rates appear in the task_max_rate column, not as
                    # their own rows.
                    # [elided -- presumably "continue"]
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        # [elided -- presumably "for args in (" over (format, value,
        #  transform) tuples; some tuple members are also elided]
            ('Number of tasks: {}',
            # [elided tuple members]
            ('Max CPU time spent by a single task: {}s',
             self.stats_max['cpu']['user+sys'],
            ('Max CPU usage in a single interval: {}%',
             self.stats_max['cpu']['user+sys__rate'],
            ('Overall CPU usage: {}%',
             self.job_tot['cpu']['user+sys'] /
             self.job_tot['time']['elapsed']
             if self.job_tot['time']['elapsed'] > 0 else 0,
            ('Max memory used by a single task: {}GB',
             self.stats_max['mem']['rss'],
            ('Max network traffic in a single task: {}GB',
             self.stats_max['net:eth0']['tx+rx'] +
             self.stats_max['net:keep0']['tx+rx'],
            ('Max network speed in a single interval: {}MB/s',
             self.stats_max['net:eth0']['tx+rx__rate'] +
             self.stats_max['net:keep0']['tx+rx__rate'],
            ('Keep cache miss rate {}%',
             (float(self.job_tot['keepcache']['miss']) /
              float(self.job_tot['keepcalls']['get']))
             if self.job_tot['keepcalls']['get'] > 0 else 0,
             lambda x: x * 100.0),
            ('Keep cache utilization {}%',
             (float(self.job_tot['blkio:0:0']['read']) /
              float(self.job_tot['net:keep0']['rx']))
             if self.job_tot['net:keep0']['rx'] > 0 else 0,
             lambda x: x * 100.0)):
            format_string, val, transform = args
            if val == float('-Inf'):
                # No data was recorded for this stat.
                # [elided -- presumably "continue"]
            # [elided -- presumably applying transform when present]
            yield "# "+format_string.format(self._format(val))
294 def _recommend_gen(self):
295 return itertools.chain(
296 self._recommend_cpu(),
297 self._recommend_ram(),
298 self._recommend_keep_cache())
    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""

        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf'):
            logger.warning('%s: no CPU usage data', self.label)
            # [elided -- presumably "return"]
        # Round the peak rate up to a whole number of cores, minimum 1.
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get('min_cores_per_node')
        if asked_cores is None or used_cores < asked_cores:
            # [elided -- presumably "yield (" opening the message below]
                '#!! {} max CPU usage was {}% -- '
                'try runtime_constraints "min_cores_per_node":{}'
            # [elided -- presumably ".format(" with self.label, then:]
                int(math.ceil(cpu_max_rate*100)),
            # [elided: used_cores argument and closing parens]
    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available. If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available. Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available. However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        [NOTE(review): remainder of this sentence is elided from the
        listing -- presumably "constraint."]

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary. We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
            # [elided -- presumably "return"]
        used_mib = math.ceil(float(used_bytes) / 1048576)
        asked_mib = self.existing_constraints.get('min_ram_mb_per_node')

        # MiB -> "nearlygibs": whole GiB discounted by AVAILABLE_RAM_RATIO.
        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        if asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
            # [elided -- presumably "yield (" opening the message below]
                '#!! {} max RSS was {} MiB -- '
                'try runtime_constraints "min_ram_mb_per_node":{}'
            # [elided -- presumably ".format(" with self.label and
            #  int(used_mib), then:]
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%"""
        if self.job_tot['net:keep0']['rx'] == 0:
            # No Keep traffic at all -- nothing to assess.
            # [elided -- presumably "return"]
        # Fraction of bytes fetched from Keep that were actually read
        # by the job (reads served from cache raise this above what
        # came over the network).
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)

        if utilization < 0.8:
            # [elided -- presumably "yield (" opening the message below]
                '#!! {} Keep cache utilization was {:.2f}% -- '
                'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
            # [elided: .format(...) arguments -- presumably self.label, the
            #  percentage, and a larger cache size; verify against upstream]
388 def _format(self, val):
389 """Return a string representation of a stat.
391 {:.2f} for floats, default format for everything else."""
392 if isinstance(val, float):
393 return '{:.2f}'.format(val)
395 return '{}'.format(val)
class CollectionSummarizer(Summarizer):
    """Summarizer that reads its log stream from a named log collection."""

    def __init__(self, collection_id, **kwargs):
        reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(reader, **kwargs)
        self.label = collection_id
def NewSummarizer(process, **kwargs):
    """Construct with the appropriate subclass for this uuid/object."""
    # NOTE(review): a few lines are elided from this listing -- the
    # extraction of ``uuid`` from ``process`` (and the dict branch body).
    if not isinstance(process, dict):
        # [elided -- presumably: treat the argument as a uuid string and
        #  set uuid/process accordingly]

    arv = arvados.api('v1', model=OrderedJsonModel())

    # Dispatch on the UUID's type infix.
    if re.search('-dz642-', uuid):
        # Container UUID.
        process = arv.containers().get(uuid=uuid).execute()
        return ContainerSummarizer(process, **kwargs)
    elif re.search('-xvhdp-', uuid):
        # Container request UUID: resolve to its container first.
        ctrReq = arv.container_requests().get(uuid=uuid).execute()
        ctrUUID = ctrReq['container_uuid']
        process = arv.containers().get(uuid=ctrUUID).execute()
        return ContainerSummarizer(process, **kwargs)
    elif re.search('-8i9sb-', uuid):
        # Job UUID.
        process = arv.jobs().get(uuid=uuid).execute()
        return JobSummarizer(process, **kwargs)
    elif re.search('-d1hrv-', uuid):
        # Pipeline instance UUID.
        process = arv.pipeline_instances().get(uuid=uuid).execute()
        return PipelineSummarizer(process, **kwargs)
    # [elided -- presumably "else:"]
    # NOTE(review): ArgumentError is not a Python builtin and no local
    # definition is visible here -- likely should be ValueError; verify.
    raise ArgumentError("Unrecognized uuid %s", uuid)
class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, container, or container request."""

    def __init__(self, process, **kwargs):
        # NOTE(review): several lines ("rdr = None", "try:", the success
        # path, and "self.label = label") are elided from this listing;
        # gaps are marked below.
        self.process = process
        if self.process.get('log'):
            # [elided -- presumably "try:"]
            rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
            except arvados.errors.NotFoundError as e:
                # Fall through to the live-log path below.
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.process['log'], e)
            # [elided -- presumably "else:" on success]
            label = self.process['uuid']
        # [elided -- presumably a guard falling back to live event logs
        #  when no collection reader was obtained]
        rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
        label = self.process['uuid'] + ' (partial)'
        super(ProcessSummarizer, self).__init__(rdr, **kwargs)
        # [elided -- presumably "self.label = label"]
        self.existing_constraints = self.process.get('runtime_constraints', {})
457 class JobSummarizer(ProcessSummarizer):
461 class ContainerSummarizer(ProcessSummarizer):
class PipelineSummarizer(object):
    """Aggregate summarizer: one JobSummarizer per pipeline component."""

    def __init__(self, instance, **kwargs):
        # summarizers: {component name: JobSummarizer}, preserving
        # component order.
        self.summarizers = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            # NOTE(review): iteritems() is Python 2 only.
            if 'job' not in component:
                # [elided -- presumably "logger.warning(" and "continue"]
                "%s: skipping component with no job assigned", cname)
            # [elided -- presumably "else:" with "logger.info("]
                "%s: job %s", cname, component['job']['uuid'])
            summarizer = JobSummarizer(component['job'], **kwargs)
            summarizer.label = '{} {}'.format(
                cname, component['job']['uuid'])
            self.summarizers[cname] = summarizer
        self.label = instance['uuid']
        # NOTE(review): the enclosing "def run(self):" line and the
        # thread start/collect/join lines are elided from this listing.
        # Each component summarizer is run on its own thread.
        for summarizer in self.summarizers.itervalues():
            t = threading.Thread(target=summarizer.run)
    def text_report(self):
        """Concatenate each component's text report under a '###' heading."""
        # [elided -- presumably "txt = ''" initialization]
        for cname, summarizer in self.summarizers.iteritems():
            # NOTE(review): iteritems() is Python 2 only.
            txt += '### Summary for {} ({})\n'.format(
                cname, summarizer.process['uuid'])
            txt += summarizer.text_report()
        # [elided -- presumably "return txt"]
500 def html_report(self):
501 return WEBCHART_CLASS(self.label, self.summarizers.itervalues()).html()