from __future__ import print_function

import arvados
import collections
import crunchstat_summary.chartjs
import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re
import threading

from arvados.api import OrderedJsonModel
from crunchstat_summary import logger

# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95
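
# Illustrative arithmetic (not from any real log): with this ratio, a task
# whose peak RSS was 7000 MiB needs ceil(7000/0.95/1024) = 8 "nearlygibs",
# so _recommend_ram() below would suggest int(8*0.95*1024) = 7782 MiB
# rather than a full 8192 MiB.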


# Work around a datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads. https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')


class Task(object):
    def __init__(self):
        self.starttime = None
        self.series = collections.defaultdict(list)


class Summarizer(object):
    def __init__(self, logdata, label=None, skip_child_jobs=False):
        self._logdata = logdata

        self.label = label
        self.starttime = None
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: float('-Inf')))
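        # (float('-Inf'), rather than 0, so the "no data" checks against
        # float('-Inf') in _text_report_gen() and _recommend_*() can tell
        # "never measured" apart from a measured value of 0.)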
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        self.seq_to_uuid = {}
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable. If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.debug("%s: logdata %s", self.label, logdata)

    def run(self):
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        for line in self._logdata:
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
            if m:
                seq = int(m.group('seq'))
                uuid = m.group('task_uuid')
                self.seq_to_uuid[seq] = uuid
                logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
                continue

            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
            if m:
                task_id = self.seq_to_uuid[int(m.group('seq'))]
                elapsed = int(m.group('elapsed'))
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed
                continue

            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
            if m:
                uuid = m.group('uuid')
                if self._skip_child_jobs:
                    logger.warning('%s: omitting stats from child job %s'
                                   ' because --skip-child-jobs flag is on',
                                   self.label, uuid)
                    continue
                logger.debug('%s: follow %s', self.label, uuid)
                child_summarizer = JobSummarizer(uuid)
                child_summarizer.stats_max = self.stats_max
                child_summarizer.task_stats = self.task_stats
                child_summarizer.tasks = self.tasks
                child_summarizer.starttime = self.starttime
                child_summarizer.run()
                logger.debug('%s: done %s', self.label, uuid)
                continue
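
            # A crunchstat stderr line looks roughly like this (illustrative
            # example reconstructed from the pattern below, not a real log):
            #   2016-01-01_12:34:56 <job_uuid> 12345 0 stderr crunchstat: \
            #     cpu 1234.5600 user 59.4100 sys 8 cpus -- interval \
            #     10.0000 seconds 12.9900 user 0.9900 sys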
            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
            if not m:
                continue

            if self.label is None:
                self.label = m.group('job_uuid')
                logger.debug('%s: using job uuid as label', self.label)
            if m.group('category').endswith(':'):
                # "stderr crunchstat: notice: ..."
                continue
            elif m.group('category') in ('error', 'caught'):
                continue
            elif m.group('category') == 'read':
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (crunchstat formatting fixed, but old logs still say this)
                continue
            task_id = self.seq_to_uuid[int(m.group('seq'))]
            task = self.tasks[task_id]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = datetime.datetime.strptime(
                m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
            if not task.starttime:
                task.starttime = timestamp
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            task.finishtime = timestamp

            if not self.starttime:
                self.starttime = timestamp
            self.finishtime = timestamp

            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                    continue
                category = m.group('category')
                words = m.group(group).split(' ')
                stats = {}
                for val, stat in zip(words[::2], words[1::2]):
                    try:
                        if '.' in val:
                            stats[stat] = float(val)
                        else:
                            stats[stat] = int(val)
                    except ValueError as e:
                        raise ValueError(
                            'Error parsing {} stat in "{}": {!r}'.format(
                                stat, line, e))
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                for stat, val in stats.iteritems():
                    if group == 'interval':
                        if stat == 'seconds':
                            this_interval_s = val
                            continue
                        elif not (this_interval_s > 0):
                            logger.error(
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                            continue
                        else:
                            stat = stat + '__rate'
                            val = val / this_interval_s
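                            # e.g. (illustrative) an interval reporting
                            # "10.0000 seconds 9.9000 user 0.1000 sys" gives
                            # user+sys (9.9+0.1) over 10.0 seconds, i.e.
                            # user+sys__rate = 1.0: roughly one busy core.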
                            if stat in ['user+sys__rate', 'tx+rx__rate']:
                                task.series[category, stat].append(
                                    (timestamp - self.starttime, val))
                    else:
                        if stat in ['rss']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.iteritems():
            for category, stat_last in task_stat.iteritems():
                for stat, val in stat_last.iteritems():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                        continue
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

    def long_label(self):
        label = self.label
        if self.finishtime:
            label += ' -- elapsed time '
            s = (self.finishtime - self.starttime).total_seconds()
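            # e.g. s = 90061.0 formats as "1d1h1m1s" below.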
            if s > 86400:
                label += '{}d'.format(int(s/86400))
            if s > 3600:
                label += '{}h'.format(int(s/3600) % 24)
            if s > 60:
                label += '{}m'.format(int(s/60) % 60)
            label += '{}s'.format(int(s) % 60)
        return label

    def text_report(self):
        if not self.tasks:
            return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"

    def html_report(self):
        return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()

    def _text_report_gen(self):
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                if stat.endswith('__rate'):
                    continue
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        for args in (
                ('Number of tasks: {}',
                 len(self.tasks),
                 None),
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                 None),
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                 lambda x: x * 100),
                ('Overall CPU usage: {}%',
                 self.job_tot['cpu']['user+sys'] /
                 self.job_tot['time']['elapsed']
                 if self.job_tot['time']['elapsed'] > 0 else 0,
                 lambda x: x * 100),
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                 lambda x: x / 1e9),
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'] +
                 self.stats_max['net:keep0']['tx+rx'],
                 lambda x: x / 1e9),
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'] +
                 self.stats_max['net:keep0']['tx+rx__rate'],
                 lambda x: x / 1e6),
                ('Keep cache miss rate {}%',
                 (float(self.job_tot['keepcache']['miss']) /
                  float(self.job_tot['keepcalls']['get']))
                 if self.job_tot['keepcalls']['get'] > 0 else 0,
                 lambda x: x * 100.0),
                ('Keep cache utilization {}%',
                 (float(self.job_tot['blkio:0:0']['read']) /
                  float(self.job_tot['net:keep0']['rx']))
                 if self.job_tot['net:keep0']['rx'] > 0 else 0,
                 lambda x: x * 100.0)):
            format_string, val, transform = args
            if val == float('-Inf'):
                continue
            if transform:
                val = transform(val)
            yield "# "+format_string.format(self._format(val))

    def _recommend_gen(self):
        return itertools.chain(
            self._recommend_cpu(),
            self._recommend_ram(),
            self._recommend_keep_cache())

    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""

        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf'):
            logger.warning('%s: no CPU usage data', self.label)
            return
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
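        # e.g. a cpu_max_rate of 3.33 (reported as 333%) rounds up to
        # used_cores = 4, matching the docstring above.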
        asked_cores = self.existing_constraints.get('min_cores_per_node')
        if asked_cores is None or used_cores < asked_cores:
            yield (
                '#!! {} max CPU usage was {}% -- '
                'try runtime_constraints "min_cores_per_node":{}'
            ).format(
                self.label,
                int(math.ceil(cpu_max_rate*100)),
                used_cores)

    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available. If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available. Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available. However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        requirement.

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary. We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """

        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
            return
        used_mib = math.ceil(float(used_bytes) / 1048576)
        asked_mib = self.existing_constraints.get('min_ram_mb_per_node')

        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
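        # e.g. nearlygibs(7000) ~= 7.2, so ceil() asks for 8 nearlygibs; if
        # the job already asked for 7782 MiB, the comparison below
        # (8 < nearlygibs(7782) ~= 8.0) is false and nothing is suggested.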
        if asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
            yield (
                '#!! {} max RSS was {} MiB -- '
                'try runtime_constraints "min_ram_mb_per_node":{}'
            ).format(
                self.label,
                int(used_mib),
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))

    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%"""
        if self.job_tot['net:keep0']['rx'] == 0:
            return
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
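        # e.g. (illustrative) if tasks read 10 GiB of data (blkio:0:0 read)
        # but 20 GiB was fetched from Keep (net:keep0 rx), utilization is
        # 0.5: below the 0.8 threshold, so a larger cache is suggested.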
        asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)

        if utilization < 0.8:
            yield (
                '#!! {} Keep cache utilization was {:.2f}% -- '
                'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
            ).format(
                self.label,
                utilization * 100.0,
                asked_mib*2)

    def _format(self, val):
        """Return a string representation of a stat.

        {:.2f} for floats, default format for everything else."""
        if isinstance(val, float):
            return '{:.2f}'.format(val)
        else:
            return '{}'.format(val)


class CollectionSummarizer(Summarizer):
    def __init__(self, collection_id, **kwargs):
        super(CollectionSummarizer, self).__init__(
            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
        self.label = collection_id


class JobSummarizer(Summarizer):
    def __init__(self, job, **kwargs):
        arv = arvados.api('v1')
        if isinstance(job, basestring):
            self.job = arv.jobs().get(uuid=job).execute()
        else:
            self.job = job
        rdr = None
        if self.job.get('log'):
            try:
                rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.job['log'], e)
            else:
                label = self.job['uuid']
        if rdr is None:
            rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
            label = self.job['uuid'] + ' (partial)'
        super(JobSummarizer, self).__init__(rdr, **kwargs)
        self.label = label
        self.existing_constraints = self.job.get('runtime_constraints', {})


class PipelineSummarizer(object):
    def __init__(self, pipeline_instance_uuid, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())
        instance = arv.pipeline_instances().get(
            uuid=pipeline_instance_uuid).execute()
        self.summarizers = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            if 'job' not in component:
                logger.warning(
                    "%s: skipping component with no job assigned", cname)
            else:
                logger.info(
                    "%s: job %s", cname, component['job']['uuid'])
                summarizer = JobSummarizer(component['job'], **kwargs)
                summarizer.label = '{} {}'.format(
                    cname, component['job']['uuid'])
                self.summarizers[cname] = summarizer
        self.label = pipeline_instance_uuid

    def run(self):
        threads = []
        for summarizer in self.summarizers.itervalues():
            t = threading.Thread(target=summarizer.run)
            t.daemon = True
            t.start()
            threads.append(t)
        for t in threads:
            t.join()

    def text_report(self):
        txt = ''
        for cname, summarizer in self.summarizers.iteritems():
            txt += '### Summary for {} ({})\n'.format(
                cname, summarizer.job['uuid'])
            txt += summarizer.text_report()
            txt += '\n'
        return txt

    def html_report(self):
        return crunchstat_summary.chartjs.ChartJS(
            self.label, self.summarizers.itervalues()).html()