10008: Merge branch 'master' into 10008-flaky-token-test
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 from __future__ import print_function
2
3 import arvados
4 import collections
5 import crunchstat_summary.chartjs
6 import crunchstat_summary.reader
7 import datetime
8 import functools
9 import itertools
10 import math
11 import re
12 import sys
13 import threading
14 import _strptime
15
16 from arvados.api import OrderedJsonModel
17 from crunchstat_summary import logger
18
19 # Recommend memory constraints that are this multiple of an integral
20 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
21 # that have amounts like 7.5 GiB according to the kernel.)
22 AVAILABLE_RAM_RATIO = 0.95
23
24
25 # Workaround datetime.datetime.strptime() thread-safety bug by calling
26 # it once before starting threads.  https://bugs.python.org/issue7980
27 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
28
29
30 class Task(object):
31     def __init__(self):
32         self.starttime = None
33         self.series = collections.defaultdict(list)
34
35
36 class Summarizer(object):
37     def __init__(self, logdata, label=None, skip_child_jobs=False):
38         self._logdata = logdata
39
40         self.label = label
41         self.starttime = None
42         self.finishtime = None
43         self._skip_child_jobs = skip_child_jobs
44
45         # stats_max: {category: {stat: val}}
46         self.stats_max = collections.defaultdict(
47             functools.partial(collections.defaultdict, lambda: 0))
48         # task_stats: {task_id: {category: {stat: val}}}
49         self.task_stats = collections.defaultdict(
50             functools.partial(collections.defaultdict, dict))
51
52         self.seq_to_uuid = {}
53         self.tasks = collections.defaultdict(Task)
54
55         # We won't bother recommending new runtime constraints if the
56         # constraints given when running the job are known to us and
57         # are already suitable.  If applicable, the subclass
58         # constructor will overwrite this with something useful.
59         self.existing_constraints = {}
60
61         logger.debug("%s: logdata %s", self.label, logdata)
62
63     def run(self):
64         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
65         for line in self._logdata:
66             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
67             if m:
68                 seq = int(m.group('seq'))
69                 uuid = m.group('task_uuid')
70                 self.seq_to_uuid[seq] = uuid
71                 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
72                 continue
73
74             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
75             if m:
76                 task_id = self.seq_to_uuid[int(m.group('seq'))]
77                 elapsed = int(m.group('elapsed'))
78                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
79                 if elapsed > self.stats_max['time']['elapsed']:
80                     self.stats_max['time']['elapsed'] = elapsed
81                 continue
82
83             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
84             if m:
85                 uuid = m.group('uuid')
86                 if self._skip_child_jobs:
87                     logger.warning('%s: omitting stats from child job %s'
88                                    ' because --skip-child-jobs flag is on',
89                                    self.label, uuid)
90                     continue
91                 logger.debug('%s: follow %s', self.label, uuid)
92                 child_summarizer = JobSummarizer(uuid)
93                 child_summarizer.stats_max = self.stats_max
94                 child_summarizer.task_stats = self.task_stats
95                 child_summarizer.tasks = self.tasks
96                 child_summarizer.starttime = self.starttime
97                 child_summarizer.run()
98                 logger.debug('%s: done %s', self.label, uuid)
99                 continue
100
101             m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
102             if not m:
103                 continue
104
105             try:
106                 if self.label is None:
107                     self.label = m.group('job_uuid')
108                     logger.debug('%s: using job uuid as label', self.label)
109                 if m.group('category').endswith(':'):
110                     # "stderr crunchstat: notice: ..."
111                     continue
112                 elif m.group('category') in ('error', 'caught'):
113                     continue
114                 elif m.group('category') == 'read':
115                     # "stderr crunchstat: read /proc/1234/net/dev: ..."
116                     # (crunchstat formatting fixed, but old logs still say this)
117                     continue
118                 task_id = self.seq_to_uuid[int(m.group('seq'))]
119                 task = self.tasks[task_id]
120
121                 # Use the first and last crunchstat timestamps as
122                 # approximations of starttime and finishtime.
123                 timestamp = datetime.datetime.strptime(
124                     m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
125                 if not task.starttime:
126                     task.starttime = timestamp
127                     logger.debug('%s: task %s starttime %s',
128                                  self.label, task_id, timestamp)
129                 task.finishtime = timestamp
130
131                 if not self.starttime:
132                     self.starttime = timestamp
133                 self.finishtime = timestamp
134
135                 this_interval_s = None
136                 for group in ['current', 'interval']:
137                     if not m.group(group):
138                         continue
139                     category = m.group('category')
140                     words = m.group(group).split(' ')
141                     stats = {}
142                     for val, stat in zip(words[::2], words[1::2]):
143                         try:
144                             if '.' in val:
145                                 stats[stat] = float(val)
146                             else:
147                                 stats[stat] = int(val)
148                         except ValueError as e:
149                             raise ValueError(
150                                 'Error parsing {} stat: {!r}'.format(
151                                     stat, e))
152                     if 'user' in stats or 'sys' in stats:
153                         stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
154                     if 'tx' in stats or 'rx' in stats:
155                         stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
156                     for stat, val in stats.iteritems():
157                         if group == 'interval':
158                             if stat == 'seconds':
159                                 this_interval_s = val
160                                 continue
161                             elif not (this_interval_s > 0):
162                                 logger.error(
163                                     "BUG? interval stat given with duration {!r}".
164                                     format(this_interval_s))
165                                 continue
166                             else:
167                                 stat = stat + '__rate'
168                                 val = val / this_interval_s
169                                 if stat in ['user+sys__rate', 'tx+rx__rate']:
170                                     task.series[category, stat].append(
171                                         (timestamp - self.starttime, val))
172                         else:
173                             if stat in ['rss']:
174                                 task.series[category, stat].append(
175                                     (timestamp - self.starttime, val))
176                             self.task_stats[task_id][category][stat] = val
177                         if val > self.stats_max[category][stat]:
178                             self.stats_max[category][stat] = val
179             except Exception as e:
180                 logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
181         logger.debug('%s: done parsing', self.label)
182
183         self.job_tot = collections.defaultdict(
184             functools.partial(collections.defaultdict, int))
185         for task_id, task_stat in self.task_stats.iteritems():
186             for category, stat_last in task_stat.iteritems():
187                 for stat, val in stat_last.iteritems():
188                     if stat in ['cpus', 'cache', 'swap', 'rss']:
189                         # meaningless stats like 16 cpu cores x 5 tasks = 80
190                         continue
191                     self.job_tot[category][stat] += val
192         logger.debug('%s: done totals', self.label)
193
194     def long_label(self):
195         label = self.label
196         if self.finishtime:
197             label += ' -- elapsed time '
198             s = (self.finishtime - self.starttime).total_seconds()
199             if s > 86400:
200                 label += '{}d'.format(int(s/86400))
201             if s > 3600:
202                 label += '{}h'.format(int(s/3600) % 24)
203             if s > 60:
204                 label += '{}m'.format(int(s/60) % 60)
205             label += '{}s'.format(int(s) % 60)
206         return label
207
208     def text_report(self):
209         if not self.tasks:
210             return "(no report generated)\n"
211         return "\n".join(itertools.chain(
212             self._text_report_gen(),
213             self._recommend_gen())) + "\n"
214
215     def html_report(self):
216         return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
217
218     def _text_report_gen(self):
219         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
220         for category, stat_max in sorted(self.stats_max.iteritems()):
221             for stat, val in sorted(stat_max.iteritems()):
222                 if stat.endswith('__rate'):
223                     continue
224                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
225                 val = self._format(val)
226                 tot = self._format(self.job_tot[category].get(stat, '-'))
227                 yield "\t".join([category, stat, str(val), max_rate, tot])
228         for args in (
229                 ('Number of tasks: {}',
230                  len(self.tasks),
231                  None),
232                 ('Max CPU time spent by a single task: {}s',
233                  self.stats_max['cpu']['user+sys'],
234                  None),
235                 ('Max CPU usage in a single interval: {}%',
236                  self.stats_max['cpu']['user+sys__rate'],
237                  lambda x: x * 100),
238                 ('Overall CPU usage: {}%',
239                  self.job_tot['cpu']['user+sys'] /
240                  self.job_tot['time']['elapsed']
241                  if self.job_tot['time']['elapsed'] > 0 else 0,
242                  lambda x: x * 100),
243                 ('Max memory used by a single task: {}GB',
244                  self.stats_max['mem']['rss'],
245                  lambda x: x / 1e9),
246                 ('Max network traffic in a single task: {}GB',
247                  self.stats_max['net:eth0']['tx+rx'] +
248                  self.stats_max['net:keep0']['tx+rx'],
249                  lambda x: x / 1e9),
250                 ('Max network speed in a single interval: {}MB/s',
251                  self.stats_max['net:eth0']['tx+rx__rate'] +
252                  self.stats_max['net:keep0']['tx+rx__rate'],
253                  lambda x: x / 1e6),
254                 ('Keep cache miss rate {}%',
255                  (float(self.job_tot['keepcache']['miss']) /
256                  float(self.job_tot['keepcalls']['get']))
257                  if self.job_tot['keepcalls']['get'] > 0 else 0,
258                  lambda x: x * 100.0),
259                 ('Keep cache utilization {}%',
260                  (float(self.job_tot['blkio:0:0']['read']) /
261                  float(self.job_tot['net:keep0']['rx']))
262                  if self.job_tot['net:keep0']['rx'] > 0 else 0,
263                  lambda x: x * 100.0)):
264             format_string, val, transform = args
265             if val == float('-Inf'):
266                 continue
267             if transform:
268                 val = transform(val)
269             yield "# "+format_string.format(self._format(val))
270
271     def _recommend_gen(self):
272         return itertools.chain(
273             self._recommend_cpu(),
274             self._recommend_ram(),
275             self._recommend_keep_cache())
276
277     def _recommend_cpu(self):
278         """Recommend asking for 4 cores if max CPU usage was 333%"""
279
280         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
281         if cpu_max_rate == float('-Inf'):
282             logger.warning('%s: no CPU usage data', self.label)
283             return
284         used_cores = max(1, int(math.ceil(cpu_max_rate)))
285         asked_cores = self.existing_constraints.get('min_cores_per_node')
286         if asked_cores is None or used_cores < asked_cores:
287             yield (
288                 '#!! {} max CPU usage was {}% -- '
289                 'try runtime_constraints "min_cores_per_node":{}'
290             ).format(
291                 self.label,
292                 int(math.ceil(cpu_max_rate*100)),
293                 int(used_cores))
294
295     def _recommend_ram(self):
296         """Recommend an economical RAM constraint for this job.
297
298         Nodes that are advertised as "8 gibibytes" actually have what
299         we might call "8 nearlygibs" of memory available for jobs.
300         Here, we calculate a whole number of nearlygibs that would
301         have sufficed to run the job, then recommend requesting a node
302         with that number of nearlygibs (expressed as mebibytes).
303
304         Requesting a node with "nearly 8 gibibytes" is our best hope
305         of getting a node that actually has nearly 8 gibibytes
306         available.  If the node manager is smart enough to account for
307         the discrepancy itself when choosing/creating a node, we'll
308         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
309         advertised size of the next-size-smaller node (say, 6 GiB)
310         will be too low to satisfy our request, so we will effectively
311         get rounded up to 8 GiB.
312
313         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
314         we will generally get a node that is advertised as "8 GiB" and
315         has at least 7500 MiB available.  However, asking for 8192 MiB
316         would either result in an unnecessarily expensive 12 GiB node
317         (if node manager knows about the discrepancy), or an 8 GiB
318         node which has less than 8192 MiB available and is therefore
319         considered by crunch-dispatch to be too small to meet our
320         constraint.
321
322         When node manager learns how to predict the available memory
323         for each node type such that crunch-dispatch always agrees
324         that a node is big enough to run the job it was brought up
325         for, all this will be unnecessary.  We'll just ask for exactly
326         the memory we want -- even if that happens to be 8192 MiB.
327         """
328
329         used_bytes = self.stats_max['mem']['rss']
330         if used_bytes == float('-Inf'):
331             logger.warning('%s: no memory usage data', self.label)
332             return
333         used_mib = math.ceil(float(used_bytes) / 1048576)
334         asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
335
336         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
337         if asked_mib is None or (
338                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
339             yield (
340                 '#!! {} max RSS was {} MiB -- '
341                 'try runtime_constraints "min_ram_mb_per_node":{}'
342             ).format(
343                 self.label,
344                 int(used_mib),
345                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
346
347     def _recommend_keep_cache(self):
348         """Recommend increasing keep cache if utilization < 80%"""
349         if self.job_tot['net:keep0']['rx'] == 0:
350             return
351         utilization = (float(self.job_tot['blkio:0:0']['read']) /
352                        float(self.job_tot['net:keep0']['rx']))
353         asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
354
355         if utilization < 0.8:
356             yield (
357                 '#!! {} Keep cache utilization was {:.2f}% -- '
358                 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
359             ).format(
360                 self.label,
361                 utilization * 100.0,
362                 asked_mib*2)
363
364
365     def _format(self, val):
366         """Return a string representation of a stat.
367
368         {:.2f} for floats, default format for everything else."""
369         if isinstance(val, float):
370             return '{:.2f}'.format(val)
371         else:
372             return '{}'.format(val)
373
374
375 class CollectionSummarizer(Summarizer):
376     def __init__(self, collection_id, **kwargs):
377         super(CollectionSummarizer, self).__init__(
378             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
379         self.label = collection_id
380
381
382 class JobSummarizer(Summarizer):
383     def __init__(self, job, **kwargs):
384         arv = arvados.api('v1')
385         if isinstance(job, basestring):
386             self.job = arv.jobs().get(uuid=job).execute()
387         else:
388             self.job = job
389         rdr = None
390         if self.job.get('log'):
391             try:
392                 rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
393             except arvados.errors.NotFoundError as e:
394                 logger.warning("Trying event logs after failing to read "
395                                "log collection %s: %s", self.job['log'], e)
396             else:
397                 label = self.job['uuid']
398         if rdr is None:
399             rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
400             label = self.job['uuid'] + ' (partial)'
401         super(JobSummarizer, self).__init__(rdr, **kwargs)
402         self.label = label
403         self.existing_constraints = self.job.get('runtime_constraints', {})
404
405
406 class PipelineSummarizer(object):
407     def __init__(self, pipeline_instance_uuid, **kwargs):
408         arv = arvados.api('v1', model=OrderedJsonModel())
409         instance = arv.pipeline_instances().get(
410             uuid=pipeline_instance_uuid).execute()
411         self.summarizers = collections.OrderedDict()
412         for cname, component in instance['components'].iteritems():
413             if 'job' not in component:
414                 logger.warning(
415                     "%s: skipping component with no job assigned", cname)
416             else:
417                 logger.info(
418                     "%s: job %s", cname, component['job']['uuid'])
419                 summarizer = JobSummarizer(component['job'], **kwargs)
420                 summarizer.label = '{} {}'.format(
421                     cname, component['job']['uuid'])
422                 self.summarizers[cname] = summarizer
423         self.label = pipeline_instance_uuid
424
425     def run(self):
426         threads = []
427         for summarizer in self.summarizers.itervalues():
428             t = threading.Thread(target=summarizer.run)
429             t.daemon = True
430             t.start()
431             threads.append(t)
432         for t in threads:
433             t.join()
434
435     def text_report(self):
436         txt = ''
437         for cname, summarizer in self.summarizers.iteritems():
438             txt += '### Summary for {} ({})\n'.format(
439                 cname, summarizer.job['uuid'])
440             txt += summarizer.text_report()
441             txt += '\n'
442         return txt
443
444     def html_report(self):
445         return crunchstat_summary.chartjs.ChartJS(
446             self.label, self.summarizers.itervalues()).html()