1 from __future__ import print_function
5 import crunchstat_summary.chartjs
6 import crunchstat_summary.reader
16 from arvados.api import OrderedJsonModel
17 from crunchstat_summary import logger
# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95


# Workaround datetime.datetime.strptime() thread-safety bug by calling
# it once before starting threads. https://bugs.python.org/issue7980
# (The format string matches the crunchstat timestamps parsed in
# Summarizer.run().)
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
        # Time series of samples for charting: maps a (category, stat)
        # pair to a list of (elapsed-timedelta, value) points, appended
        # while Summarizer.run() parses crunchstat lines.
        self.series = collections.defaultdict(list)
36 class Summarizer(object):
    def __init__(self, logdata, label=None, skip_child_jobs=False):
        """Prepare a summarizer over an iterable of crunchstat log lines.

        :logdata: iterable of log lines; parsed later by run().
        :label: display label. If None, run() falls back to the job
            UUID found in the first crunchstat log line.
        :skip_child_jobs: if True, run_child() omits stats from child jobs.
        """
        self._logdata = logdata
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        # (default 0, so any observed value becomes the new maximum)
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        # Tasks are created on first reference, keyed by task UUID.
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable. If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.debug("%s: logdata %s", self.label, logdata)
    def run_child(self, uuid):
        """Summarize child job `uuid`, folding its stats into this job's.

        The child summarizer shares (not copies) stats_max, task_stats
        and tasks, so its run() accumulates directly into our totals.
        """
        if self._skip_child_jobs:
            logger.warning('%s: omitting stats from child job %s'
                           ' because --skip-child-jobs flag is on',
        # NOTE(review): the remaining warning arguments and the early
        # return for the skip case appear truncated here -- confirm.
        logger.debug('%s: follow %s', self.label, uuid)
        child_summarizer = JobSummarizer(uuid)
        child_summarizer.stats_max = self.stats_max
        child_summarizer.task_stats = self.task_stats
        child_summarizer.tasks = self.tasks
        child_summarizer.starttime = self.starttime
        child_summarizer.run()
        logger.debug('%s: done %s', self.label, uuid)
        # --- Phase 1: parse every log line ---
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        for line in self._logdata:
            # "job_task" lines announce a task's sequence number -> UUID
            # mapping, used to resolve later per-task lines.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
                seq = int(m.group('seq'))
                uuid = m.group('task_uuid')
                self.seq_to_uuid[seq] = uuid
                logger.debug('%s: seq %d is task %s', self.label, seq, uuid)

            # Task-completion lines give the task's elapsed wall-clock
            # seconds (for both success and permanent failure).
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
                task_id = self.seq_to_uuid[int(m.group('seq'))]
                elapsed = int(m.group('elapsed'))
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed

            # Old style job logs only - newer style uses job['components']
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
                self.run_child(m.group('uuid'))

            # crunchstat measurement line:
            #   <timestamp> <job_uuid> <pid> <seq> stderr crunchstat:
            #   <category> <current stats> [-- interval <interval stats>]
            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)

                # The first crunchstat line decides the label when the
                # caller supplied none.
                if self.label is None:
                    self.label = m.group('job_uuid')
                    logger.debug('%s: using job uuid as label', self.label)
                # Skip non-measurement crunchstat output.
                if m.group('category').endswith(':'):
                    # "stderr crunchstat: notice: ..."
                elif m.group('category') in ('error', 'caught'):
                elif m.group('category') == 'read':
                    # "stderr crunchstat: read /proc/1234/net/dev: ..."
                    # (crunchstat formatting fixed, but old logs still say this)
                task_id = self.seq_to_uuid[int(m.group('seq'))]
                task = self.tasks[task_id]

                # Use the first and last crunchstat timestamps as
                # approximations of starttime and finishtime.
                timestamp = datetime.datetime.strptime(
                    m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
                if not task.starttime:
                    task.starttime = timestamp
                    logger.debug('%s: task %s starttime %s',
                                 self.label, task_id, timestamp)
                task.finishtime = timestamp

                if not self.starttime:
                    self.starttime = timestamp
                self.finishtime = timestamp

                this_interval_s = None
                for group in ['current', 'interval']:
                    if not m.group(group):
                    # Stats arrive as alternating "<value> <name>" words;
                    # values with a '.' parse as float, others as int.
                    category = m.group('category')
                    words = m.group(group).split(' ')
                    for val, stat in zip(words[::2], words[1::2]):
                            stats[stat] = float(val)
                            stats[stat] = int(val)
                        except ValueError as e:
                                'Error parsing {} stat: {!r}'.format(
                    # Synthesize combined CPU and network stats.
                    if 'user' in stats or 'sys' in stats:
                        stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                    if 'tx' in stats or 'rx' in stats:
                        stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                    for stat, val in stats.iteritems():
                        if group == 'interval':
                            if stat == 'seconds':
                                this_interval_s = val
                            elif not (this_interval_s > 0):
                                    "BUG? interval stat given with duration {!r}".
                                    format(this_interval_s))
                                # Interval stats become per-second rates.
                                stat = stat + '__rate'
                                val = val / this_interval_s
                                if stat in ['user+sys__rate', 'tx+rx__rate']:
                                    task.series[category, stat].append(
                                        (timestamp - self.starttime, val))
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                            self.task_stats[task_id][category][stat] = val
                        if val > self.stats_max[category][stat]:
                            self.stats_max[category][stat] = val
            except Exception as e:
                # Best-effort parsing: a malformed line is logged and skipped,
                # not fatal.
                logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
        logger.debug('%s: done parsing log', self.label)

        # Enabling this will roll up stats for all subjobs into the parent job
        if False and 'components' in self.job:
            for cname, component in self.job['components'].iteritems():
                self.run_child(component)

        # --- Phase 2: sum each task's last-seen stats into job totals ---
        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.iteritems():
            for category, stat_last in task_stat.iteritems():
                for stat, val in stat_last.iteritems():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)
    def long_label(self):
        """Return the label extended with the job's elapsed time.

        Elapsed time is rendered as d/h/m/s components; presumably each
        larger unit is emitted only when nonzero (the guard conditions
        are not visible here -- confirm).
        """
        label += ' -- elapsed time '
        s = (self.finishtime - self.starttime).total_seconds()
        label += '{}d'.format(int(s/86400))
        label += '{}h'.format(int(s/3600) % 24)
        label += '{}m'.format(int(s/60) % 60)
        label += '{}s'.format(int(s) % 60)
    def text_report(self):
        """Return the plain-text report: stats table, then recommendations."""
            # No tasks parsed -> nothing meaningful to report.
            return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"
