10472: add latent support for rolled up stats
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 from __future__ import print_function
2
3 import arvados
4 import collections
5 import crunchstat_summary.chartjs
6 import crunchstat_summary.reader
7 import datetime
8 import functools
9 import itertools
10 import math
11 import re
12 import sys
13 import threading
14 import _strptime
15
16 from arvados.api import OrderedJsonModel
17 from crunchstat_summary import logger
18
19 # Recommend memory constraints that are this multiple of an integral
20 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
21 # that have amounts like 7.5 GiB according to the kernel.)
22 AVAILABLE_RAM_RATIO = 0.95
23
24
25 # Workaround datetime.datetime.strptime() thread-safety bug by calling
26 # it once before starting threads.  https://bugs.python.org/issue7980
27 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
28
29
30 class Task(object):
31     def __init__(self):
32         self.starttime = None
33         self.series = collections.defaultdict(list)
34
35
36 class Summarizer(object):
37     def __init__(self, logdata, label=None, skip_child_jobs=False):
38         self._logdata = logdata
39
40         self.label = label
41         self.starttime = None
42         self.finishtime = None
43         self._skip_child_jobs = skip_child_jobs
44
45         # stats_max: {category: {stat: val}}
46         self.stats_max = collections.defaultdict(
47             functools.partial(collections.defaultdict, lambda: 0))
48         # task_stats: {task_id: {category: {stat: val}}}
49         self.task_stats = collections.defaultdict(
50             functools.partial(collections.defaultdict, dict))
51
52         self.seq_to_uuid = {}
53         self.tasks = collections.defaultdict(Task)
54
55         # We won't bother recommending new runtime constraints if the
56         # constraints given when running the job are known to us and
57         # are already suitable.  If applicable, the subclass
58         # constructor will overwrite this with something useful.
59         self.existing_constraints = {}
60
61         logger.debug("%s: logdata %s", self.label, logdata)
62
63     def run_child(self, uuid):
64         if self._skip_child_jobs:
65             logger.warning('%s: omitting stats from child job %s'
66                            ' because --skip-child-jobs flag is on',
67                            self.label, uuid)
68             return
69         logger.debug('%s: follow %s', self.label, uuid)
70         child_summarizer = JobSummarizer(uuid)
71         child_summarizer.stats_max = self.stats_max
72         child_summarizer.task_stats = self.task_stats
73         child_summarizer.tasks = self.tasks
74         child_summarizer.starttime = self.starttime
75         child_summarizer.run()
76         logger.debug('%s: done %s', self.label, uuid)
77
78     def run(self):
79         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
80         for line in self._logdata:
81             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
82             if m:
83                 seq = int(m.group('seq'))
84                 uuid = m.group('task_uuid')
85                 self.seq_to_uuid[seq] = uuid
86                 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
87                 continue
88
89             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
90             if m:
91                 task_id = self.seq_to_uuid[int(m.group('seq'))]
92                 elapsed = int(m.group('elapsed'))
93                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
94                 if elapsed > self.stats_max['time']['elapsed']:
95                     self.stats_max['time']['elapsed'] = elapsed
96                 continue
97
98             # Old style job logs only - newer style uses job['components']
99             uuid = None
100             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
101             if m:
102                 self.run_child(m.group('uuid'))
103                 continue
104
105             m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
106             if not m:
107                 continue
108
109             try:
110                 if self.label is None:
111                     self.label = m.group('job_uuid')
112                     logger.debug('%s: using job uuid as label', self.label)
113                 if m.group('category').endswith(':'):
114                     # "stderr crunchstat: notice: ..."
115                     continue
116                 elif m.group('category') in ('error', 'caught'):
117                     continue
118                 elif m.group('category') == 'read':
119                     # "stderr crunchstat: read /proc/1234/net/dev: ..."
120                     # (crunchstat formatting fixed, but old logs still say this)
121                     continue
122                 task_id = self.seq_to_uuid[int(m.group('seq'))]
123                 task = self.tasks[task_id]
124
125                 # Use the first and last crunchstat timestamps as
126                 # approximations of starttime and finishtime.
127                 timestamp = datetime.datetime.strptime(
128                     m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
129                 if not task.starttime:
130                     task.starttime = timestamp
131                     logger.debug('%s: task %s starttime %s',
132                                  self.label, task_id, timestamp)
133                 task.finishtime = timestamp
134
135                 if not self.starttime:
136                     self.starttime = timestamp
137                 self.finishtime = timestamp
138
139                 this_interval_s = None
140                 for group in ['current', 'interval']:
141                     if not m.group(group):
142                         continue
143                     category = m.group('category')
144                     words = m.group(group).split(' ')
145                     stats = {}
146                     for val, stat in zip(words[::2], words[1::2]):
147                         try:
148                             if '.' in val:
149                                 stats[stat] = float(val)
150                             else:
151                                 stats[stat] = int(val)
152                         except ValueError as e:
153                             raise ValueError(
154                                 'Error parsing {} stat: {!r}'.format(
155                                     stat, e))
156                     if 'user' in stats or 'sys' in stats:
157                         stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
158                     if 'tx' in stats or 'rx' in stats:
159                         stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
160                     for stat, val in stats.iteritems():
161                         if group == 'interval':
162                             if stat == 'seconds':
163                                 this_interval_s = val
164                                 continue
165                             elif not (this_interval_s > 0):
166                                 logger.error(
167                                     "BUG? interval stat given with duration {!r}".
168                                     format(this_interval_s))
169                                 continue
170                             else:
171                                 stat = stat + '__rate'
172                                 val = val / this_interval_s
173                                 if stat in ['user+sys__rate', 'tx+rx__rate']:
174                                     task.series[category, stat].append(
175                                         (timestamp - self.starttime, val))
176                         else:
177                             if stat in ['rss']:
178                                 task.series[category, stat].append(
179                                     (timestamp - self.starttime, val))
180                             self.task_stats[task_id][category][stat] = val
181                         if val > self.stats_max[category][stat]:
182                             self.stats_max[category][stat] = val
183             except Exception as e:
184                 logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
185         logger.debug('%s: done parsing log', self.label)
186
187         # Enabling this will roll up stats for all subjobs into the parent job
188         if False and 'components' in self.job:
189             for cname, component in self.job['components'].iteritems():
190                 self.run_child(component)
191
192         self.job_tot = collections.defaultdict(
193             functools.partial(collections.defaultdict, int))
194         for task_id, task_stat in self.task_stats.iteritems():
195             for category, stat_last in task_stat.iteritems():
196                 for stat, val in stat_last.iteritems():
197                     if stat in ['cpus', 'cache', 'swap', 'rss']:
198                         # meaningless stats like 16 cpu cores x 5 tasks = 80
199                         continue
200                     self.job_tot[category][stat] += val
201         logger.debug('%s: done totals', self.label)
202
203     def long_label(self):
204         label = self.label
205         if self.finishtime:
206             label += ' -- elapsed time '
207             s = (self.finishtime - self.starttime).total_seconds()
208             if s > 86400:
209                 label += '{}d'.format(int(s/86400))
210             if s > 3600:
211                 label += '{}h'.format(int(s/3600) % 24)
212             if s > 60:
213                 label += '{}m'.format(int(s/60) % 60)
214             label += '{}s'.format(int(s) % 60)
215         return label
216
217     def text_report(self):
218         if not self.tasks:
219             return "(no report generated)\n"
220         return "\n".join(itertools.chain(
221             self._text_report_gen(),
222             self._recommend_gen())) + "\n"
223
224     def html_report(self):
225         return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
226
227     def _text_report_gen(self):
228         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
229         for category, stat_max in sorted(self.stats_max.iteritems()):
230             for stat, val in sorted(stat_max.iteritems()):
231                 if stat.endswith('__rate'):
232                     continue
233                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
234                 val = self._format(val)
235                 tot = self._format(self.job_tot[category].get(stat, '-'))
236                 yield "\t".join([category, stat, str(val), max_rate, tot])
237         for args in (
238                 ('Number of tasks: {}',
239                  len(self.tasks),
240                  None),
241                 ('Max CPU time spent by a single task: {}s',
242                  self.stats_max['cpu']['user+sys'],
243                  None),
244                 ('Max CPU usage in a single interval: {}%',
245                  self.stats_max['cpu']['user+sys__rate'],
246                  lambda x: x * 100),
247                 ('Overall CPU usage: {}%',
248                  self.job_tot['cpu']['user+sys'] /
249                  self.job_tot['time']['elapsed']
250                  if self.job_tot['time']['elapsed'] > 0 else 0,
251                  lambda x: x * 100),
252                 ('Max memory used by a single task: {}GB',
253                  self.stats_max['mem']['rss'],
254                  lambda x: x / 1e9),
255                 ('Max network traffic in a single task: {}GB',
256                  self.stats_max['net:eth0']['tx+rx'] +
257                  self.stats_max['net:keep0']['tx+rx'],
258                  lambda x: x / 1e9),
259                 ('Max network speed in a single interval: {}MB/s',
260                  self.stats_max['net:eth0']['tx+rx__rate'] +
261                  self.stats_max['net:keep0']['tx+rx__rate'],
262                  lambda x: x / 1e6),
263                 ('Keep cache miss rate {}%',
264                  (float(self.job_tot['keepcache']['miss']) /
265                  float(self.job_tot['keepcalls']['get']))
266                  if self.job_tot['keepcalls']['get'] > 0 else 0,
267                  lambda x: x * 100.0),
268                 ('Keep cache utilization {}%',
269                  (float(self.job_tot['blkio:0:0']['read']) /
270                  float(self.job_tot['net:keep0']['rx']))
271                  if self.job_tot['net:keep0']['rx'] > 0 else 0,
272                  lambda x: x * 100.0)):
273             format_string, val, transform = args
274             if val == float('-Inf'):
275                 continue
276             if transform:
277                 val = transform(val)
278             yield "# "+format_string.format(self._format(val))
279
280     def _recommend_gen(self):
281         return itertools.chain(
282             self._recommend_cpu(),
283             self._recommend_ram(),
284             self._recommend_keep_cache())
285
286     def _recommend_cpu(self):
287         """Recommend asking for 4 cores if max CPU usage was 333%"""
288
289         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
290         if cpu_max_rate == float('-Inf'):
291             logger.warning('%s: no CPU usage data', self.label)
292             return
293         used_cores = max(1, int(math.ceil(cpu_max_rate)))
294         asked_cores = self.existing_constraints.get('min_cores_per_node')
295         if asked_cores is None or used_cores < asked_cores:
296             yield (
297                 '#!! {} max CPU usage was {}% -- '
298                 'try runtime_constraints "min_cores_per_node":{}'
299             ).format(
300                 self.label,
301                 int(math.ceil(cpu_max_rate*100)),
302                 int(used_cores))
303
304     def _recommend_ram(self):
305         """Recommend an economical RAM constraint for this job.
306
307         Nodes that are advertised as "8 gibibytes" actually have what
308         we might call "8 nearlygibs" of memory available for jobs.
309         Here, we calculate a whole number of nearlygibs that would
310         have sufficed to run the job, then recommend requesting a node
311         with that number of nearlygibs (expressed as mebibytes).
312
313         Requesting a node with "nearly 8 gibibytes" is our best hope
314         of getting a node that actually has nearly 8 gibibytes
315         available.  If the node manager is smart enough to account for
316         the discrepancy itself when choosing/creating a node, we'll
317         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
318         advertised size of the next-size-smaller node (say, 6 GiB)
319         will be too low to satisfy our request, so we will effectively
320         get rounded up to 8 GiB.
321
322         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
323         we will generally get a node that is advertised as "8 GiB" and
324         has at least 7500 MiB available.  However, asking for 8192 MiB
325         would either result in an unnecessarily expensive 12 GiB node
326         (if node manager knows about the discrepancy), or an 8 GiB
327         node which has less than 8192 MiB available and is therefore
328         considered by crunch-dispatch to be too small to meet our
329         constraint.
330
331         When node manager learns how to predict the available memory
332         for each node type such that crunch-dispatch always agrees
333         that a node is big enough to run the job it was brought up
334         for, all this will be unnecessary.  We'll just ask for exactly
335         the memory we want -- even if that happens to be 8192 MiB.
336         """
337
338         used_bytes = self.stats_max['mem']['rss']
339         if used_bytes == float('-Inf'):
340             logger.warning('%s: no memory usage data', self.label)
341             return
342         used_mib = math.ceil(float(used_bytes) / 1048576)
343         asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
344
345         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
346         if asked_mib is None or (
347                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
348             yield (
349                 '#!! {} max RSS was {} MiB -- '
350                 'try runtime_constraints "min_ram_mb_per_node":{}'
351             ).format(
352                 self.label,
353                 int(used_mib),
354                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
355
356     def _recommend_keep_cache(self):
357         """Recommend increasing keep cache if utilization < 80%"""
358         if self.job_tot['net:keep0']['rx'] == 0:
359             return
360         utilization = (float(self.job_tot['blkio:0:0']['read']) /
361                        float(self.job_tot['net:keep0']['rx']))
362         asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
363
364         if utilization < 0.8:
365             yield (
366                 '#!! {} Keep cache utilization was {:.2f}% -- '
367                 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
368             ).format(
369                 self.label,
370                 utilization * 100.0,
371                 asked_mib*2)
372
373
374     def _format(self, val):
375         """Return a string representation of a stat.
376
377         {:.2f} for floats, default format for everything else."""
378         if isinstance(val, float):
379             return '{:.2f}'.format(val)
380         else:
381             return '{}'.format(val)
382
383
384 class CollectionSummarizer(Summarizer):
385     def __init__(self, collection_id, **kwargs):
386         super(CollectionSummarizer, self).__init__(
387             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
388         self.label = collection_id
389
390
391 class JobSummarizer(Summarizer):
392     def __init__(self, job, **kwargs):
393         arv = arvados.api('v1')
394         if isinstance(job, basestring):
395             self.job = arv.jobs().get(uuid=job).execute()
396         else:
397             self.job = job
398         rdr = None
399         if self.job.get('log'):
400             try:
401                 rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
402             except arvados.errors.NotFoundError as e:
403                 logger.warning("Trying event logs after failing to read "
404                                "log collection %s: %s", self.job['log'], e)
405             else:
406                 label = self.job['uuid']
407         if rdr is None:
408             rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
409             label = self.job['uuid'] + ' (partial)'
410         super(JobSummarizer, self).__init__(rdr, **kwargs)
411         self.label = label
412         self.existing_constraints = self.job.get('runtime_constraints', {})
413
414
415 class PipelineSummarizer(object):
416     def __init__(self, pipeline_instance_uuid, **kwargs):
417         self.arv = arvados.api('v1', model=OrderedJsonModel())
418         instance = self.arv.pipeline_instances().get(
419             uuid=pipeline_instance_uuid).execute()
420         self.summarizers = collections.OrderedDict()
421         for cname, component in instance['components'].iteritems():
422             if 'job' not in component:
423                 logger.warning(
424                     "%s: skipping component with no job assigned", cname)
425             else:
426                 self.summarize_job(cname, component['job'], **kwargs)
427         self.label = pipeline_instance_uuid
428
429     def summarize_job(self, cname, job, **kwargs):
430         uuid = job['uuid']
431         logger.info("%s: job %s", cname, uuid)
432         summarizer = JobSummarizer(job, **kwargs)
433         summarizer.label = '{} {}'.format(cname, uuid)
434         self.summarizers[cname] = summarizer
435         if 'components' in job:
436             for cname, uuid in job['components'].iteritems():
437                 subjob = self.arv.jobs().get(uuid=uuid).execute()
438                 self.summarize_job(cname, subjob, **kwargs)
439
440     def run(self):
441         threads = []
442         for summarizer in self.summarizers.itervalues():
443             t = threading.Thread(target=summarizer.run)
444             t.daemon = True
445             t.start()
446             threads.append(t)
447         for t in threads:
448             t.join()
449
450     def text_report(self):
451         txt = ''
452         for cname, summarizer in self.summarizers.iteritems():
453             txt += '### Summary for {} ({})\n'.format(
454                 cname, summarizer.job['uuid'])
455             txt += summarizer.text_report()
456             txt += '\n'
457         return txt
458
459     def html_report(self):
460         return crunchstat_summary.chartjs.ChartJS(
461             self.label, self.summarizers.itervalues()).html()