8341: In pipeline mode, process all jobs concurrently.
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 from __future__ import print_function
2
3 import arvados
4 import collections
5 import crunchstat_summary.chartjs
6 import crunchstat_summary.reader
7 import datetime
8 import functools
9 import itertools
10 import math
11 import re
12 import sys
13 import threading
14
15 from arvados.api import OrderedJsonModel
16 from crunchstat_summary import logger
17
18 # Recommend memory constraints that are this multiple of an integral
19 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
20 # that have amounts like 7.5 GiB according to the kernel.)
21 AVAILABLE_RAM_RATIO = 0.95
22
23
24 class Task(object):
25     def __init__(self):
26         self.starttime = None
27         self.series = collections.defaultdict(list)
28
29
30 class Summarizer(object):
31     def __init__(self, logdata, label=None, skip_child_jobs=False):
32         self._logdata = logdata
33
34         self.label = label
35         self.starttime = None
36         self.finishtime = None
37         self._skip_child_jobs = skip_child_jobs
38
39         # stats_max: {category: {stat: val}}
40         self.stats_max = collections.defaultdict(
41             functools.partial(collections.defaultdict, lambda: 0))
42         # task_stats: {task_id: {category: {stat: val}}}
43         self.task_stats = collections.defaultdict(
44             functools.partial(collections.defaultdict, dict))
45
46         self.seq_to_uuid = {}
47         self.tasks = collections.defaultdict(Task)
48
49         # We won't bother recommending new runtime constraints if the
50         # constraints given when running the job are known to us and
51         # are already suitable.  If applicable, the subclass
52         # constructor will overwrite this with something useful.
53         self.existing_constraints = {}
54
55         logger.debug("%s: logdata %s", self.label, logdata)
56
57     def run(self):
58         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
59         for line in self._logdata:
60             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
61             if m:
62                 seq = int(m.group('seq'))
63                 uuid = m.group('task_uuid')
64                 self.seq_to_uuid[seq] = uuid
65                 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
66                 continue
67
68             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
69             if m:
70                 task_id = self.seq_to_uuid[int(m.group('seq'))]
71                 elapsed = int(m.group('elapsed'))
72                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
73                 if elapsed > self.stats_max['time']['elapsed']:
74                     self.stats_max['time']['elapsed'] = elapsed
75                 continue
76
77             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
78             if m:
79                 uuid = m.group('uuid')
80                 if self._skip_child_jobs:
81                     logger.warning('%s: omitting stats from child job %s'
82                                    ' because --skip-child-jobs flag is on',
83                                    self.label, uuid)
84                     continue
85                 logger.debug('%s: follow %s', self.label, uuid)
86                 child_summarizer = JobSummarizer(uuid)
87                 child_summarizer.stats_max = self.stats_max
88                 child_summarizer.task_stats = self.task_stats
89                 child_summarizer.tasks = self.tasks
90                 child_summarizer.starttime = self.starttime
91                 child_summarizer.run()
92                 logger.debug('%s: done %s', self.label, uuid)
93                 continue
94
95             m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
96             if not m:
97                 continue
98
99             if self.label is None:
100                 self.label = m.group('job_uuid')
101                 logger.debug('%s: using job uuid as label', self.label)
102             if m.group('category').endswith(':'):
103                 # "stderr crunchstat: notice: ..."
104                 continue
105             elif m.group('category') in ('error', 'caught'):
106                 continue
107             elif m.group('category') == 'read':
108                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
109                 # (crunchstat formatting fixed, but old logs still say this)
110                 continue
111             task_id = self.seq_to_uuid[int(m.group('seq'))]
112             task = self.tasks[task_id]
113
114             # Use the first and last crunchstat timestamps as
115             # approximations of starttime and finishtime.
116             timestamp = datetime.datetime.strptime(
117                 m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
118             if not task.starttime:
119                 task.starttime = timestamp
120                 logger.debug('%s: task %s starttime %s',
121                              self.label, task_id, timestamp)
122             task.finishtime = timestamp
123
124             if not self.starttime:
125                 self.starttime = timestamp
126             self.finishtime = timestamp
127
128             this_interval_s = None
129             for group in ['current', 'interval']:
130                 if not m.group(group):
131                     continue
132                 category = m.group('category')
133                 words = m.group(group).split(' ')
134                 stats = {}
135                 for val, stat in zip(words[::2], words[1::2]):
136                     try:
137                         if '.' in val:
138                             stats[stat] = float(val)
139                         else:
140                             stats[stat] = int(val)
141                     except ValueError as e:
142                         raise ValueError(
143                             'Error parsing {} stat in "{}": {!r}'.format(
144                                 stat, line, e))
145                 if 'user' in stats or 'sys' in stats:
146                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
147                 if 'tx' in stats or 'rx' in stats:
148                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
149                 for stat, val in stats.iteritems():
150                     if group == 'interval':
151                         if stat == 'seconds':
152                             this_interval_s = val
153                             continue
154                         elif not (this_interval_s > 0):
155                             logger.error(
156                                 "BUG? interval stat given with duration {!r}".
157                                 format(this_interval_s))
158                             continue
159                         else:
160                             stat = stat + '__rate'
161                             val = val / this_interval_s
162                             if stat in ['user+sys__rate', 'tx+rx__rate']:
163                                 task.series[category, stat].append(
164                                     (timestamp - self.starttime, val))
165                     else:
166                         if stat in ['rss']:
167                             task.series[category, stat].append(
168                                 (timestamp - self.starttime, val))
169                         self.task_stats[task_id][category][stat] = val
170                     if val > self.stats_max[category][stat]:
171                         self.stats_max[category][stat] = val
172         logger.debug('%s: done parsing', self.label)
173
174         self.job_tot = collections.defaultdict(
175             functools.partial(collections.defaultdict, int))
176         for task_id, task_stat in self.task_stats.iteritems():
177             for category, stat_last in task_stat.iteritems():
178                 for stat, val in stat_last.iteritems():
179                     if stat in ['cpus', 'cache', 'swap', 'rss']:
180                         # meaningless stats like 16 cpu cores x 5 tasks = 80
181                         continue
182                     self.job_tot[category][stat] += val
183         logger.debug('%s: done totals', self.label)
184
185     def long_label(self):
186         label = self.label
187         if self.finishtime:
188             label += ' -- elapsed time '
189             s = (self.finishtime - self.starttime).total_seconds()
190             if s > 86400:
191                 label += '{}d'.format(int(s/86400))
192             if s > 3600:
193                 label += '{}h'.format(int(s/3600) % 24)
194             if s > 60:
195                 label += '{}m'.format(int(s/60) % 60)
196             label += '{}s'.format(int(s) % 60)
197         return label
198
199     def text_report(self):
200         return "\n".join(itertools.chain(
201             self._text_report_gen(),
202             self._recommend_gen())) + "\n"
203
204     def html_report(self):
205         return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
206
207     def _text_report_gen(self):
208         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
209         for category, stat_max in sorted(self.stats_max.iteritems()):
210             for stat, val in sorted(stat_max.iteritems()):
211                 if stat.endswith('__rate'):
212                     continue
213                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
214                 val = self._format(val)
215                 tot = self._format(self.job_tot[category].get(stat, '-'))
216                 yield "\t".join([category, stat, str(val), max_rate, tot])
217         for args in (
218                 ('Number of tasks: {}',
219                  len(self.tasks),
220                  None),
221                 ('Max CPU time spent by a single task: {}s',
222                  self.stats_max['cpu']['user+sys'],
223                  None),
224                 ('Max CPU usage in a single interval: {}%',
225                  self.stats_max['cpu']['user+sys__rate'],
226                  lambda x: x * 100),
227                 ('Overall CPU usage: {}%',
228                  self.job_tot['cpu']['user+sys'] /
229                  self.job_tot['time']['elapsed'],
230                  lambda x: x * 100),
231                 ('Max memory used by a single task: {}GB',
232                  self.stats_max['mem']['rss'],
233                  lambda x: x / 1e9),
234                 ('Max network traffic in a single task: {}GB',
235                  self.stats_max['net:eth0']['tx+rx'] +
236                  self.stats_max['net:keep0']['tx+rx'],
237                  lambda x: x / 1e9),
238                 ('Max network speed in a single interval: {}MB/s',
239                  self.stats_max['net:eth0']['tx+rx__rate'] +
240                  self.stats_max['net:keep0']['tx+rx__rate'],
241                  lambda x: x / 1e6)):
242             format_string, val, transform = args
243             if val == float('-Inf'):
244                 continue
245             if transform:
246                 val = transform(val)
247             yield "# "+format_string.format(self._format(val))
248
249     def _recommend_gen(self):
250         return itertools.chain(
251             self._recommend_cpu(),
252             self._recommend_ram())
253
254     def _recommend_cpu(self):
255         """Recommend asking for 4 cores if max CPU usage was 333%"""
256
257         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
258         if cpu_max_rate == float('-Inf'):
259             logger.warning('%s: no CPU usage data', self.label)
260             return
261         used_cores = int(math.ceil(cpu_max_rate))
262         asked_cores = self.existing_constraints.get('min_cores_per_node')
263         if asked_cores is None or used_cores < asked_cores:
264             yield (
265                 '#!! {} max CPU usage was {}% -- '
266                 'try runtime_constraints "min_cores_per_node":{}'
267             ).format(
268                 self.label,
269                 int(math.ceil(cpu_max_rate*100)),
270                 int(used_cores))
271
272     def _recommend_ram(self):
273         """Recommend an economical RAM constraint for this job.
274
275         Nodes that are advertised as "8 gibibytes" actually have what
276         we might call "8 nearlygibs" of memory available for jobs.
277         Here, we calculate a whole number of nearlygibs that would
278         have sufficed to run the job, then recommend requesting a node
279         with that number of nearlygibs (expressed as mebibytes).
280
281         Requesting a node with "nearly 8 gibibytes" is our best hope
282         of getting a node that actually has nearly 8 gibibytes
283         available.  If the node manager is smart enough to account for
284         the discrepancy itself when choosing/creating a node, we'll
285         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
286         advertised size of the next-size-smaller node (say, 6 GiB)
287         will be too low to satisfy our request, so we will effectively
288         get rounded up to 8 GiB.
289
290         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
291         we will generally get a node that is advertised as "8 GiB" and
292         has at least 7500 MiB available.  However, asking for 8192 MiB
293         would either result in an unnecessarily expensive 12 GiB node
294         (if node manager knows about the discrepancy), or an 8 GiB
295         node which has less than 8192 MiB available and is therefore
296         considered by crunch-dispatch to be too small to meet our
297         constraint.
298
299         When node manager learns how to predict the available memory
300         for each node type such that crunch-dispatch always agrees
301         that a node is big enough to run the job it was brought up
302         for, all this will be unnecessary.  We'll just ask for exactly
303         the memory we want -- even if that happens to be 8192 MiB.
304         """
305
306         used_bytes = self.stats_max['mem']['rss']
307         if used_bytes == float('-Inf'):
308             logger.warning('%s: no memory usage data', self.label)
309             return
310         used_mib = math.ceil(float(used_bytes) / 1048576)
311         asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
312
313         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
314         if asked_mib is None or (
315                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
316             yield (
317                 '#!! {} max RSS was {} MiB -- '
318                 'try runtime_constraints "min_ram_mb_per_node":{}'
319             ).format(
320                 self.label,
321                 int(used_mib),
322                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
323
324     def _format(self, val):
325         """Return a string representation of a stat.
326
327         {:.2f} for floats, default format for everything else."""
328         if isinstance(val, float):
329             return '{:.2f}'.format(val)
330         else:
331             return '{}'.format(val)
332
333
334 class CollectionSummarizer(Summarizer):
335     def __init__(self, collection_id, **kwargs):
336         super(CollectionSummarizer, self).__init__(
337             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
338         self.label = collection_id
339
340
341 class JobSummarizer(Summarizer):
342     def __init__(self, job, **kwargs):
343         arv = arvados.api('v1')
344         if isinstance(job, basestring):
345             self.job = arv.jobs().get(uuid=job).execute()
346         else:
347             self.job = job
348         if self.job['log']:
349             rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
350             label = self.job['uuid']
351         else:
352             rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
353             label = self.job['uuid'] + ' (partial)'
354         super(JobSummarizer, self).__init__(rdr, **kwargs)
355         self.label = label
356         self.existing_constraints = self.job.get('runtime_constraints', {})
357
358
359 class PipelineSummarizer(object):
360     def __init__(self, pipeline_instance_uuid, **kwargs):
361         arv = arvados.api('v1', model=OrderedJsonModel())
362         instance = arv.pipeline_instances().get(
363             uuid=pipeline_instance_uuid).execute()
364         self.summarizers = collections.OrderedDict()
365         for cname, component in instance['components'].iteritems():
366             if 'job' not in component:
367                 logger.warning(
368                     "%s: skipping component with no job assigned", cname)
369             elif component['job'].get('log') is None:
370                 logger.warning(
371                     "%s: skipping job %s with no log available",
372                     cname, component['job'].get('uuid'))
373             else:
374                 logger.info(
375                     "%s: logdata %s", cname, component['job']['log'])
376                 summarizer = JobSummarizer(component['job'], **kwargs)
377                 summarizer.label = cname
378                 self.summarizers[cname] = summarizer
379         self.label = pipeline_instance_uuid
380
381     def run(self):
382         threads = []
383         for summarizer in self.summarizers.itervalues():
384             t = threading.Thread(target=summarizer.run)
385             t.daemon = True
386             t.start()
387             threads.append(t)
388         for t in threads:
389             t.join()
390
391     def text_report(self):
392         txt = ''
393         for cname, summarizer in self.summarizers.iteritems():
394             txt += '### Summary for {} ({})\n'.format(
395                 cname, summarizer.job['uuid'])
396             txt += summarizer.text_report()
397             txt += '\n'
398         return txt
399
400     def html_report(self):
401         return crunchstat_summary.chartjs.ChartJS(
402             self.label, self.summarizers.itervalues()).html()