224 def html_report(self):
225 return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
    def _text_report_gen(self):
        """Yield the tab-separated stats table, then '#'-prefixed summary lines."""
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                # __rate stats are shown in the task_max_rate column of
                # their base stat's row, not as rows of their own.
                if stat.endswith('__rate'):
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        # Human-readable summary lines, each driven by a
        # (format string, value, optional transform) triple.
            ('Number of tasks: {}',
            ('Max CPU time spent by a single task: {}s',
             self.stats_max['cpu']['user+sys'],
            ('Max CPU usage in a single interval: {}%',
             self.stats_max['cpu']['user+sys__rate'],
            ('Overall CPU usage: {}%',
             self.job_tot['cpu']['user+sys'] /
             self.job_tot['time']['elapsed']
             if self.job_tot['time']['elapsed'] > 0 else 0,
            ('Max memory used by a single task: {}GB',
             self.stats_max['mem']['rss'],
            ('Max network traffic in a single task: {}GB',
             self.stats_max['net:eth0']['tx+rx'] +
             self.stats_max['net:keep0']['tx+rx'],
            ('Max network speed in a single interval: {}MB/s',
             self.stats_max['net:eth0']['tx+rx__rate'] +
             self.stats_max['net:keep0']['tx+rx__rate'],
            ('Keep cache miss rate {}%',
             (float(self.job_tot['keepcache']['miss']) /
              float(self.job_tot['keepcalls']['get']))
             if self.job_tot['keepcalls']['get'] > 0 else 0,
             lambda x: x * 100.0),
            ('Keep cache utilization {}%',
             (float(self.job_tot['blkio:0:0']['read']) /
              float(self.job_tot['net:keep0']['rx']))
             if self.job_tot['net:keep0']['rx'] > 0 else 0,
             lambda x: x * 100.0)):
            format_string, val, transform = args
            # -Inf appears to mark "no data collected" (cf. the checks
            # in _recommend_cpu/_recommend_ram); skip such lines.
            if val == float('-Inf'):
            yield "# "+format_string.format(self._format(val))
280 def _recommend_gen(self):
281 return itertools.chain(
282 self._recommend_cpu(),
283 self._recommend_ram(),
284 self._recommend_keep_cache())
    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""

        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf'):
            # No crunchstat CPU samples were parsed; nothing to recommend.
            logger.warning('%s: no CPU usage data', self.label)
        # Round the peak rate up to whole cores, with a floor of one.
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get('min_cores_per_node')
        if asked_cores is None or used_cores < asked_cores:
            # Recommendation text (yield call appears truncated here):
            '#!! {} max CPU usage was {}% -- '
            'try runtime_constraints "min_cores_per_node":{}'
                int(math.ceil(cpu_max_rate*100)),
    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available. If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available. Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available. However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary. We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            # No RSS samples were parsed; nothing to recommend.
            logger.warning('%s: no memory usage data', self.label)
        # 1048576 = bytes per MiB.
        used_mib = math.ceil(float(used_bytes) / 1048576)
        asked_mib = self.existing_constraints.get('min_ram_mb_per_node')

        # Convert MiB to "nearlygibs": advertised GiB scaled down by
        # AVAILABLE_RAM_RATIO (see module constant above).
        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        if asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
            # Recommendation text (yield call appears truncated here):
            '#!! {} max RSS was {} MiB -- '
            'try runtime_constraints "min_ram_mb_per_node":{}'
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%"""
        # No bytes received from Keep -> utilization is undefined; bail out.
        if self.job_tot['net:keep0']['rx'] == 0:
        # Utilization = bytes the job actually read from local disk cache
        # divided by bytes fetched over the network from Keep.
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        # 256 MiB is the assumed default keep cache size when the job's
        # constraints don't specify one.
        asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)

        if utilization < 0.8:
            # Recommendation text (yield call appears truncated here):
            '#!! {} Keep cache utilization was {:.2f}% -- '
            'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
374 def _format(self, val):
375 """Return a string representation of a stat.
377 {:.2f} for floats, default format for everything else."""
378 if isinstance(val, float):
379 return '{:.2f}'.format(val)
381 return '{}'.format(val)
class CollectionSummarizer(Summarizer):
    """Summarizer that reads its log data out of a Keep collection."""

    def __init__(self, collection_id, **kwargs):
        reader = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(reader, **kwargs)
        # The collection id doubles as the display label.
        self.label = collection_id
class JobSummarizer(Summarizer):
    # Summarizes one Arvados job, given either a job UUID string or an
    # already-fetched job record dict.
    def __init__(self, job, **kwargs):
        arv = arvados.api('v1')
        # A UUID string is resolved to a full job record via the API.
        if isinstance(job, basestring):
            self.job = arv.jobs().get(uuid=job).execute()
        if self.job.get('log'):
            # Prefer the finished job's log collection; fall back to
            # live event logs if the collection can't be read.
                rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.job['log'], e)
            label = self.job['uuid']
            rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
            # "(partial)" marks reports built from live logs rather than
            # a complete log collection.
            label = self.job['uuid'] + ' (partial)'
        super(JobSummarizer, self).__init__(rdr, **kwargs)
        # Remember the job's own constraints so _recommend_* methods can
        # skip recommendations that are already satisfied.
        self.existing_constraints = self.job.get('runtime_constraints', {})
415 class PipelineSummarizer(object):
    def __init__(self, pipeline_instance_uuid, **kwargs):
        """Build one JobSummarizer per component of the pipeline instance.

        OrderedJsonModel + OrderedDict preserve the component order the
        API server returns, so reports list components in that order.
        """
        self.arv = arvados.api('v1', model=OrderedJsonModel())
        instance = self.arv.pipeline_instances().get(
            uuid=pipeline_instance_uuid).execute()
        self.summarizers = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            # Components that never got a job assigned are skipped.
            if 'job' not in component:
                    "%s: skipping component with no job assigned", cname)
                self.summarize_job(cname, component['job'], **kwargs)
        self.label = pipeline_instance_uuid
    def summarize_job(self, cname, job, **kwargs):
        """Register a summarizer for `job`, recursing into its subjobs."""
        # NOTE(review): 'uuid' is used below but its assignment is not
        # visible here -- presumably uuid = job['uuid']; confirm.
        logger.info("%s: job %s", cname, uuid)
        summarizer = JobSummarizer(job, **kwargs)
        summarizer.label = '{} {}'.format(cname, uuid)
        self.summarizers[cname] = summarizer
        # Old-style jobs carry their subjobs in job['components'];
        # summarize each of those too.
        if 'components' in job:
            for cname, uuid in job['components'].iteritems():
                subjob = self.arv.jobs().get(uuid=uuid).execute()
                self.summarize_job(cname, subjob, **kwargs)
        # Launch one worker thread per component summarizer, each
        # executing that summarizer's run().
        for summarizer in self.summarizers.itervalues():
            t = threading.Thread(target=summarizer.run)
    def text_report(self):
        """Concatenate each component's text report under a '###' heading."""
        for cname, summarizer in self.summarizers.iteritems():
            txt += '### Summary for {} ({})\n'.format(
                cname, summarizer.job['uuid'])
            txt += summarizer.text_report()
459 def html_report(self):
460 return crunchstat_summary.chartjs.ChartJS(
461 self.label, self.summarizers.itervalues()).html()