1 from __future__ import print_function
5 import crunchstat_summary.chartjs
6 import crunchstat_summary.reader
15 from arvados.api import OrderedJsonModel
16 from crunchstat_summary import logger
# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
# that have amounts like 7.5 GiB according to the kernel.)
# Used by Summarizer._recommend_ram to convert between MiB and whole
# "nearlygibs" when sizing a RAM recommendation.
AVAILABLE_RAM_RATIO = 0.95
        # Time-series data points keyed by (category, stat); Summarizer.run()
        # appends (elapsed-timedelta, value) pairs to each list.
        # NOTE(review): the enclosing definition (presumably `class Task` and
        # its __init__ header) is elided from this view -- confirm against
        # the complete file.
        self.series = collections.defaultdict(list)
class Summarizer(object):
    """Parse crunchstat log lines and accumulate resource-usage stats.

    Tracks, per task and per job, the maximum value seen for each
    crunchstat metric plus time series for charting, and can emit a
    plain-text report and runtime-constraint recommendations.

    NOTE(review): this view of the source is elided -- a number of
    original lines (including some method headers and branch bodies)
    are missing.  Elisions are marked with "[elided]" comments below;
    guesses about the missing code are hedged and should be confirmed
    against the complete file.
    """

    def __init__(self, logdata, label=None, skip_child_jobs=False):
        # logdata: an iterable of crunchstat log lines, parsed by run().
        self._logdata = logdata
        # [elided: presumably assigns self.label, self.starttime and
        #  self.seq_to_uuid here -- all are read later.  TODO confirm]
        self.finishtime = None
        # When true, "Queued job" child-job lines are skipped instead of
        # recursively summarized (see run()).
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}} -- maximum value seen for
        # each stat across all tasks; inner values default to 0.
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        # tasks: {task_id: Task}, created on demand.
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable. If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.debug("%s: logdata %s", self.label, logdata)

    # [elided: the `def run(self):` header is missing from this view;
    #  the statements below appear to be its body.]
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        for line in self._logdata:
            # "job_task" lines map a log sequence number to a task UUID,
            # used to attribute later stats to the right task.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
            # [elided: presumably an `if m:` guard]
                seq = int(m.group('seq'))
                uuid = m.group('task_uuid')
                self.seq_to_uuid[seq] = uuid
                logger.debug('%s: seq %d is task %s', self.label, seq, uuid)

            # "success in N seconds" lines record a task's elapsed time.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
            # [elided: presumably an `if m:` guard]
                task_id = self.seq_to_uuid[int(m.group('seq'))]
                elapsed = int(m.group('elapsed'))
                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                if elapsed > self.stats_max['time']['elapsed']:
                    self.stats_max['time']['elapsed'] = elapsed

            # "Queued job" lines announce a child job: either skip it or
            # summarize it recursively into this job's accumulators.
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
            # [elided: presumably an `if m:` guard]
                uuid = m.group('uuid')
                if self._skip_child_jobs:
                    logger.warning('%s: omitting stats from child job %s'
                                   ' because --skip-child-jobs flag is on',
                    # [elided: remaining warning arguments and,
                    #  presumably, a `continue`]
                logger.debug('%s: follow %s', self.label, uuid)
                # Share our accumulators with the child summarizer so its
                # stats merge directly into this job's.
                child_summarizer = JobSummarizer(uuid)
                child_summarizer.stats_max = self.stats_max
                child_summarizer.task_stats = self.task_stats
                child_summarizer.tasks = self.tasks
                child_summarizer.starttime = self.starttime
                child_summarizer.run()
                logger.debug('%s: done %s', self.label, uuid)

            # crunchstat lines: "<timestamp> <job_uuid> <pid> <seq> stderr
            # crunchstat: <category> <stats> [-- interval <stats>]"
            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
            # [elided: presumably `if not m: continue`]
            if self.label is None:
                self.label = m.group('job_uuid')
                logger.debug('%s: using job uuid as label', self.label)
            if m.group('category').endswith(':'):
                # "stderr crunchstat: notice: ..."
                # [elided: branch body, presumably `continue`]
            elif m.group('category') in ('error', 'caught'):
                # [elided: branch body, presumably `continue`]
            elif m.group('category') == 'read':
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (crunchstat formatting fixed, but old logs still say this)
                # [elided: branch body, presumably `continue`]
            task_id = self.seq_to_uuid[int(m.group('seq'))]
            task = self.tasks[task_id]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = datetime.datetime.strptime(
                m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
            if not task.starttime:
                task.starttime = timestamp
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            task.finishtime = timestamp

            if not self.starttime:
                self.starttime = timestamp
            self.finishtime = timestamp

            # Parse both the cumulative ("current") and per-interval
            # stat groups, when present on the line.
            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                    # [elided: presumably `continue`]
                category = m.group('category')
                words = m.group(group).split(' ')
                # [elided: presumably `stats = {}` initialization]
                # Stats arrive as "<value> <name>" pairs.
                for val, stat in zip(words[::2], words[1::2]):
                    # [elided: `try:` and a float-vs-int dispatch
                    #  around the next two assignments]
                        stats[stat] = float(val)
                        stats[stat] = int(val)
                    except ValueError as e:
                        # [elided: exception re-raise head around this
                        #  format call]
                            'Error parsing {} stat in "{}": {!r}'.format(
                # Synthesize combined CPU and network stats.
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                for stat, val in stats.iteritems():
                    if group == 'interval':
                        if stat == 'seconds':
                            # Remember the interval duration for rate math.
                            this_interval_s = val
                            # [elided: presumably `continue`]
                        elif not (this_interval_s > 0):
                            # [elided: logger.error call head]
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                        # [elided: presumably `continue` and an `else:`]
                            # Interval stats become per-second "__rate" stats.
                            stat = stat + '__rate'
                            val = val / this_interval_s
                            if stat in ['user+sys__rate', 'tx+rx__rate']:
                                task.series[category, stat].append(
                                    (timestamp - self.starttime, val))
                    # [elided: presumably an `else:` for the
                    #  non-interval ("current") case]
                        task.series[category, stat].append(
                            (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                        if val > self.stats_max[category][stat]:
                            self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        # job_tot: {category: {stat: total}} -- sum of each task's last
        # value, skipping stats where summing is meaningless.
        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.iteritems():
            for category, stat_last in task_stat.iteritems():
                for stat, val in stat_last.iteritems():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # meaningless stats like 16 cpu cores x 5 tasks = 80
                        # [elided: presumably `continue`]
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

    def long_label(self):
        """Return the label plus an elapsed-time suffix like "1d2h3m4s"."""
        # [elided: label initialization and, presumably, a guard around
        #  the elapsed-time computation]
            label += ' -- elapsed time '
            s = (self.finishtime - self.starttime).total_seconds()
            # [elided: presumably per-unit `if` guards around the next
            #  three lines, so small units omit larger zero fields]
                label += '{}d'.format(int(s/86400))
                label += '{}h'.format(int(s/3600) % 24)
                label += '{}m'.format(int(s/60) % 60)
            label += '{}s'.format(int(s) % 60)
        # [elided: presumably `return label`]

    def text_report(self):
        """Return the full plain-text report (stats table followed by
        recommendations), newline-terminated."""
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"

    def html_report(self):
        """Return an HTML chart report for this summarizer alone."""
        return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()

    def _text_report_gen(self):
        """Yield report lines: a TSV stats table, then "#"-prefixed
        human-readable summary lines."""
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                if stat.endswith('__rate'):
                    # [elided: presumably `continue` -- __rate stats are
                    #  shown in their base stat's row, not separately]
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        # (format_string, value, transform) triples for the summary
        # lines.  [elided: the `for args in (` loop header, and some
        # value/transform elements between the visible tuples]
                ('Number of tasks: {}',
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                ('Overall CPU usage: {}%',
                 self.job_tot['cpu']['user+sys'] /
                 self.job_tot['time']['elapsed'],
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'] +
                 self.stats_max['net:keep0']['tx+rx'],
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'] +
                 self.stats_max['net:keep0']['tx+rx__rate'],
            format_string, val, transform = args
            if val == float('-Inf'):
                # -Inf presumably marks "no data" -- TODO confirm.
                # [elided: presumably `continue`, then a transform step]
            yield "# "+format_string.format(self._format(val))

    def _recommend_gen(self):
        """Return an iterator over "#!!" runtime-constraint
        recommendation lines (CPU first, then RAM)."""
        return itertools.chain(
            self._recommend_cpu(),
            self._recommend_ram())

    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf'):
            logger.warning('%s: no CPU usage data', self.label)
            # [elided: presumably `return`]
        # Round the peak CPU rate up to a whole number of cores (min 1).
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get('min_cores_per_node')
        if asked_cores is None or used_cores < asked_cores:
            # [elided: `yield (` head of the recommendation string]
                '#!! {} max CPU usage was {}% -- '
                'try runtime_constraints "min_cores_per_node":{}'
            # [elided: `).format(` head with the label argument]
                int(math.ceil(cpu_max_rate*100)),
            # [elided: used_cores argument and closing parenthesis]

    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available. If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available. Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available. However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        [NOTE(review): end of this sentence elided from this view]

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary. We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
            # [elided: presumably `return`]
        used_mib = math.ceil(float(used_bytes) / 1048576)
        asked_mib = self.existing_constraints.get('min_ram_mb_per_node')

        # Convert MiB to "nearlygibs" (see docstring above).
        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        if asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
            # [elided: `yield (` head of the recommendation string]
                '#!! {} max RSS was {} MiB -- '
                'try runtime_constraints "min_ram_mb_per_node":{}'
            # [elided: `).format(` head with label and used_mib args]
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))

    def _format(self, val):
        """Return a string representation of a stat.

        {:.2f} for floats, default format for everything else."""
        if isinstance(val, float):
            return '{:.2f}'.format(val)
        # (the original wraps this in an `else:`, elided from this
        # view; the fall-through placement is behaviorally identical)
        return '{}'.format(val)
class CollectionSummarizer(Summarizer):
    """Summarize crunchstat logs stored in an Arvados collection.

    Reads log lines through crunchstat_summary.reader.CollectionReader
    and uses the collection id as this summarizer's label.
    """

    def __init__(self, collection_id, **kwargs):
        logdata = crunchstat_summary.reader.CollectionReader(collection_id)
        super(CollectionSummarizer, self).__init__(logdata, **kwargs)
        # Label with the collection id (assigned after the base
        # constructor runs, so it takes precedence).
        self.label = collection_id
class JobSummarizer(Summarizer):
    """Summarizer for a single Arvados job.

    Accepts either a job UUID (looked up via the API) or a job record.
    Reads the job's log collection when it can; the elided code below
    appears to fall back to a live event-log reader otherwise.

    NOTE(review): parts of this constructor are elided from this view;
    see "[elided]" markers.
    """

    def __init__(self, job, **kwargs):
        arv = arvados.api('v1')
        if isinstance(job, basestring):
            # Given a UUID string: fetch the job record from the API.
            self.job = arv.jobs().get(uuid=job).execute()
        # [elided: presumably `else: self.job = job`, rdr/label setup,
        #  and a `try:` around the collection read below]
            rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
        except arvados.errors.NotFoundError as e:
            # Log-collection read failed; a fallback reader is used below.
            logger.warning("Trying event logs after failing to read "
                           "log collection %s: %s", self.job['log'], e)
        # [elided: presumably an `else:` for the successful-read case]
            label = self.job['uuid']
        # [elided: presumably a guard selecting the live-log fallback]
            rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
            label = self.job['uuid'] + ' (partial)'
        super(JobSummarizer, self).__init__(rdr, **kwargs)
        # [elided: presumably `self.label = label`]
        # Record the job's actual constraints so the recommenders can
        # compare against them.
        self.existing_constraints = self.job.get('runtime_constraints', {})
class PipelineSummarizer(object):
    """Summarize every component job of a pipeline instance.

    Builds one JobSummarizer per usable component (labelled with the
    component name) and aggregates their reports.

    NOTE(review): parts of this class are elided from this view; see
    "[elided]" markers.
    """

    def __init__(self, pipeline_instance_uuid, **kwargs):
        # OrderedJsonModel preserves the components' original order.
        arv = arvados.api('v1', model=OrderedJsonModel())
        instance = arv.pipeline_instances().get(
            uuid=pipeline_instance_uuid).execute()
        self.summarizers = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            if 'job' not in component:
                # [elided: logger.warning call head]
                    "%s: skipping component with no job assigned", cname)
            elif component['job'].get('log') is None:
                # [elided: logger.warning call head]
                    "%s: skipping job %s with no log available",
                    cname, component['job'].get('uuid'))
            # [elided: presumably an `else:` with a logger.debug head]
                    "%s: logdata %s", cname, component['job']['log'])
                summarizer = JobSummarizer(component['job'], **kwargs)
                # Label each summarizer with its component name.
                summarizer.label = cname
                self.summarizers[cname] = summarizer
        self.label = pipeline_instance_uuid

    # [elided: the `def run(self):` header and thread bookkeeping]
        for summarizer in self.summarizers.itervalues():
            # Run each component's summarizer in its own thread.
            t = threading.Thread(target=summarizer.run)
        # [elided: thread start/append/join code]

    def text_report(self):
        """Return the concatenated text reports of all components, each
        under a "### Summary for ..." heading."""
        # [elided: presumably `txt = ''` initialization]
        for cname, summarizer in self.summarizers.iteritems():
            txt += '### Summary for {} ({})\n'.format(
                cname, summarizer.job['uuid'])
            txt += summarizer.text_report()
        # [elided: presumably `return txt`]

    def html_report(self):
        """Return one HTML chart report covering all components."""
        return crunchstat_summary.chartjs.ChartJS(
            self.label, self.summarizers.itervalues()).html()