from __future__ import print_function

import arvados
import collections
import crunchstat_summary.chartjs
import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re
import threading

from arvados.api import OrderedJsonModel
from crunchstat_summary import logger

# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# but report amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95
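
# Illustrative example (hypothetical numbers, not taken from the code below):
# with AVAILABLE_RAM_RATIO = 0.95, a node sold as "8 GiB" is assumed to offer
# about 8 * 0.95 = 7.6 GiB to jobs, so _recommend_ram() below works in
# multiples of 0.95 GiB ("nearlygibs") rather than whole GiB.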

# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads. https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')


class Task(object):
    def __init__(self):
        self.series = collections.defaultdict(list)


class Summarizer(object):
    def __init__(self, logdata, label=None, skip_child_jobs=False):
        self._logdata = logdata

        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        # Start each stat at -Inf so the first real sample always replaces it
        # and "no data seen" can be detected later with == float('-Inf').
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: float('-Inf')))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable. If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.debug("%s: logdata %s", self.label, logdata)

    def run(self):
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        for line in self._logdata:
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
                seq = int(m.group('seq'))
                uuid = m.group('task_uuid')
                self.seq_to_uuid[seq] = uuid
                logger.debug('%s: seq %d is task %s', self.label, seq, uuid)

            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
                task_id = self.seq_to_uuid[int(m.group('seq'))]
                elapsed = int(m.group('elapsed'))
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed

            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
                uuid = m.group('uuid')
                if self._skip_child_jobs:
                    logger.warning('%s: omitting stats from child job %s'
                                   ' because --skip-child-jobs flag is on',
                logger.debug('%s: follow %s', self.label, uuid)
                child_summarizer = JobSummarizer(uuid)
                child_summarizer.stats_max = self.stats_max
                child_summarizer.task_stats = self.task_stats
                child_summarizer.tasks = self.tasks
                child_summarizer.starttime = self.starttime
                child_summarizer.run()
                logger.debug('%s: done %s', self.label, uuid)

            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
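            # A hypothetical example of a line this pattern should match (all
            # field values below are made up for illustration):
            #   2016-01-01_12:34:56 zzzzz-8i9sb-0123456789abcde 12345 0 stderr crunchstat: cpu 1234.5600 user 234.5600 sys 4 cpus -- interval 10.0000 seconds 2.5000 user 0.5000 sys
            # The "current" group is parsed as value/name pairs, and the
            # optional "interval" group feeds the __rate stats below.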
                if self.label is None:
                    self.label = m.group('job_uuid')
                    logger.debug('%s: using job uuid as label', self.label)
                if m.group('category').endswith(':'):
                    # "stderr crunchstat: notice: ..."
                elif m.group('category') in ('error', 'caught'):
                elif m.group('category') == 'read':
                    # "stderr crunchstat: read /proc/1234/net/dev: ..."
                    # (crunchstat formatting fixed, but old logs still say this)
                task_id = self.seq_to_uuid[int(m.group('seq'))]
                task = self.tasks[task_id]

                # Use the first and last crunchstat timestamps as
                # approximations of starttime and finishtime.
                timestamp = datetime.datetime.strptime(
                    m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
                if not task.starttime:
                    task.starttime = timestamp
                    logger.debug('%s: task %s starttime %s',
                                 self.label, task_id, timestamp)
                task.finishtime = timestamp

                if not self.starttime:
                    self.starttime = timestamp
                self.finishtime = timestamp

                this_interval_s = None
                for group in ['current', 'interval']:
                    if not m.group(group):
                    category = m.group('category')
                    words = m.group(group).split(' ')
                    for val, stat in zip(words[::2], words[1::2]):
                                stats[stat] = float(val)
                                stats[stat] = int(val)
                        except ValueError as e:
                                'Error parsing {} stat: {!r}'.format(
                    if 'user' in stats or 'sys' in stats:
                        stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                    if 'tx' in stats or 'rx' in stats:
                        stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                    for stat, val in stats.iteritems():
                        if group == 'interval':
                            if stat == 'seconds':
                                this_interval_s = val
                            elif not (this_interval_s > 0):
                                    "BUG? interval stat given with duration {!r}".
                                    format(this_interval_s))
                                stat = stat + '__rate'
                                val = val / this_interval_s
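                                # e.g. (hypothetical sample): 2.5 seconds of
                                # 'user' CPU time over a 10-second interval
                                # yields user__rate = 0.25, i.e. about a
                                # quarter of one core; rates like this are
                                # shown as percentages in the text report.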
                                if stat in ['user+sys__rate', 'tx+rx__rate']:
                                    task.series[category, stat].append(
                                        (timestamp - self.starttime, val))
                                task.series[category, stat].append(
                                    (timestamp - self.starttime, val))
                            self.task_stats[task_id][category][stat] = val
                        if val > self.stats_max[category][stat]:
                            self.stats_max[category][stat] = val
            except Exception as e:
                logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
        logger.debug('%s: done parsing', self.label)

        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.iteritems():
            for category, stat_last in task_stat.iteritems():
                for stat, val in stat_last.iteritems():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

    def long_label(self):
            label += ' -- elapsed time '
            s = (self.finishtime - self.starttime).total_seconds()
                label += '{}d'.format(int(s/86400))
                label += '{}h'.format(int(s/3600) % 24)
                label += '{}m'.format(int(s/60) % 60)
            label += '{}s'.format(int(s) % 60)
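            # e.g. (hypothetical figure) an elapsed time of 350,076 seconds
            # would be rendered as "4d1h14m36s".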

    def text_report(self):
            return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"

    def html_report(self):
        return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()

    def _text_report_gen(self):
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                if stat.endswith('__rate'):
                    max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                    val = self._format(val)
                    tot = self._format(self.job_tot[category].get(stat, '-'))
                    yield "\t".join([category, stat, str(val), max_rate, tot])
        for args in (
                ('Number of tasks: {}',
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                ('Overall CPU usage: {}%',
                 self.job_tot['cpu']['user+sys'] /
                 self.job_tot['time']['elapsed']
                 if self.job_tot['time']['elapsed'] > 0 else 0,
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'] +
                 self.stats_max['net:keep0']['tx+rx'],
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'] +
                 self.stats_max['net:keep0']['tx+rx__rate'],
                ('Keep cache miss rate {}%',
                 (float(self.job_tot['keepcache']['miss']) /
                  float(self.job_tot['keepcalls']['get']))
                 if self.job_tot['keepcalls']['get'] > 0 else 0,
                 lambda x: x * 100.0),
                ('Keep cache utilization {}%',
                 (float(self.job_tot['blkio:0:0']['read']) /
                  float(self.job_tot['net:keep0']['rx']))
                 if self.job_tot['net:keep0']['rx'] > 0 else 0,
                 lambda x: x * 100.0)):
            format_string, val, transform = args
            if val == float('-Inf'):
            yield "# "+format_string.format(self._format(val))

    def _recommend_gen(self):
        return itertools.chain(
            self._recommend_cpu(),
            self._recommend_ram(),
            self._recommend_keep_cache())

    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""
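
        # Worked example (hypothetical numbers): if cpu_max_rate peaked at
        # 3.33 cores, used_cores = max(1, ceil(3.33)) = 4, so the suggestion
        # would be "min_cores_per_node":4, reported as 333% max CPU usage.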
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf'):
            logger.warning('%s: no CPU usage data', self.label)
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get('min_cores_per_node')
        if asked_cores is None or used_cores < asked_cores:
                '#!! {} max CPU usage was {}% -- '
                'try runtime_constraints "min_cores_per_node":{}'
                int(math.ceil(cpu_max_rate*100)),

    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available. If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available. Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available. However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        requirement.

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary. We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """

        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
        used_mib = math.ceil(float(used_bytes) / 1048576)
        asked_mib = self.existing_constraints.get('min_ram_mb_per_node')

        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
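        # Worked example (hypothetical numbers): a task whose max RSS was
        # 7500 MiB gives nearlygibs(7500) = 7500/0.95/1024 ~= 7.71, so
        # ceil(...) = 8 "nearlygibs", and the suggested min_ram_mb_per_node
        # below is int(8 * 0.95 * 1024) = 7782 MiB.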
        if asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
                '#!! {} max RSS was {} MiB -- '
                'try runtime_constraints "min_ram_mb_per_node":{}'
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))

    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%"""
        if self.job_tot['net:keep0']['rx'] == 0:
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
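        # For example (hypothetical numbers): if tasks read 1.5 GiB through
        # the cache (blkio:0:0 read) while fetching 2.0 GiB from Keep
        # (net:keep0 rx), utilization is 0.75, below the 0.8 threshold below.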
        asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)

        if utilization < 0.8:
                '#!! {} Keep cache utilization was {:.2f}% -- '
                'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'

    def _format(self, val):
        """Return a string representation of a stat.

        {:.2f} for floats, default format for everything else."""
        if isinstance(val, float):
            return '{:.2f}'.format(val)
            return '{}'.format(val)


class CollectionSummarizer(Summarizer):
    def __init__(self, collection_id, **kwargs):
        super(CollectionSummarizer, self).__init__(
            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
        self.label = collection_id


class JobSummarizer(Summarizer):
    def __init__(self, job, **kwargs):
        arv = arvados.api('v1')
        if isinstance(job, basestring):
            self.job = arv.jobs().get(uuid=job).execute()
        if self.job.get('log'):
                rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.job['log'], e)
                label = self.job['uuid']
            rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
            label = self.job['uuid'] + ' (partial)'
        super(JobSummarizer, self).__init__(rdr, **kwargs)
        self.existing_constraints = self.job.get('runtime_constraints', {})


class PipelineSummarizer(object):
    def __init__(self, pipeline_instance_uuid, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())
        instance = arv.pipeline_instances().get(
            uuid=pipeline_instance_uuid).execute()
        self.summarizers = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            if 'job' not in component:
                    "%s: skipping component with no job assigned", cname)
                    "%s: job %s", cname, component['job']['uuid'])
                summarizer = JobSummarizer(component['job'], **kwargs)
                summarizer.label = '{} {}'.format(
                    cname, component['job']['uuid'])
                self.summarizers[cname] = summarizer
        self.label = pipeline_instance_uuid

    def run(self):
        for summarizer in self.summarizers.itervalues():
            t = threading.Thread(target=summarizer.run)

    def text_report(self):
        for cname, summarizer in self.summarizers.iteritems():
            txt += '### Summary for {} ({})\n'.format(
                cname, summarizer.job['uuid'])
            txt += summarizer.text_report()

    def html_report(self):
        return crunchstat_summary.chartjs.ChartJS(
            self.label, self.summarizers.itervalues()).html()