8341: Use "time since job start", not "time since task start", as X axis.
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 from __future__ import print_function
2
3 import arvados
4 import collections
5 import crunchstat_summary.chartjs
6 import crunchstat_summary.reader
7 import datetime
8 import functools
9 import itertools
10 import math
11 import re
12 import sys
13
14 from arvados.api import OrderedJsonModel
15 from crunchstat_summary import logger
16
17 # Recommend memory constraints that are this multiple of an integral
18 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
19 # that have amounts like 7.5 GiB according to the kernel.)
20 AVAILABLE_RAM_RATIO = 0.95
21
22
23 class Task(object):
24     def __init__(self):
25         self.starttime = None
26         self.series = collections.defaultdict(list)
27
28
29 class Summarizer(object):
30     def __init__(self, logdata, label=None, skip_child_jobs=False):
31         self._logdata = logdata
32
33         self.label = label
34         self.starttime = None
35         self.finishtime = None
36         self._skip_child_jobs = skip_child_jobs
37
38         # stats_max: {category: {stat: val}}
39         self.stats_max = collections.defaultdict(
40             functools.partial(collections.defaultdict,
41                               lambda: float('-Inf')))
42         # task_stats: {task_id: {category: {stat: val}}}
43         self.task_stats = collections.defaultdict(
44             functools.partial(collections.defaultdict, dict))
45
46         self.seq_to_uuid = {}
47         self.tasks = collections.defaultdict(Task)
48
49         # We won't bother recommending new runtime constraints if the
50         # constraints given when running the job are known to us and
51         # are already suitable.  If applicable, the subclass
52         # constructor will overwrite this with something useful.
53         self.existing_constraints = {}
54
55         logger.debug("%s: logdata %s", self.label, repr(logdata))
56
57     def run(self):
58         logger.debug("%s: parsing log data", self.label)
59         for line in self._logdata:
60             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
61             if m:
62                 seq = int(m.group('seq'))
63                 uuid = m.group('task_uuid')
64                 self.seq_to_uuid[seq] = uuid
65                 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
66                 continue
67
68             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
69             if m:
70                 task_id = self.seq_to_uuid[int(m.group('seq'))]
71                 elapsed = int(m.group('elapsed'))
72                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
73                 if elapsed > self.stats_max['time']['elapsed']:
74                     self.stats_max['time']['elapsed'] = elapsed
75                 continue
76
77             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
78             if m:
79                 uuid = m.group('uuid')
80                 if self._skip_child_jobs:
81                     logger.warning('%s: omitting stats from child job %s'
82                                    ' because --skip-child-jobs flag is on',
83                                    self.label, uuid)
84                     continue
85                 logger.debug('%s: follow %s', self.label, uuid)
86                 child_summarizer = JobSummarizer(uuid)
87                 child_summarizer.stats_max = self.stats_max
88                 child_summarizer.task_stats = self.task_stats
89                 child_summarizer.tasks = self.tasks
90                 child_summarizer.starttime = self.starttime
91                 child_summarizer.run()
92                 logger.debug('%s: done %s', self.label, uuid)
93                 continue
94
95             m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
96             if not m:
97                 continue
98
99             if self.label is None:
100                 self.label = m.group('job_uuid')
101                 logger.debug('%s: using job uuid as label', self.label)
102             if m.group('category').endswith(':'):
103                 # "stderr crunchstat: notice: ..."
104                 continue
105             elif m.group('category') in ('error', 'caught'):
106                 continue
107             elif m.group('category') == 'read':
108                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
109                 # (crunchstat formatting fixed, but old logs still say this)
110                 continue
111             task_id = self.seq_to_uuid[int(m.group('seq'))]
112             task = self.tasks[task_id]
113
114             # Use the first and last crunchstat timestamps as
115             # approximations of starttime and finishtime.
116             timestamp = datetime.datetime.strptime(
117                 m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
118             if not task.starttime:
119                 task.starttime = timestamp
120                 logger.debug('%s: task %s starttime %s',
121                              self.label, task_id, timestamp)
122             task.finishtime = timestamp
123
124             if not self.starttime:
125                 self.starttime = timestamp
126             self.finishtime = timestamp
127
128             this_interval_s = None
129             for group in ['current', 'interval']:
130                 if not m.group(group):
131                     continue
132                 category = m.group('category')
133                 words = m.group(group).split(' ')
134                 stats = {}
135                 for val, stat in zip(words[::2], words[1::2]):
136                     try:
137                         if '.' in val:
138                             stats[stat] = float(val)
139                         else:
140                             stats[stat] = int(val)
141                     except ValueError as e:
142                         raise ValueError(
143                             'Error parsing {} stat in "{}": {!r}'.format(
144                                 stat, line, e))
145                 if 'user' in stats or 'sys' in stats:
146                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
147                 if 'tx' in stats or 'rx' in stats:
148                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
149                 for stat, val in stats.iteritems():
150                     if group == 'interval':
151                         if stat == 'seconds':
152                             this_interval_s = val
153                             continue
154                         elif not (this_interval_s > 0):
155                             logger.error(
156                                 "BUG? interval stat given with duration {!r}".
157                                 format(this_interval_s))
158                             continue
159                         else:
160                             stat = stat + '__rate'
161                             val = val / this_interval_s
162                             if stat in ['user+sys__rate', 'tx+rx__rate']:
163                                 task.series[category, stat].append(
164                                     (timestamp - self.starttime, val))
165                     else:
166                         if stat in ['rss']:
167                             task.series[category, stat].append(
168                                 (timestamp - self.starttime, val))
169                         self.task_stats[task_id][category][stat] = val
170                     if val > self.stats_max[category][stat]:
171                         self.stats_max[category][stat] = val
172         logger.debug('%s: done parsing', self.label)
173
174         self.job_tot = collections.defaultdict(
175             functools.partial(collections.defaultdict, int))
176         for task_id, task_stat in self.task_stats.iteritems():
177             for category, stat_last in task_stat.iteritems():
178                 for stat, val in stat_last.iteritems():
179                     if stat in ['cpus', 'cache', 'swap', 'rss']:
180                         # meaningless stats like 16 cpu cores x 5 tasks = 80
181                         continue
182                     self.job_tot[category][stat] += val
183         logger.debug('%s: done totals', self.label)
184
185     def long_label(self):
186         label = self.label
187         if self.finishtime:
188             label += ' -- elapsed time '
189             s = (self.finishtime - self.starttime).total_seconds()
190             if s > 86400:
191                 label += '{}d'.format(int(s/86400))
192             if s > 3600:
193                 label += '{}h'.format(int(s/3600) % 24)
194             if s > 60:
195                 label += '{}m'.format(int(s/60) % 60)
196             label += '{}s'.format(int(s) % 60)
197         return label
198
199     def text_report(self):
200         return "\n".join(itertools.chain(
201             self._text_report_gen(),
202             self._recommend_gen())) + "\n"
203
204     def html_report(self):
205         return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
206
207     def _text_report_gen(self):
208         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
209         for category, stat_max in sorted(self.stats_max.iteritems()):
210             for stat, val in sorted(stat_max.iteritems()):
211                 if stat.endswith('__rate'):
212                     continue
213                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
214                 val = self._format(val)
215                 tot = self._format(self.job_tot[category].get(stat, '-'))
216                 yield "\t".join([category, stat, str(val), max_rate, tot])
217         for args in (
218                 ('Number of tasks: {}',
219                  len(self.tasks),
220                  None),
221                 ('Max CPU time spent by a single task: {}s',
222                  self.stats_max['cpu']['user+sys'],
223                  None),
224                 ('Max CPU usage in a single interval: {}%',
225                  self.stats_max['cpu']['user+sys__rate'],
226                  lambda x: x * 100),
227                 ('Overall CPU usage: {}%',
228                  self.job_tot['cpu']['user+sys'] /
229                  self.job_tot['time']['elapsed'],
230                  lambda x: x * 100),
231                 ('Max memory used by a single task: {}GB',
232                  self.stats_max['mem']['rss'],
233                  lambda x: x / 1e9),
234                 ('Max network traffic in a single task: {}GB',
235                  self.stats_max['net:eth0']['tx+rx'],
236                  lambda x: x / 1e9),
237                 ('Max network speed in a single interval: {}MB/s',
238                  self.stats_max['net:eth0']['tx+rx__rate'],
239                  lambda x: x / 1e6)):
240             format_string, val, transform = args
241             if val == float('-Inf'):
242                 continue
243             if transform:
244                 val = transform(val)
245             yield "# "+format_string.format(self._format(val))
246
247     def _recommend_gen(self):
248         return itertools.chain(
249             self._recommend_cpu(),
250             self._recommend_ram())
251
252     def _recommend_cpu(self):
253         """Recommend asking for 4 cores if max CPU usage was 333%"""
254
255         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
256         if cpu_max_rate == float('-Inf'):
257             logger.warning('%s: no CPU usage data', self.label)
258             return
259         used_cores = int(math.ceil(cpu_max_rate))
260         asked_cores = self.existing_constraints.get('min_cores_per_node')
261         if asked_cores is None or used_cores < asked_cores:
262             yield (
263                 '#!! {} max CPU usage was {}% -- '
264                 'try runtime_constraints "min_cores_per_node":{}'
265             ).format(
266                 self.label,
267                 int(math.ceil(cpu_max_rate*100)),
268                 int(used_cores))
269
270     def _recommend_ram(self):
271         """Recommend an economical RAM constraint for this job.
272
273         Nodes that are advertised as "8 gibibytes" actually have what
274         we might call "8 nearlygibs" of memory available for jobs.
275         Here, we calculate a whole number of nearlygibs that would
276         have sufficed to run the job, then recommend requesting a node
277         with that number of nearlygibs (expressed as mebibytes).
278
279         Requesting a node with "nearly 8 gibibytes" is our best hope
280         of getting a node that actually has nearly 8 gibibytes
281         available.  If the node manager is smart enough to account for
282         the discrepancy itself when choosing/creating a node, we'll
283         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
284         advertised size of the next-size-smaller node (say, 6 GiB)
285         will be too low to satisfy our request, so we will effectively
286         get rounded up to 8 GiB.
287
288         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
289         we will generally get a node that is advertised as "8 GiB" and
290         has at least 7500 MiB available.  However, asking for 8192 MiB
291         would either result in an unnecessarily expensive 12 GiB node
292         (if node manager knows about the discrepancy), or an 8 GiB
293         node which has less than 8192 MiB available and is therefore
294         considered by crunch-dispatch to be too small to meet our
295         constraint.
296
297         When node manager learns how to predict the available memory
298         for each node type such that crunch-dispatch always agrees
299         that a node is big enough to run the job it was brought up
300         for, all this will be unnecessary.  We'll just ask for exactly
301         the memory we want -- even if that happens to be 8192 MiB.
302         """
303
304         used_bytes = self.stats_max['mem']['rss']
305         if used_bytes == float('-Inf'):
306             logger.warning('%s: no memory usage data', self.label)
307             return
308         used_mib = math.ceil(float(used_bytes) / 1048576)
309         asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
310
311         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
312         if asked_mib is None or (
313                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
314             yield (
315                 '#!! {} max RSS was {} MiB -- '
316                 'try runtime_constraints "min_ram_mb_per_node":{}'
317             ).format(
318                 self.label,
319                 int(used_mib),
320                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
321
322     def _format(self, val):
323         """Return a string representation of a stat.
324
325         {:.2f} for floats, default format for everything else."""
326         if isinstance(val, float):
327             return '{:.2f}'.format(val)
328         else:
329             return '{}'.format(val)
330
331
332 class CollectionSummarizer(Summarizer):
333     def __init__(self, collection_id, **kwargs):
334         super(CollectionSummarizer, self).__init__(
335             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
336         self.label = collection_id
337
338
339 class JobSummarizer(Summarizer):
340     def __init__(self, job, **kwargs):
341         arv = arvados.api('v1')
342         if isinstance(job, basestring):
343             self.job = arv.jobs().get(uuid=job).execute()
344         else:
345             self.job = job
346         if self.job['log']:
347             rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
348             label = self.job['uuid']
349         else:
350             rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
351             label = self.job['uuid'] + ' (partial)'
352         super(JobSummarizer, self).__init__(rdr, **kwargs)
353         self.label = label
354         self.existing_constraints = self.job.get('runtime_constraints', {})
355
356
357 class PipelineSummarizer(object):
358     def __init__(self, pipeline_instance_uuid, **kwargs):
359         arv = arvados.api('v1', model=OrderedJsonModel())
360         instance = arv.pipeline_instances().get(
361             uuid=pipeline_instance_uuid).execute()
362         self.summarizers = collections.OrderedDict()
363         for cname, component in instance['components'].iteritems():
364             if 'job' not in component:
365                 logger.warning(
366                     "%s: skipping component with no job assigned", cname)
367             elif component['job'].get('log') is None:
368                 logger.warning(
369                     "%s: skipping job %s with no log available",
370                     cname, component['job'].get('uuid'))
371             else:
372                 logger.info(
373                     "%s: logdata %s", cname, component['job']['log'])
374                 summarizer = JobSummarizer(component['job'], **kwargs)
375                 summarizer.label = cname
376                 self.summarizers[cname] = summarizer
377         self.label = pipeline_instance_uuid
378
379     def run(self):
380         for summarizer in self.summarizers.itervalues():
381             summarizer.run()
382
383     def text_report(self):
384         txt = ''
385         for cname, summarizer in self.summarizers.iteritems():
386             txt += '### Summary for {} ({})\n'.format(
387                 cname, summarizer.job['uuid'])
388             txt += summarizer.text_report()
389             txt += '\n'
390         return txt
391
392     def html_report(self):
393         return crunchstat_summary.chartjs.ChartJS(
394             self.label, self.summarizers.itervalues()).html()