1 from __future__ import print_function
5 import crunchstat_summary.chartjs
6 import crunchstat_summary.reader
15 from arvados.api import OrderedJsonModel
16 from crunchstat_summary import logger
# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
# Used by Summarizer._recommend_ram() to convert between advertised GiB
# and actually-usable "nearlygibs".
AVAILABLE_RAM_RATIO = 0.95
27 self.series = collections.defaultdict(list)
class Summarizer(object):
    """Parse a crunchstat log stream, accumulate per-task and per-job
    statistics, and report them along with runtime-constraint
    recommendations.

    (Partial view: several lines of this class are elided.)
    """
    def __init__(self, logdata, label=None, skip_child_jobs=False):
        # logdata: an iterable of log lines (see crunchstat_summary.reader).
        self._logdata = logdata
        # Filled in from the last crunchstat timestamp seen while parsing.
        self.finishtime = None
        # When True, stats from "Queued job" children are not merged in.
        self._skip_child_jobs = skip_child_jobs
        # stats_max: {category: {stat: val}} -- maximum of each stat seen
        # in any single task; unseen stats read as 0 via the default.
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))
        # tasks: {task_id: Task} -- per-task series and start/finish times.
        self.tasks = collections.defaultdict(Task)
        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable.  If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}
        # NOTE(review): self.label is presumably assigned from `label` in
        # lines elided from this view -- confirm before relying on it.
        logger.debug("%s: logdata %s", self.label, logdata)
