Merge branch '8015-crunch2-mount' closes #8015
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 from __future__ import print_function
2
3 import arvados
4 import collections
5 import crunchstat_summary.chartjs
6 import crunchstat_summary.reader
7 import datetime
8 import functools
9 import itertools
10 import math
11 import re
12 import sys
13 import threading
14
15 from arvados.api import OrderedJsonModel
16 from crunchstat_summary import logger
17
18 # Recommend memory constraints that are this multiple of an integral
19 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
20 # that have amounts like 7.5 GiB according to the kernel.)
21 AVAILABLE_RAM_RATIO = 0.95
22
23
24 # Workaround datetime.datetime.strptime() thread-safety bug by calling
25 # it once before starting threads.  https://bugs.python.org/issue7980
26 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
27
28
29 class Task(object):
30     def __init__(self):
31         self.starttime = None
32         self.series = collections.defaultdict(list)
33
34
35 class Summarizer(object):
36     def __init__(self, logdata, label=None, skip_child_jobs=False):
37         self._logdata = logdata
38
39         self.label = label
40         self.starttime = None
41         self.finishtime = None
42         self._skip_child_jobs = skip_child_jobs
43
44         # stats_max: {category: {stat: val}}
45         self.stats_max = collections.defaultdict(
46             functools.partial(collections.defaultdict, lambda: 0))
47         # task_stats: {task_id: {category: {stat: val}}}
48         self.task_stats = collections.defaultdict(
49             functools.partial(collections.defaultdict, dict))
50
51         self.seq_to_uuid = {}
52         self.tasks = collections.defaultdict(Task)
53
54         # We won't bother recommending new runtime constraints if the
55         # constraints given when running the job are known to us and
56         # are already suitable.  If applicable, the subclass
57         # constructor will overwrite this with something useful.
58         self.existing_constraints = {}
59
60         logger.debug("%s: logdata %s", self.label, logdata)
61
62     def run(self):
63         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
64         for line in self._logdata:
65             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
66             if m:
67                 seq = int(m.group('seq'))
68                 uuid = m.group('task_uuid')
69                 self.seq_to_uuid[seq] = uuid
70                 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
71                 continue
72
73             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
74             if m:
75                 task_id = self.seq_to_uuid[int(m.group('seq'))]
76                 elapsed = int(m.group('elapsed'))
77                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
78                 if elapsed > self.stats_max['time']['elapsed']:
79                     self.stats_max['time']['elapsed'] = elapsed
80                 continue
81
82             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
83             if m:
84                 uuid = m.group('uuid')
85                 if self._skip_child_jobs:
86                     logger.warning('%s: omitting stats from child job %s'
87                                    ' because --skip-child-jobs flag is on',
88                                    self.label, uuid)
89                     continue
90                 logger.debug('%s: follow %s', self.label, uuid)
91                 child_summarizer = JobSummarizer(uuid)
92                 child_summarizer.stats_max = self.stats_max
93                 child_summarizer.task_stats = self.task_stats
94                 child_summarizer.tasks = self.tasks
95                 child_summarizer.starttime = self.starttime
96                 child_summarizer.run()
97                 logger.debug('%s: done %s', self.label, uuid)
98                 continue
99
100             m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
101             if not m:
102                 continue
103
104             if self.label is None:
105                 self.label = m.group('job_uuid')
106                 logger.debug('%s: using job uuid as label', self.label)
107             if m.group('category').endswith(':'):
108                 # "stderr crunchstat: notice: ..."
109                 continue
110             elif m.group('category') in ('error', 'caught'):
111                 continue
112             elif m.group('category') == 'read':
113                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
114                 # (crunchstat formatting fixed, but old logs still say this)
115                 continue
116             task_id = self.seq_to_uuid[int(m.group('seq'))]
117             task = self.tasks[task_id]
118
119             # Use the first and last crunchstat timestamps as
120             # approximations of starttime and finishtime.
121             timestamp = datetime.datetime.strptime(
122                 m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
123             if not task.starttime:
124                 task.starttime = timestamp
125                 logger.debug('%s: task %s starttime %s',
126                              self.label, task_id, timestamp)
127             task.finishtime = timestamp
128
129             if not self.starttime:
130                 self.starttime = timestamp
131             self.finishtime = timestamp
132
133             this_interval_s = None
134             for group in ['current', 'interval']:
135                 if not m.group(group):
136                     continue
137                 category = m.group('category')
138                 words = m.group(group).split(' ')
139                 stats = {}
140                 for val, stat in zip(words[::2], words[1::2]):
141                     try:
142                         if '.' in val:
143                             stats[stat] = float(val)
144                         else:
145                             stats[stat] = int(val)
146                     except ValueError as e:
147                         raise ValueError(
148                             'Error parsing {} stat in "{}": {!r}'.format(
149                                 stat, line, e))
150                 if 'user' in stats or 'sys' in stats:
151                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
152                 if 'tx' in stats or 'rx' in stats:
153                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
154                 for stat, val in stats.iteritems():
155                     if group == 'interval':
156                         if stat == 'seconds':
157                             this_interval_s = val
158                             continue
159                         elif not (this_interval_s > 0):
160                             logger.error(
161                                 "BUG? interval stat given with duration {!r}".
162                                 format(this_interval_s))
163                             continue
164                         else:
165                             stat = stat + '__rate'
166                             val = val / this_interval_s
167                             if stat in ['user+sys__rate', 'tx+rx__rate']:
168                                 task.series[category, stat].append(
169                                     (timestamp - self.starttime, val))
170                     else:
171                         if stat in ['rss']:
172                             task.series[category, stat].append(
173                                 (timestamp - self.starttime, val))
174                         self.task_stats[task_id][category][stat] = val
175                     if val > self.stats_max[category][stat]:
176                         self.stats_max[category][stat] = val
177         logger.debug('%s: done parsing', self.label)
178
179         self.job_tot = collections.defaultdict(
180             functools.partial(collections.defaultdict, int))
181         for task_id, task_stat in self.task_stats.iteritems():
182             for category, stat_last in task_stat.iteritems():
183                 for stat, val in stat_last.iteritems():
184                     if stat in ['cpus', 'cache', 'swap', 'rss']:
185                         # meaningless stats like 16 cpu cores x 5 tasks = 80
186                         continue
187                     self.job_tot[category][stat] += val
188         logger.debug('%s: done totals', self.label)
189
190     def long_label(self):
191         label = self.label
192         if self.finishtime:
193             label += ' -- elapsed time '
194             s = (self.finishtime - self.starttime).total_seconds()
195             if s > 86400:
196                 label += '{}d'.format(int(s/86400))
197             if s > 3600:
198                 label += '{}h'.format(int(s/3600) % 24)
199             if s > 60:
200                 label += '{}m'.format(int(s/60) % 60)
201             label += '{}s'.format(int(s) % 60)
202         return label
203
204     def text_report(self):
205         if not self.tasks:
206             return "(no report generated)\n"
207         return "\n".join(itertools.chain(
208             self._text_report_gen(),
209             self._recommend_gen())) + "\n"
210
211     def html_report(self):
212         return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
213
214     def _text_report_gen(self):
215         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
216         for category, stat_max in sorted(self.stats_max.iteritems()):
217             for stat, val in sorted(stat_max.iteritems()):
218                 if stat.endswith('__rate'):
219                     continue
220                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
221                 val = self._format(val)
222                 tot = self._format(self.job_tot[category].get(stat, '-'))
223                 yield "\t".join([category, stat, str(val), max_rate, tot])
224         for args in (
225                 ('Number of tasks: {}',
226                  len(self.tasks),
227                  None),
228                 ('Max CPU time spent by a single task: {}s',
229                  self.stats_max['cpu']['user+sys'],
230                  None),
231                 ('Max CPU usage in a single interval: {}%',
232                  self.stats_max['cpu']['user+sys__rate'],
233                  lambda x: x * 100),
234                 ('Overall CPU usage: {}%',
235                  self.job_tot['cpu']['user+sys'] /
236                  self.job_tot['time']['elapsed']
237                  if self.job_tot['time']['elapsed'] > 0 else 0,
238                  lambda x: x * 100),
239                 ('Max memory used by a single task: {}GB',
240                  self.stats_max['mem']['rss'],
241                  lambda x: x / 1e9),
242                 ('Max network traffic in a single task: {}GB',
243                  self.stats_max['net:eth0']['tx+rx'] +
244                  self.stats_max['net:keep0']['tx+rx'],
245                  lambda x: x / 1e9),
246                 ('Max network speed in a single interval: {}MB/s',
247                  self.stats_max['net:eth0']['tx+rx__rate'] +
248                  self.stats_max['net:keep0']['tx+rx__rate'],
249                  lambda x: x / 1e6),
250                 ('Keep cache miss rate {}%',
251                  (float(self.job_tot['keepcache']['miss']) /
252                  float(self.job_tot['keepcalls']['get']))
253                  if self.job_tot['keepcalls']['get'] > 0 else 0,
254                  lambda x: x * 100.0),
255                 ('Keep cache utilization {}%',
256                  (float(self.job_tot['blkio:0:0']['read']) /
257                  float(self.job_tot['net:keep0']['rx']))
258                  if self.job_tot['net:keep0']['rx'] > 0 else 0,
259                  lambda x: x * 100.0)):
260             format_string, val, transform = args
261             if val == float('-Inf'):
262                 continue
263             if transform:
264                 val = transform(val)
265             yield "# "+format_string.format(self._format(val))
266
267     def _recommend_gen(self):
268         return itertools.chain(
269             self._recommend_cpu(),
270             self._recommend_ram(),
271             self._recommend_keep_cache())
272
273     def _recommend_cpu(self):
274         """Recommend asking for 4 cores if max CPU usage was 333%"""
275
276         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
277         if cpu_max_rate == float('-Inf'):
278             logger.warning('%s: no CPU usage data', self.label)
279             return
280         used_cores = max(1, int(math.ceil(cpu_max_rate)))
281         asked_cores = self.existing_constraints.get('min_cores_per_node')
282         if asked_cores is None or used_cores < asked_cores:
283             yield (
284                 '#!! {} max CPU usage was {}% -- '
285                 'try runtime_constraints "min_cores_per_node":{}'
286             ).format(
287                 self.label,
288                 int(math.ceil(cpu_max_rate*100)),
289                 int(used_cores))
290
291     def _recommend_ram(self):
292         """Recommend an economical RAM constraint for this job.
293
294         Nodes that are advertised as "8 gibibytes" actually have what
295         we might call "8 nearlygibs" of memory available for jobs.
296         Here, we calculate a whole number of nearlygibs that would
297         have sufficed to run the job, then recommend requesting a node
298         with that number of nearlygibs (expressed as mebibytes).
299
300         Requesting a node with "nearly 8 gibibytes" is our best hope
301         of getting a node that actually has nearly 8 gibibytes
302         available.  If the node manager is smart enough to account for
303         the discrepancy itself when choosing/creating a node, we'll
304         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
305         advertised size of the next-size-smaller node (say, 6 GiB)
306         will be too low to satisfy our request, so we will effectively
307         get rounded up to 8 GiB.
308
309         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
310         we will generally get a node that is advertised as "8 GiB" and
311         has at least 7500 MiB available.  However, asking for 8192 MiB
312         would either result in an unnecessarily expensive 12 GiB node
313         (if node manager knows about the discrepancy), or an 8 GiB
314         node which has less than 8192 MiB available and is therefore
315         considered by crunch-dispatch to be too small to meet our
316         constraint.
317
318         When node manager learns how to predict the available memory
319         for each node type such that crunch-dispatch always agrees
320         that a node is big enough to run the job it was brought up
321         for, all this will be unnecessary.  We'll just ask for exactly
322         the memory we want -- even if that happens to be 8192 MiB.
323         """
324
325         used_bytes = self.stats_max['mem']['rss']
326         if used_bytes == float('-Inf'):
327             logger.warning('%s: no memory usage data', self.label)
328             return
329         used_mib = math.ceil(float(used_bytes) / 1048576)
330         asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
331
332         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
333         if asked_mib is None or (
334                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
335             yield (
336                 '#!! {} max RSS was {} MiB -- '
337                 'try runtime_constraints "min_ram_mb_per_node":{}'
338             ).format(
339                 self.label,
340                 int(used_mib),
341                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
342
343     def _recommend_keep_cache(self):
344         """Recommend increasing keep cache if utilization < 80%"""
345         if self.job_tot['net:keep0']['rx'] == 0:
346             return
347         utilization = (float(self.job_tot['blkio:0:0']['read']) /
348                        float(self.job_tot['net:keep0']['rx']))
349         asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
350
351         if utilization < 0.8:
352             yield (
353                 '#!! {} Keep cache utilization was {:.2f}% -- '
354                 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
355             ).format(
356                 self.label,
357                 utilization * 100.0,
358                 asked_mib*2)
359
360
361     def _format(self, val):
362         """Return a string representation of a stat.
363
364         {:.2f} for floats, default format for everything else."""
365         if isinstance(val, float):
366             return '{:.2f}'.format(val)
367         else:
368             return '{}'.format(val)
369
370
371 class CollectionSummarizer(Summarizer):
372     def __init__(self, collection_id, **kwargs):
373         super(CollectionSummarizer, self).__init__(
374             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
375         self.label = collection_id
376
377
378 class JobSummarizer(Summarizer):
379     def __init__(self, job, **kwargs):
380         arv = arvados.api('v1')
381         if isinstance(job, basestring):
382             self.job = arv.jobs().get(uuid=job).execute()
383         else:
384             self.job = job
385         rdr = None
386         if self.job.get('log'):
387             try:
388                 rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
389             except arvados.errors.NotFoundError as e:
390                 logger.warning("Trying event logs after failing to read "
391                                "log collection %s: %s", self.job['log'], e)
392             else:
393                 label = self.job['uuid']
394         if rdr is None:
395             rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
396             label = self.job['uuid'] + ' (partial)'
397         super(JobSummarizer, self).__init__(rdr, **kwargs)
398         self.label = label
399         self.existing_constraints = self.job.get('runtime_constraints', {})
400
401
402 class PipelineSummarizer(object):
403     def __init__(self, pipeline_instance_uuid, **kwargs):
404         arv = arvados.api('v1', model=OrderedJsonModel())
405         instance = arv.pipeline_instances().get(
406             uuid=pipeline_instance_uuid).execute()
407         self.summarizers = collections.OrderedDict()
408         for cname, component in instance['components'].iteritems():
409             if 'job' not in component:
410                 logger.warning(
411                     "%s: skipping component with no job assigned", cname)
412             else:
413                 logger.info(
414                     "%s: job %s", cname, component['job']['uuid'])
415                 summarizer = JobSummarizer(component['job'], **kwargs)
416                 summarizer.label = '{} {}'.format(
417                     cname, component['job']['uuid'])
418                 self.summarizers[cname] = summarizer
419         self.label = pipeline_instance_uuid
420
421     def run(self):
422         threads = []
423         for summarizer in self.summarizers.itervalues():
424             t = threading.Thread(target=summarizer.run)
425             t.daemon = True
426             t.start()
427             threads.append(t)
428         for t in threads:
429             t.join()
430
431     def text_report(self):
432         txt = ''
433         for cname, summarizer in self.summarizers.iteritems():
434             txt += '### Summary for {} ({})\n'.format(
435                 cname, summarizer.job['uuid'])
436             txt += summarizer.text_report()
437             txt += '\n'
438         return txt
439
440     def html_report(self):
441         return crunchstat_summary.chartjs.ChartJS(
442             self.label, self.summarizers.itervalues()).html()