Merge branch '8341-crunchstat-job-time-axis'
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 from __future__ import print_function
2
3 import arvados
4 import collections
5 import crunchstat_summary.chartjs
6 import crunchstat_summary.reader
7 import datetime
8 import functools
9 import itertools
10 import math
11 import re
12 import sys
13 import threading
14
15 from arvados.api import OrderedJsonModel
16 from crunchstat_summary import logger
17
18 # Recommend memory constraints that are this multiple of an integral
19 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
20 # that have amounts like 7.5 GiB according to the kernel.)
21 AVAILABLE_RAM_RATIO = 0.95
22
23
24 class Task(object):
25     def __init__(self):
26         self.starttime = None
27         self.series = collections.defaultdict(list)
28
29
30 class Summarizer(object):
31     def __init__(self, logdata, label=None, skip_child_jobs=False):
32         self._logdata = logdata
33
34         self.label = label
35         self.starttime = None
36         self.finishtime = None
37         self._skip_child_jobs = skip_child_jobs
38
39         # stats_max: {category: {stat: val}}
40         self.stats_max = collections.defaultdict(
41             functools.partial(collections.defaultdict, lambda: 0))
42         # task_stats: {task_id: {category: {stat: val}}}
43         self.task_stats = collections.defaultdict(
44             functools.partial(collections.defaultdict, dict))
45
46         self.seq_to_uuid = {}
47         self.tasks = collections.defaultdict(Task)
48
49         # We won't bother recommending new runtime constraints if the
50         # constraints given when running the job are known to us and
51         # are already suitable.  If applicable, the subclass
52         # constructor will overwrite this with something useful.
53         self.existing_constraints = {}
54
55         logger.debug("%s: logdata %s", self.label, logdata)
56
57     def run(self):
58         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
59         for line in self._logdata:
60             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
61             if m:
62                 seq = int(m.group('seq'))
63                 uuid = m.group('task_uuid')
64                 self.seq_to_uuid[seq] = uuid
65                 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
66                 continue
67
68             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
69             if m:
70                 task_id = self.seq_to_uuid[int(m.group('seq'))]
71                 elapsed = int(m.group('elapsed'))
72                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
73                 if elapsed > self.stats_max['time']['elapsed']:
74                     self.stats_max['time']['elapsed'] = elapsed
75                 continue
76
77             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
78             if m:
79                 uuid = m.group('uuid')
80                 if self._skip_child_jobs:
81                     logger.warning('%s: omitting stats from child job %s'
82                                    ' because --skip-child-jobs flag is on',
83                                    self.label, uuid)
84                     continue
85                 logger.debug('%s: follow %s', self.label, uuid)
86                 child_summarizer = JobSummarizer(uuid)
87                 child_summarizer.stats_max = self.stats_max
88                 child_summarizer.task_stats = self.task_stats
89                 child_summarizer.tasks = self.tasks
90                 child_summarizer.starttime = self.starttime
91                 child_summarizer.run()
92                 logger.debug('%s: done %s', self.label, uuid)
93                 continue
94
95             m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
96             if not m:
97                 continue
98
99             if self.label is None:
100                 self.label = m.group('job_uuid')
101                 logger.debug('%s: using job uuid as label', self.label)
102             if m.group('category').endswith(':'):
103                 # "stderr crunchstat: notice: ..."
104                 continue
105             elif m.group('category') in ('error', 'caught'):
106                 continue
107             elif m.group('category') == 'read':
108                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
109                 # (crunchstat formatting fixed, but old logs still say this)
110                 continue
111             task_id = self.seq_to_uuid[int(m.group('seq'))]
112             task = self.tasks[task_id]
113
114             # Use the first and last crunchstat timestamps as
115             # approximations of starttime and finishtime.
116             timestamp = datetime.datetime.strptime(
117                 m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
118             if not task.starttime:
119                 task.starttime = timestamp
120                 logger.debug('%s: task %s starttime %s',
121                              self.label, task_id, timestamp)
122             task.finishtime = timestamp
123
124             if not self.starttime:
125                 self.starttime = timestamp
126             self.finishtime = timestamp
127
128             this_interval_s = None
129             for group in ['current', 'interval']:
130                 if not m.group(group):
131                     continue
132                 category = m.group('category')
133                 words = m.group(group).split(' ')
134                 stats = {}
135                 for val, stat in zip(words[::2], words[1::2]):
136                     try:
137                         if '.' in val:
138                             stats[stat] = float(val)
139                         else:
140                             stats[stat] = int(val)
141                     except ValueError as e:
142                         raise ValueError(
143                             'Error parsing {} stat in "{}": {!r}'.format(
144                                 stat, line, e))
145                 if 'user' in stats or 'sys' in stats:
146                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
147                 if 'tx' in stats or 'rx' in stats:
148                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
149                 for stat, val in stats.iteritems():
150                     if group == 'interval':
151                         if stat == 'seconds':
152                             this_interval_s = val
153                             continue
154                         elif not (this_interval_s > 0):
155                             logger.error(
156                                 "BUG? interval stat given with duration {!r}".
157                                 format(this_interval_s))
158                             continue
159                         else:
160                             stat = stat + '__rate'
161                             val = val / this_interval_s
162                             if stat in ['user+sys__rate', 'tx+rx__rate']:
163                                 task.series[category, stat].append(
164                                     (timestamp - self.starttime, val))
165                     else:
166                         if stat in ['rss']:
167                             task.series[category, stat].append(
168                                 (timestamp - self.starttime, val))
169                         self.task_stats[task_id][category][stat] = val
170                     if val > self.stats_max[category][stat]:
171                         self.stats_max[category][stat] = val
172         logger.debug('%s: done parsing', self.label)
173
174         self.job_tot = collections.defaultdict(
175             functools.partial(collections.defaultdict, int))
176         for task_id, task_stat in self.task_stats.iteritems():
177             for category, stat_last in task_stat.iteritems():
178                 for stat, val in stat_last.iteritems():
179                     if stat in ['cpus', 'cache', 'swap', 'rss']:
180                         # meaningless stats like 16 cpu cores x 5 tasks = 80
181                         continue
182                     self.job_tot[category][stat] += val
183         logger.debug('%s: done totals', self.label)
184
185     def long_label(self):
186         label = self.label
187         if self.finishtime:
188             label += ' -- elapsed time '
189             s = (self.finishtime - self.starttime).total_seconds()
190             if s > 86400:
191                 label += '{}d'.format(int(s/86400))
192             if s > 3600:
193                 label += '{}h'.format(int(s/3600) % 24)
194             if s > 60:
195                 label += '{}m'.format(int(s/60) % 60)
196             label += '{}s'.format(int(s) % 60)
197         return label
198
199     def text_report(self):
200         return "\n".join(itertools.chain(
201             self._text_report_gen(),
202             self._recommend_gen())) + "\n"
203
204     def html_report(self):
205         return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
206
207     def _text_report_gen(self):
208         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
209         for category, stat_max in sorted(self.stats_max.iteritems()):
210             for stat, val in sorted(stat_max.iteritems()):
211                 if stat.endswith('__rate'):
212                     continue
213                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
214                 val = self._format(val)
215                 tot = self._format(self.job_tot[category].get(stat, '-'))
216                 yield "\t".join([category, stat, str(val), max_rate, tot])
217         for args in (
218                 ('Number of tasks: {}',
219                  len(self.tasks),
220                  None),
221                 ('Max CPU time spent by a single task: {}s',
222                  self.stats_max['cpu']['user+sys'],
223                  None),
224                 ('Max CPU usage in a single interval: {}%',
225                  self.stats_max['cpu']['user+sys__rate'],
226                  lambda x: x * 100),
227                 ('Overall CPU usage: {}%',
228                  self.job_tot['cpu']['user+sys'] /
229                  self.job_tot['time']['elapsed'],
230                  lambda x: x * 100),
231                 ('Max memory used by a single task: {}GB',
232                  self.stats_max['mem']['rss'],
233                  lambda x: x / 1e9),
234                 ('Max network traffic in a single task: {}GB',
235                  self.stats_max['net:eth0']['tx+rx'] +
236                  self.stats_max['net:keep0']['tx+rx'],
237                  lambda x: x / 1e9),
238                 ('Max network speed in a single interval: {}MB/s',
239                  self.stats_max['net:eth0']['tx+rx__rate'] +
240                  self.stats_max['net:keep0']['tx+rx__rate'],
241                  lambda x: x / 1e6)):
242             format_string, val, transform = args
243             if val == float('-Inf'):
244                 continue
245             if transform:
246                 val = transform(val)
247             yield "# "+format_string.format(self._format(val))
248
249     def _recommend_gen(self):
250         return itertools.chain(
251             self._recommend_cpu(),
252             self._recommend_ram())
253
254     def _recommend_cpu(self):
255         """Recommend asking for 4 cores if max CPU usage was 333%"""
256
257         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
258         if cpu_max_rate == float('-Inf'):
259             logger.warning('%s: no CPU usage data', self.label)
260             return
261         used_cores = max(1, int(math.ceil(cpu_max_rate)))
262         asked_cores = self.existing_constraints.get('min_cores_per_node')
263         if asked_cores is None or used_cores < asked_cores:
264             yield (
265                 '#!! {} max CPU usage was {}% -- '
266                 'try runtime_constraints "min_cores_per_node":{}'
267             ).format(
268                 self.label,
269                 int(math.ceil(cpu_max_rate*100)),
270                 int(used_cores))
271
272     def _recommend_ram(self):
273         """Recommend an economical RAM constraint for this job.
274
275         Nodes that are advertised as "8 gibibytes" actually have what
276         we might call "8 nearlygibs" of memory available for jobs.
277         Here, we calculate a whole number of nearlygibs that would
278         have sufficed to run the job, then recommend requesting a node
279         with that number of nearlygibs (expressed as mebibytes).
280
281         Requesting a node with "nearly 8 gibibytes" is our best hope
282         of getting a node that actually has nearly 8 gibibytes
283         available.  If the node manager is smart enough to account for
284         the discrepancy itself when choosing/creating a node, we'll
285         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
286         advertised size of the next-size-smaller node (say, 6 GiB)
287         will be too low to satisfy our request, so we will effectively
288         get rounded up to 8 GiB.
289
290         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
291         we will generally get a node that is advertised as "8 GiB" and
292         has at least 7500 MiB available.  However, asking for 8192 MiB
293         would either result in an unnecessarily expensive 12 GiB node
294         (if node manager knows about the discrepancy), or an 8 GiB
295         node which has less than 8192 MiB available and is therefore
296         considered by crunch-dispatch to be too small to meet our
297         constraint.
298
299         When node manager learns how to predict the available memory
300         for each node type such that crunch-dispatch always agrees
301         that a node is big enough to run the job it was brought up
302         for, all this will be unnecessary.  We'll just ask for exactly
303         the memory we want -- even if that happens to be 8192 MiB.
304         """
305
306         used_bytes = self.stats_max['mem']['rss']
307         if used_bytes == float('-Inf'):
308             logger.warning('%s: no memory usage data', self.label)
309             return
310         used_mib = math.ceil(float(used_bytes) / 1048576)
311         asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
312
313         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
314         if asked_mib is None or (
315                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
316             yield (
317                 '#!! {} max RSS was {} MiB -- '
318                 'try runtime_constraints "min_ram_mb_per_node":{}'
319             ).format(
320                 self.label,
321                 int(used_mib),
322                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
323
324     def _format(self, val):
325         """Return a string representation of a stat.
326
327         {:.2f} for floats, default format for everything else."""
328         if isinstance(val, float):
329             return '{:.2f}'.format(val)
330         else:
331             return '{}'.format(val)
332
333
334 class CollectionSummarizer(Summarizer):
335     def __init__(self, collection_id, **kwargs):
336         super(CollectionSummarizer, self).__init__(
337             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
338         self.label = collection_id
339
340
341 class JobSummarizer(Summarizer):
342     def __init__(self, job, **kwargs):
343         arv = arvados.api('v1')
344         if isinstance(job, basestring):
345             self.job = arv.jobs().get(uuid=job).execute()
346         else:
347             self.job = job
348         rdr = None
349         if self.job['log']:
350             try:
351                 rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
352             except arvados.errors.NotFoundError as e:
353                 logger.warning("Trying event logs after failing to read "
354                                "log collection %s: %s", self.job['log'], e)
355             else:
356                 label = self.job['uuid']
357         if rdr is None:
358             rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
359             label = self.job['uuid'] + ' (partial)'
360         super(JobSummarizer, self).__init__(rdr, **kwargs)
361         self.label = label
362         self.existing_constraints = self.job.get('runtime_constraints', {})
363
364
365 class PipelineSummarizer(object):
366     def __init__(self, pipeline_instance_uuid, **kwargs):
367         arv = arvados.api('v1', model=OrderedJsonModel())
368         instance = arv.pipeline_instances().get(
369             uuid=pipeline_instance_uuid).execute()
370         self.summarizers = collections.OrderedDict()
371         for cname, component in instance['components'].iteritems():
372             if 'job' not in component:
373                 logger.warning(
374                     "%s: skipping component with no job assigned", cname)
375             elif component['job'].get('log') is None:
376                 logger.warning(
377                     "%s: skipping job %s with no log available",
378                     cname, component['job'].get('uuid'))
379             else:
380                 logger.info(
381                     "%s: logdata %s", cname, component['job']['log'])
382                 summarizer = JobSummarizer(component['job'], **kwargs)
383                 summarizer.label = cname
384                 self.summarizers[cname] = summarizer
385         self.label = pipeline_instance_uuid
386
387     def run(self):
388         threads = []
389         for summarizer in self.summarizers.itervalues():
390             t = threading.Thread(target=summarizer.run)
391             t.daemon = True
392             t.start()
393             threads.append(t)
394         for t in threads:
395             t.join()
396
397     def text_report(self):
398         txt = ''
399         for cname, summarizer in self.summarizers.iteritems():
400             txt += '### Summary for {} ({})\n'.format(
401                 cname, summarizer.job['uuid'])
402             txt += summarizer.text_report()
403             txt += '\n'
404         return txt
405
406     def html_report(self):
407         return crunchstat_summary.chartjs.ChartJS(
408             self.label, self.summarizers.itervalues()).html()