# NOTE(review): this span is the body of Summarizer.run(); the `def`
# line and a number of guard (`if m:`), `continue`, try/except, and
# yield/logger-call lines are elided from this view, so the statements
# below appear without their complete original nesting.
logger.debug("%s: parsing logdata %s", self.label, self._logdata)
for line in self._logdata:
    # "job_task" lines announce each task's sequence number and UUID;
    # remember the mapping so later crunchstat lines (which carry only
    # a sequence number) can be attributed to the right task.
    m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
    seq = int(m.group('seq'))
    uuid = m.group('task_uuid')
    self.seq_to_uuid[seq] = uuid
    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
    # Task-completion lines ("success in N seconds" / permanent
    # failure) carry the task's elapsed wall-clock time.
    m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
    task_id = self.seq_to_uuid[int(m.group('seq'))]
    elapsed = int(m.group('elapsed'))
    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
    if elapsed > self.stats_max['time']['elapsed']:
        self.stats_max['time']['elapsed'] = elapsed
    # "Queued job" lines mean this job spawned a child; unless
    # --skip-child-jobs was given, recursively summarize the child
    # into this summarizer's shared accumulators.
    m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
    uuid = m.group('uuid')
    if self._skip_child_jobs:
        # (remaining warning arguments elided in this view)
        logger.warning('%s: omitting stats from child job %s'
                       ' because --skip-child-jobs flag is on',
    logger.debug('%s: follow %s', self.label, uuid)
    child_summarizer = JobSummarizer(uuid)
    # Share the accumulators so the child's stats merge directly into
    # this summary.
    child_summarizer.stats_max = self.stats_max
    child_summarizer.task_stats = self.task_stats
    child_summarizer.tasks = self.tasks
    child_summarizer.starttime = self.starttime
    child_summarizer.run()
    logger.debug('%s: done %s', self.label, uuid)
    # Ordinary crunchstat sample lines: timestamp, job uuid, seq,
    # category (cpu, mem, net:eth0, blkio:0:0, ...), current values,
    # and optionally "-- interval ..." deltas.
    m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
    if self.label is None:
        # First crunchstat line seen: adopt the job uuid as our label.
        self.label = m.group('job_uuid')
        logger.debug('%s: using job uuid as label', self.label)
    if m.group('category').endswith(':'):
        # "stderr crunchstat: notice: ..."
    elif m.group('category') in ('error', 'caught'):
    elif m.group('category') == 'read':
        # "stderr crunchstat: read /proc/1234/net/dev: ..."
        # (crunchstat formatting fixed, but old logs still say this)
    task_id = self.seq_to_uuid[int(m.group('seq'))]
    task = self.tasks[task_id]
    # Use the first and last crunchstat timestamps as
    # approximations of starttime and finishtime.
    timestamp = datetime.datetime.strptime(
        m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
    if not task.starttime:
        task.starttime = timestamp
        logger.debug('%s: task %s starttime %s',
                     self.label, task_id, timestamp)
    task.finishtime = timestamp
    if not self.starttime:
        self.starttime = timestamp
    self.finishtime = timestamp
    this_interval_s = None
    for group in ['current', 'interval']:
        if not m.group(group):
        category = m.group('category')
        # Stats arrive as alternating "<value> <name>" word pairs.
        words = m.group(group).split(' ')
        for val, stat in zip(words[::2], words[1::2]):
                # (try/float-vs-int parsing logic partially elided)
                stats[stat] = float(val)
                stats[stat] = int(val)
            except ValueError as e:
                'Error parsing {} stat in "{}": {!r}'.format(
        # Synthesize combined stats so CPU and network totals can be
        # tracked as single values.
        if 'user' in stats or 'sys' in stats:
            stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
        if 'tx' in stats or 'rx' in stats:
            stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
        for stat, val in stats.iteritems():
            if group == 'interval':
                if stat == 'seconds':
                    this_interval_s = val
                elif not (this_interval_s > 0):
                        "BUG? interval stat given with duration {!r}".
                        format(this_interval_s))
                    # Convert interval deltas to per-second rates.
                    stat = stat + '__rate'
                    # NOTE(review): under Python 2 this is integer
                    # division when both operands are ints, so rates
                    # may be floored -- confirm this is intended.
                    val = val / this_interval_s
                if stat in ['user+sys__rate', 'tx+rx__rate']:
                    task.series[category, stat].append(
                        (timestamp - self.starttime, val))
                    task.series[category, stat].append(
                        (timestamp - self.starttime, val))
            self.task_stats[task_id][category][stat] = val
            if val > self.stats_max[category][stat]:
                self.stats_max[category][stat] = val
logger.debug('%s: done parsing', self.label)
# Job totals: sum each stat across tasks, except gauges (cpus, cache,
# swap, rss) where a cross-task sum would be meaningless.
self.job_tot = collections.defaultdict(
    functools.partial(collections.defaultdict, int))
for task_id, task_stat in self.task_stats.iteritems():
    for category, stat_last in task_stat.iteritems():
        for stat, val in stat_last.iteritems():
            if stat in ['cpus', 'cache', 'swap', 'rss']:
                # meaningless stats like 16 cpu cores x 5 tasks = 80
            self.job_tot[category][stat] += val
logger.debug('%s: done totals', self.label)
def long_label(self):
    """Return the label plus a compact "-- elapsed time NdNhNmNs" suffix.

    (Partial view: the initial label assignment and the guards that
    skip zero day/hour/minute components are elided.)
    """
    label += ' -- elapsed time '
    # Total elapsed seconds, broken into days/hours/minutes/seconds.
    s = (self.finishtime - self.starttime).total_seconds()
    label += '{}d'.format(int(s/86400))
    label += '{}h'.format(int(s/3600) % 24)
    label += '{}m'.format(int(s/60) % 60)
    label += '{}s'.format(int(s) % 60)
def text_report(self):
    """Return the plain-text report: the stats table followed by "#!!"
    recommendation lines, newline-terminated.
    """
    # (An elided condition above guards this early return for the
    # nothing-parsed case.)
    return "(no report generated)\n"
    return "\n".join(itertools.chain(
        self._text_report_gen(),
        self._recommend_gen())) + "\n"
def html_report(self):
    """Render this summarizer as a standalone HTML page via Chart.js."""
    chart = crunchstat_summary.chartjs.ChartJS(self.label, [self])
    return chart.html()
def _text_report_gen(self):
    """Yield the tab-separated stats table, then "# ..." summary lines.

    (Partial view: connective lines -- e.g. the `for args in (` header
    over the summary tuples and some skip/continue guards -- are
    elided from this view.)
    """
    yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
    # One row per (category, stat); synthetic __rate entries are not
    # emitted as rows but folded into their base stat's row.
    for category, stat_max in sorted(self.stats_max.iteritems()):
        for stat, val in sorted(stat_max.iteritems()):
            if stat.endswith('__rate'):
            max_rate = self._format(stat_max.get(stat+'__rate', '-'))
            val = self._format(val)
            tot = self._format(self.job_tot[category].get(stat, '-'))
            yield "\t".join([category, stat, str(val), max_rate, tot])
    # Human-readable summary tuples: (format string, value, transform).
    ('Number of tasks: {}',
    ('Max CPU time spent by a single task: {}s',
     self.stats_max['cpu']['user+sys'],
    ('Max CPU usage in a single interval: {}%',
     self.stats_max['cpu']['user+sys__rate'],
    ('Overall CPU usage: {}%',
     self.job_tot['cpu']['user+sys'] /
     self.job_tot['time']['elapsed']
     if self.job_tot['time']['elapsed'] > 0 else 0,
    ('Max memory used by a single task: {}GB',
     self.stats_max['mem']['rss'],
    ('Max network traffic in a single task: {}GB',
     self.stats_max['net:eth0']['tx+rx'] +
     self.stats_max['net:keep0']['tx+rx'],
    ('Max network speed in a single interval: {}MB/s',
     self.stats_max['net:eth0']['tx+rx__rate'] +
     self.stats_max['net:keep0']['tx+rx__rate'],
    ('Keep cache miss rate {}%',
     (float(self.job_tot['keepcache']['miss']) /
      float(self.job_tot['keepcalls']['get']))
     if self.job_tot['keepcalls']['get'] > 0 else 0,
     lambda x: x * 100.0),
    ('Keep cache utilization {}%',
     (float(self.job_tot['blkio:0:0']['read']) /
      float(self.job_tot['net:keep0']['rx']))
     if self.job_tot['net:keep0']['rx'] > 0 else 0,
     lambda x: x * 100.0)):
        format_string, val, transform = args
        # -Inf marks a stat that was never observed; such lines are
        # skipped (guard body elided).
        if val == float('-Inf'):
        yield "# "+format_string.format(self._format(val))
def _recommend_gen(self):
    """Yield "#!! ..." recommendation lines from each advisor in turn."""
    for advise in (self._recommend_cpu,
                   self._recommend_ram,
                   self._recommend_keep_cache):
        for line in advise():
            yield line
def _recommend_cpu(self):
    """Recommend asking for 4 cores if max CPU usage was 333%

    (Partial view: early returns and the yield call's surrounding
    lines are elided.)
    """
    cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
    if cpu_max_rate == float('-Inf'):
        # No CPU samples were ever parsed -- nothing to recommend.
        logger.warning('%s: no CPU usage data', self.label)
    # Round the peak observed rate up to a whole number of cores, >= 1.
    used_cores = max(1, int(math.ceil(cpu_max_rate)))
    asked_cores = self.existing_constraints.get('min_cores_per_node')
    # Only advise when the job asked for more cores than it used (or
    # didn't declare a constraint at all).
    if asked_cores is None or used_cores < asked_cores:
        '#!! {} max CPU usage was {}% -- '
        'try runtime_constraints "min_cores_per_node":{}'
        int(math.ceil(cpu_max_rate*100)),
def _recommend_ram(self):
    """Recommend an economical RAM constraint for this job.

    Nodes that are advertised as "8 gibibytes" actually have what
    we might call "8 nearlygibs" of memory available for jobs.
    Here, we calculate a whole number of nearlygibs that would
    have sufficed to run the job, then recommend requesting a node
    with that number of nearlygibs (expressed as mebibytes).

    Requesting a node with "nearly 8 gibibytes" is our best hope
    of getting a node that actually has nearly 8 gibibytes
    available.  If the node manager is smart enough to account for
    the discrepancy itself when choosing/creating a node, we'll
    get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
    advertised size of the next-size-smaller node (say, 6 GiB)
    will be too low to satisfy our request, so we will effectively
    get rounded up to 8 GiB.

    For example, if we need 7500 MiB, we can ask for 7500 MiB, and
    we will generally get a node that is advertised as "8 GiB" and
    has at least 7500 MiB available.  However, asking for 8192 MiB
    would either result in an unnecessarily expensive 12 GiB node
    (if node manager knows about the discrepancy), or an 8 GiB
    node which has less than 8192 MiB available and is therefore
    considered by crunch-dispatch to be too small to meet our
    [constraint -- rest of this sentence elided in this view.]

    When node manager learns how to predict the available memory
    for each node type such that crunch-dispatch always agrees
    that a node is big enough to run the job it was brought up
    for, all this will be unnecessary.  We'll just ask for exactly
    the memory we want -- even if that happens to be 8192 MiB.
    """
    # -Inf means no 'mem rss' samples were ever parsed.
    used_bytes = self.stats_max['mem']['rss']
    if used_bytes == float('-Inf'):
        logger.warning('%s: no memory usage data', self.label)
    used_mib = math.ceil(float(used_bytes) / 1048576)
    asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
    # Convert MiB to "nearlygibs": the number of advertised GiB needed
    # once the usable-RAM ratio is accounted for.
    nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
    if asked_mib is None or (
            math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
        # (yield of the recommendation line partially elided)
        '#!! {} max RSS was {} MiB -- '
        'try runtime_constraints "min_ram_mb_per_node":{}'
        int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
def _recommend_keep_cache(self):
    """Recommend increasing keep cache if utilization < 80%

    (Partial view: the early return and the yield call's surrounding
    lines are elided.)
    """
    if self.job_tot['net:keep0']['rx'] == 0:
        # No Keep traffic at all -- nothing to evaluate.
    # Fraction of bytes fetched from Keep that the job actually read
    # back through the block-device layer.
    utilization = (float(self.job_tot['blkio:0:0']['read']) /
                   float(self.job_tot['net:keep0']['rx']))
    asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
    if utilization < 0.8:
        '#!! {} Keep cache utilization was {:.2f}% -- '
        'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
356 def _format(self, val):
357 """Return a string representation of a stat.
359 {:.2f} for floats, default format for everything else."""
360 if isinstance(val, float):
361 return '{:.2f}'.format(val)
363 return '{}'.format(val)
class CollectionSummarizer(Summarizer):
    """Summarizer that reads crunchstat logs out of a Keep collection."""
    def __init__(self, collection_id, **kwargs):
        reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(reader, **kwargs)
        # Label reports with the collection we read from.
        self.label = collection_id
class JobSummarizer(Summarizer):
    """Summarizer for one Arvados job, given a job record or UUID.

    Reads the job's finalized log collection when available, falling
    back to the live event log.  (Partial view: several branch and
    assignment lines are elided.)
    """
    def __init__(self, job, **kwargs):
        arv = arvados.api('v1')
        # Accept either a job UUID string or an already-fetched record.
        # NOTE(review): `basestring` is Python 2-only.
        if isinstance(job, basestring):
            self.job = arv.jobs().get(uuid=job).execute()
        if self.job.get('log'):
            # Prefer the finalized log collection...
            rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
        except arvados.errors.NotFoundError as e:
            # ...but fall back to live event logs when it is unreadable.
            logger.warning("Trying event logs after failing to read "
                           "log collection %s: %s", self.job['log'], e)
        label = self.job['uuid']
        rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
        # Mark live-log summaries as partial: the job may still be running.
        label = self.job['uuid'] + ' (partial)'
        super(JobSummarizer, self).__init__(rdr, **kwargs)
        # Known constraints suppress redundant recommendations later.
        self.existing_constraints = self.job.get('runtime_constraints', {})
class PipelineSummarizer(object):
    """One JobSummarizer per component of a pipeline instance; runs them
    in parallel threads and concatenates their reports.

    (Partial view: thread start/join lines and some guards are elided.)
    """
    def __init__(self, pipeline_instance_uuid, **kwargs):
        # OrderedJsonModel preserves the component ordering from the API.
        arv = arvados.api('v1', model=OrderedJsonModel())
        instance = arv.pipeline_instances().get(
            uuid=pipeline_instance_uuid).execute()
        self.summarizers = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            if 'job' not in component:
                # Component never got a job assigned -- nothing to do.
                # (enclosing logger calls partially elided below)
                "%s: skipping component with no job assigned", cname)
                "%s: job %s", cname, component['job']['uuid'])
            summarizer = JobSummarizer(component['job'], **kwargs)
            # Label each component's summary "<component> <job uuid>".
            summarizer.label = '{} {}'.format(
                cname, component['job']['uuid'])
            self.summarizers[cname] = summarizer
        self.label = pipeline_instance_uuid

    # (Body of run(): one worker thread per summarizer; the def line and
    # join logic are elided from this view.)
        for summarizer in self.summarizers.itervalues():
            t = threading.Thread(target=summarizer.run)

    def text_report(self):
        # Concatenate each component's report under a "### Summary" header.
        for cname, summarizer in self.summarizers.iteritems():
            txt += '### Summary for {} ({})\n'.format(
                cname, summarizer.job['uuid'])
            txt += summarizer.text_report()

    def html_report(self):
        """Render all component summaries into a single HTML page."""
        return crunchstat_summary.chartjs.ChartJS(
            self.label, self.summarizers.itervalues()).html()