1 from __future__ import print_function
5 import crunchstat_summary.chartjs
6 import crunchstat_summary.reader
16 from arvados.api import OrderedJsonModel
17 from crunchstat_summary import logger
# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95
# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads. https://bugs.python.org/issue7980
# (strptime lazily imports _strptime on first use; doing that import
# here, on the main thread, avoids the race when summarizer.run() is
# later invoked from worker threads — see PipelineSummarizer.run().)
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
        # {(category, stat): [(offset-from-job-start, value), ...]}
        # time-series samples for this task, appended by
        # Summarizer.run() while parsing crunchstat lines.
        self.series = collections.defaultdict(list)
class Summarizer(object):
    """Parse crunchstat log lines and accumulate resource-usage stats.

    Accumulates per-task stats (self.task_stats), per-job maxima
    (self.stats_max) and, after parsing, whole-job totals
    (self.job_tot). Subclasses supply the log data source and, when
    known, the job's existing runtime constraints.
    """
    def __init__(self, logdata, label=None, skip_child_jobs=False):
        # logdata: iterable of raw log lines (e.g. a reader from
        # crunchstat_summary.reader).
        self._logdata = logdata
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs
        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))
        self.tasks = collections.defaultdict(Task)
        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable. If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}
        logger.debug("%s: logdata %s", self.label, logdata)
        # --- log-parsing body (of run()) follows ---
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        for line in self._logdata:
            # "job_task" lines map a sequence number to a task UUID so
            # later stats lines can be attributed to the right task.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
                seq = int(m.group('seq'))
                uuid = m.group('task_uuid')
                self.seq_to_uuid[seq] = uuid
                logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
            # Task-completion lines record the task's elapsed seconds;
            # track the maximum across tasks under stats_max['time'].
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
                task_id = self.seq_to_uuid[int(m.group('seq'))]
                elapsed = int(m.group('elapsed'))
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed
            # "Queued job <uuid>" marks a child job; unless
            # --skip-child-jobs is on, its stats are merged into ours.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
                uuid = m.group('uuid')
                if self._skip_child_jobs:
                    logger.warning('%s: omitting stats from child job %s'
                                   ' because --skip-child-jobs flag is on',
                logger.debug('%s: follow %s', self.label, uuid)
                # Share our accumulators with the child summarizer so
                # its stats fold directly into this job's totals.
                child_summarizer = JobSummarizer(uuid)
                child_summarizer.stats_max = self.stats_max
                child_summarizer.task_stats = self.task_stats
                child_summarizer.tasks = self.tasks
                child_summarizer.starttime = self.starttime
                child_summarizer.run()
                logger.debug('%s: done %s', self.label, uuid)
            # crunchstat lines carry the actual measurements:
            # "<timestamp> <job_uuid> <pid> <seq> stderr crunchstat:
            #  <category> <current stats> [-- interval <interval stats>]"
            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
            if self.label is None:
                self.label = m.group('job_uuid')
                logger.debug('%s: using job uuid as label', self.label)
            if m.group('category').endswith(':'):
                # "stderr crunchstat: notice: ..."
            elif m.group('category') in ('error', 'caught'):
            elif m.group('category') == 'read':
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (crunchstat formatting fixed, but old logs still say this)
            task_id = self.seq_to_uuid[int(m.group('seq'))]
            task = self.tasks[task_id]
            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = datetime.datetime.strptime(
                m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
            if not task.starttime:
                task.starttime = timestamp
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            task.finishtime = timestamp
            if not self.starttime:
                self.starttime = timestamp
            self.finishtime = timestamp
            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                category = m.group('category')
                # Stats arrive as alternating "<value> <name>" words.
                words = m.group(group).split(' ')
                for val, stat in zip(words[::2], words[1::2]):
                        stats[stat] = float(val)
                        stats[stat] = int(val)
                    except ValueError as e:
                        'Error parsing {} stat in "{}": {!r}'.format(
                # Synthesize combined stats from their components.
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                for stat, val in stats.iteritems():
                    if group == 'interval':
                        if stat == 'seconds':
                            this_interval_s = val
                        elif not (this_interval_s > 0):
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                            # Interval stats become per-second rates.
                            stat = stat + '__rate'
                            val = val / this_interval_s
                            if stat in ['user+sys__rate', 'tx+rx__rate']:
                                task.series[category, stat].append(
                                    (timestamp - self.starttime, val))
                        task.series[category, stat].append(
                            (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                        if val > self.stats_max[category][stat]:
                            self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)
        # Aggregate per-task stats into whole-job totals (job_tot),
        # skipping stats where summing across tasks is meaningless.
        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.iteritems():
            for category, stat_last in task_stat.iteritems():
                for stat, val in stat_last.iteritems():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)
    def long_label(self):
        """Return the label annotated with elapsed wall-clock time.

        Elapsed time between self.starttime and self.finishtime is
        rendered as concatenated d/h/m/s components.
        """
            label += ' -- elapsed time '
            s = (self.finishtime - self.starttime).total_seconds()
            # Decompose total seconds into days/hours/minutes/seconds.
                label += '{}d'.format(int(s/86400))
                label += '{}h'.format(int(s/3600) % 24)
                label += '{}m'.format(int(s/60) % 60)
            label += '{}s'.format(int(s) % 60)
    def text_report(self):
        """Return the plain-text report: the stats table followed by
        runtime-constraint recommendations, as one string."""
            return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"
212 def html_report(self):
213 return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
    def _text_report_gen(self):
        """Yield report lines: a tab-separated stats table, then
        '#'-prefixed human-readable summary lines."""
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                # __rate entries appear in the task_max_rate column of
                # their base stat rather than as rows of their own.
                if stat.endswith('__rate'):
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        # Each tuple: (format string, value, transform-or-None applied
        # to the value before formatting).
            ('Number of tasks: {}',
            ('Max CPU time spent by a single task: {}s',
             self.stats_max['cpu']['user+sys'],
            ('Max CPU usage in a single interval: {}%',
             self.stats_max['cpu']['user+sys__rate'],
            ('Overall CPU usage: {}%',
             self.job_tot['cpu']['user+sys'] /
             self.job_tot['time']['elapsed']
             if self.job_tot['time']['elapsed'] > 0 else 0,
            ('Max memory used by a single task: {}GB',
             self.stats_max['mem']['rss'],
            ('Max network traffic in a single task: {}GB',
             self.stats_max['net:eth0']['tx+rx'] +
             self.stats_max['net:keep0']['tx+rx'],
            ('Max network speed in a single interval: {}MB/s',
             self.stats_max['net:eth0']['tx+rx__rate'] +
             self.stats_max['net:keep0']['tx+rx__rate'],
            ('Keep cache miss rate {}%',
             (float(self.job_tot['keepcache']['miss']) /
              float(self.job_tot['keepcalls']['get']))
             if self.job_tot['keepcalls']['get'] > 0 else 0,
             lambda x: x * 100.0),
            ('Keep cache utilization {}%',
             (float(self.job_tot['blkio:0:0']['read']) /
              float(self.job_tot['net:keep0']['rx']))
             if self.job_tot['net:keep0']['rx'] > 0 else 0,
             lambda x: x * 100.0)):
            format_string, val, transform = args
            # -Inf means the stat was never observed; skip the line.
            if val == float('-Inf'):
            yield "# "+format_string.format(self._format(val))
268 def _recommend_gen(self):
269 return itertools.chain(
270 self._recommend_cpu(),
271 self._recommend_ram(),
272 self._recommend_keep_cache())
    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        # -Inf means no CPU samples were ever recorded.
        if cpu_max_rate == float('-Inf'):
            logger.warning('%s: no CPU usage data', self.label)
        # Round peak usage up to a whole number of cores, minimum 1.
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get('min_cores_per_node')
        # Only recommend when no constraint was given, or fewer cores
        # would have sufficed.
        if asked_cores is None or used_cores < asked_cores:
            '#!! {} max CPU usage was {}% -- '
            'try runtime_constraints "min_cores_per_node":{}'
            int(math.ceil(cpu_max_rate*100)),
    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available. If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available. Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available. However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        constraint.

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary. We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """
        used_bytes = self.stats_max['mem']['rss']
        # -Inf means no memory samples were ever recorded.
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
        used_mib = math.ceil(float(used_bytes) / 1048576)
        asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
        # A "nearlygib" is AVAILABLE_RAM_RATIO of an advertised GiB.
        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        if asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
            '#!! {} max RSS was {} MiB -- '
            'try runtime_constraints "min_ram_mb_per_node":{}'
            int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%"""
        # No Keep traffic at all: nothing to recommend.
        if self.job_tot['net:keep0']['rx'] == 0:
        # Utilization = bytes actually read by the job / bytes fetched
        # from Keep; low values suggest cache thrashing.
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
        if utilization < 0.8:
            '#!! {} Keep cache utilization was {:.2f}% -- '
            'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
362 def _format(self, val):
363 """Return a string representation of a stat.
365 {:.2f} for floats, default format for everything else."""
366 if isinstance(val, float):
367 return '{:.2f}'.format(val)
369 return '{}'.format(val)
class CollectionSummarizer(Summarizer):
    """Summarizer that reads crunchstat logs out of a stored
    collection, labeled with the collection's id."""
    def __init__(self, collection_id, **kwargs):
        reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(reader, **kwargs)
        self.label = collection_id
class JobSummarizer(Summarizer):
    """Summarizer for a single Arvados job, given either a job UUID
    or an already-fetched job record."""
    def __init__(self, job, **kwargs):
        arv = arvados.api('v1')
        # Accept either a job UUID (fetch the record) or a job dict.
        if isinstance(job, basestring):
            self.job = arv.jobs().get(uuid=job).execute()
        if self.job.get('log'):
            # Prefer the finished job's log collection; fall back to
            # live event logs when the collection can't be read.
            rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.job['log'], e)
                label = self.job['uuid']
            rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
            label = self.job['uuid'] + ' (partial)'
        super(JobSummarizer, self).__init__(rdr, **kwargs)
        self.existing_constraints = self.job.get('runtime_constraints', {})
class PipelineSummarizer(object):
    """Run a JobSummarizer for every component of a pipeline instance
    and combine their reports."""
    def __init__(self, pipeline_instance_uuid, **kwargs):
        # OrderedJsonModel preserves component order from the API
        # response, so reports list components in pipeline order.
        arv = arvados.api('v1', model=OrderedJsonModel())
        instance = arv.pipeline_instances().get(
            uuid=pipeline_instance_uuid).execute()
        self.summarizers = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            # Components that never got a job have no stats to read.
            if 'job' not in component:
                    "%s: skipping component with no job assigned", cname)
                    "%s: job %s", cname, component['job']['uuid'])
                summarizer = JobSummarizer(component['job'], **kwargs)
                summarizer.label = '{} {}'.format(
                    cname, component['job']['uuid'])
                self.summarizers[cname] = summarizer
        self.label = pipeline_instance_uuid
        # --- body of run(): parse all component logs in parallel ---
        for summarizer in self.summarizers.itervalues():
            t = threading.Thread(target=summarizer.run)
    def text_report(self):
        """Return the concatenated text reports of all components,
        each under a '### Summary for ...' heading."""
        for cname, summarizer in self.summarizers.iteritems():
            txt += '### Summary for {} ({})\n'.format(
                cname, summarizer.job['uuid'])
            txt += summarizer.text_report()
    def html_report(self):
        """Return one HTML page charting every component summarizer."""
        return crunchstat_summary.chartjs.ChartJS(
            self.label, self.summarizers.itervalues()).html()