4083: crunchstat-summary imports _strptime.
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 from __future__ import print_function
2
3 import arvados
4 import collections
5 import crunchstat_summary.chartjs
6 import crunchstat_summary.reader
7 import datetime
8 import functools
9 import itertools
10 import math
11 import re
12 import sys
13 import threading
14 import _strptime
15
16 from arvados.api import OrderedJsonModel
17 from crunchstat_summary import logger
18
19 # Recommend memory constraints that are this multiple of an integral
20 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
21 # that have amounts like 7.5 GiB according to the kernel.)
22 AVAILABLE_RAM_RATIO = 0.95
23
24
25 # Workaround datetime.datetime.strptime() thread-safety bug by calling
26 # it once before starting threads.  https://bugs.python.org/issue7980
27 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
28
29
30 class Task(object):
31     def __init__(self):
32         self.starttime = None
33         self.series = collections.defaultdict(list)
34
35
36 class Summarizer(object):
37     def __init__(self, logdata, label=None, skip_child_jobs=False):
38         self._logdata = logdata
39
40         self.label = label
41         self.starttime = None
42         self.finishtime = None
43         self._skip_child_jobs = skip_child_jobs
44
45         # stats_max: {category: {stat: val}}
46         self.stats_max = collections.defaultdict(
47             functools.partial(collections.defaultdict, lambda: 0))
48         # task_stats: {task_id: {category: {stat: val}}}
49         self.task_stats = collections.defaultdict(
50             functools.partial(collections.defaultdict, dict))
51
52         self.seq_to_uuid = {}
53         self.tasks = collections.defaultdict(Task)
54
55         # We won't bother recommending new runtime constraints if the
56         # constraints given when running the job are known to us and
57         # are already suitable.  If applicable, the subclass
58         # constructor will overwrite this with something useful.
59         self.existing_constraints = {}
60
61         logger.debug("%s: logdata %s", self.label, logdata)
62
63     def run(self):
64         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
65         for line in self._logdata:
66             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
67             if m:
68                 seq = int(m.group('seq'))
69                 uuid = m.group('task_uuid')
70                 self.seq_to_uuid[seq] = uuid
71                 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
72                 continue
73
74             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
75             if m:
76                 task_id = self.seq_to_uuid[int(m.group('seq'))]
77                 elapsed = int(m.group('elapsed'))
78                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
79                 if elapsed > self.stats_max['time']['elapsed']:
80                     self.stats_max['time']['elapsed'] = elapsed
81                 continue
82
83             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
84             if m:
85                 uuid = m.group('uuid')
86                 if self._skip_child_jobs:
87                     logger.warning('%s: omitting stats from child job %s'
88                                    ' because --skip-child-jobs flag is on',
89                                    self.label, uuid)
90                     continue
91                 logger.debug('%s: follow %s', self.label, uuid)
92                 child_summarizer = JobSummarizer(uuid)
93                 child_summarizer.stats_max = self.stats_max
94                 child_summarizer.task_stats = self.task_stats
95                 child_summarizer.tasks = self.tasks
96                 child_summarizer.starttime = self.starttime
97                 child_summarizer.run()
98                 logger.debug('%s: done %s', self.label, uuid)
99                 continue
100
101             m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
102             if not m:
103                 continue
104
105             if self.label is None:
106                 self.label = m.group('job_uuid')
107                 logger.debug('%s: using job uuid as label', self.label)
108             if m.group('category').endswith(':'):
109                 # "stderr crunchstat: notice: ..."
110                 continue
111             elif m.group('category') in ('error', 'caught'):
112                 continue
113             elif m.group('category') == 'read':
114                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
115                 # (crunchstat formatting fixed, but old logs still say this)
116                 continue
117             task_id = self.seq_to_uuid[int(m.group('seq'))]
118             task = self.tasks[task_id]
119
120             # Use the first and last crunchstat timestamps as
121             # approximations of starttime and finishtime.
122             timestamp = datetime.datetime.strptime(
123                 m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
124             if not task.starttime:
125                 task.starttime = timestamp
126                 logger.debug('%s: task %s starttime %s',
127                              self.label, task_id, timestamp)
128             task.finishtime = timestamp
129
130             if not self.starttime:
131                 self.starttime = timestamp
132             self.finishtime = timestamp
133
134             this_interval_s = None
135             for group in ['current', 'interval']:
136                 if not m.group(group):
137                     continue
138                 category = m.group('category')
139                 words = m.group(group).split(' ')
140                 stats = {}
141                 for val, stat in zip(words[::2], words[1::2]):
142                     try:
143                         if '.' in val:
144                             stats[stat] = float(val)
145                         else:
146                             stats[stat] = int(val)
147                     except ValueError as e:
148                         raise ValueError(
149                             'Error parsing {} stat in "{}": {!r}'.format(
150                                 stat, line, e))
151                 if 'user' in stats or 'sys' in stats:
152                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
153                 if 'tx' in stats or 'rx' in stats:
154                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
155                 for stat, val in stats.iteritems():
156                     if group == 'interval':
157                         if stat == 'seconds':
158                             this_interval_s = val
159                             continue
160                         elif not (this_interval_s > 0):
161                             logger.error(
162                                 "BUG? interval stat given with duration {!r}".
163                                 format(this_interval_s))
164                             continue
165                         else:
166                             stat = stat + '__rate'
167                             val = val / this_interval_s
168                             if stat in ['user+sys__rate', 'tx+rx__rate']:
169                                 task.series[category, stat].append(
170                                     (timestamp - self.starttime, val))
171                     else:
172                         if stat in ['rss']:
173                             task.series[category, stat].append(
174                                 (timestamp - self.starttime, val))
175                         self.task_stats[task_id][category][stat] = val
176                     if val > self.stats_max[category][stat]:
177                         self.stats_max[category][stat] = val
178         logger.debug('%s: done parsing', self.label)
179
180         self.job_tot = collections.defaultdict(
181             functools.partial(collections.defaultdict, int))
182         for task_id, task_stat in self.task_stats.iteritems():
183             for category, stat_last in task_stat.iteritems():
184                 for stat, val in stat_last.iteritems():
185                     if stat in ['cpus', 'cache', 'swap', 'rss']:
186                         # meaningless stats like 16 cpu cores x 5 tasks = 80
187                         continue
188                     self.job_tot[category][stat] += val
189         logger.debug('%s: done totals', self.label)
190
191     def long_label(self):
192         label = self.label
193         if self.finishtime:
194             label += ' -- elapsed time '
195             s = (self.finishtime - self.starttime).total_seconds()
196             if s > 86400:
197                 label += '{}d'.format(int(s/86400))
198             if s > 3600:
199                 label += '{}h'.format(int(s/3600) % 24)
200             if s > 60:
201                 label += '{}m'.format(int(s/60) % 60)
202             label += '{}s'.format(int(s) % 60)
203         return label
204
205     def text_report(self):
206         if not self.tasks:
207             return "(no report generated)\n"
208         return "\n".join(itertools.chain(
209             self._text_report_gen(),
210             self._recommend_gen())) + "\n"
211
212     def html_report(self):
213         return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
214
215     def _text_report_gen(self):
216         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
217         for category, stat_max in sorted(self.stats_max.iteritems()):
218             for stat, val in sorted(stat_max.iteritems()):
219                 if stat.endswith('__rate'):
220                     continue
221                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
222                 val = self._format(val)
223                 tot = self._format(self.job_tot[category].get(stat, '-'))
224                 yield "\t".join([category, stat, str(val), max_rate, tot])
225         for args in (
226                 ('Number of tasks: {}',
227                  len(self.tasks),
228                  None),
229                 ('Max CPU time spent by a single task: {}s',
230                  self.stats_max['cpu']['user+sys'],
231                  None),
232                 ('Max CPU usage in a single interval: {}%',
233                  self.stats_max['cpu']['user+sys__rate'],
234                  lambda x: x * 100),
235                 ('Overall CPU usage: {}%',
236                  self.job_tot['cpu']['user+sys'] /
237                  self.job_tot['time']['elapsed']
238                  if self.job_tot['time']['elapsed'] > 0 else 0,
239                  lambda x: x * 100),
240                 ('Max memory used by a single task: {}GB',
241                  self.stats_max['mem']['rss'],
242                  lambda x: x / 1e9),
243                 ('Max network traffic in a single task: {}GB',
244                  self.stats_max['net:eth0']['tx+rx'] +
245                  self.stats_max['net:keep0']['tx+rx'],
246                  lambda x: x / 1e9),
247                 ('Max network speed in a single interval: {}MB/s',
248                  self.stats_max['net:eth0']['tx+rx__rate'] +
249                  self.stats_max['net:keep0']['tx+rx__rate'],
250                  lambda x: x / 1e6),
251                 ('Keep cache miss rate {}%',
252                  (float(self.job_tot['keepcache']['miss']) /
253                  float(self.job_tot['keepcalls']['get']))
254                  if self.job_tot['keepcalls']['get'] > 0 else 0,
255                  lambda x: x * 100.0),
256                 ('Keep cache utilization {}%',
257                  (float(self.job_tot['blkio:0:0']['read']) /
258                  float(self.job_tot['net:keep0']['rx']))
259                  if self.job_tot['net:keep0']['rx'] > 0 else 0,
260                  lambda x: x * 100.0)):
261             format_string, val, transform = args
262             if val == float('-Inf'):
263                 continue
264             if transform:
265                 val = transform(val)
266             yield "# "+format_string.format(self._format(val))
267
268     def _recommend_gen(self):
269         return itertools.chain(
270             self._recommend_cpu(),
271             self._recommend_ram(),
272             self._recommend_keep_cache())
273
274     def _recommend_cpu(self):
275         """Recommend asking for 4 cores if max CPU usage was 333%"""
276
277         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
278         if cpu_max_rate == float('-Inf'):
279             logger.warning('%s: no CPU usage data', self.label)
280             return
281         used_cores = max(1, int(math.ceil(cpu_max_rate)))
282         asked_cores = self.existing_constraints.get('min_cores_per_node')
283         if asked_cores is None or used_cores < asked_cores:
284             yield (
285                 '#!! {} max CPU usage was {}% -- '
286                 'try runtime_constraints "min_cores_per_node":{}'
287             ).format(
288                 self.label,
289                 int(math.ceil(cpu_max_rate*100)),
290                 int(used_cores))
291
292     def _recommend_ram(self):
293         """Recommend an economical RAM constraint for this job.
294
295         Nodes that are advertised as "8 gibibytes" actually have what
296         we might call "8 nearlygibs" of memory available for jobs.
297         Here, we calculate a whole number of nearlygibs that would
298         have sufficed to run the job, then recommend requesting a node
299         with that number of nearlygibs (expressed as mebibytes).
300
301         Requesting a node with "nearly 8 gibibytes" is our best hope
302         of getting a node that actually has nearly 8 gibibytes
303         available.  If the node manager is smart enough to account for
304         the discrepancy itself when choosing/creating a node, we'll
305         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
306         advertised size of the next-size-smaller node (say, 6 GiB)
307         will be too low to satisfy our request, so we will effectively
308         get rounded up to 8 GiB.
309
310         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
311         we will generally get a node that is advertised as "8 GiB" and
312         has at least 7500 MiB available.  However, asking for 8192 MiB
313         would either result in an unnecessarily expensive 12 GiB node
314         (if node manager knows about the discrepancy), or an 8 GiB
315         node which has less than 8192 MiB available and is therefore
316         considered by crunch-dispatch to be too small to meet our
317         constraint.
318
319         When node manager learns how to predict the available memory
320         for each node type such that crunch-dispatch always agrees
321         that a node is big enough to run the job it was brought up
322         for, all this will be unnecessary.  We'll just ask for exactly
323         the memory we want -- even if that happens to be 8192 MiB.
324         """
325
326         used_bytes = self.stats_max['mem']['rss']
327         if used_bytes == float('-Inf'):
328             logger.warning('%s: no memory usage data', self.label)
329             return
330         used_mib = math.ceil(float(used_bytes) / 1048576)
331         asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
332
333         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
334         if asked_mib is None or (
335                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
336             yield (
337                 '#!! {} max RSS was {} MiB -- '
338                 'try runtime_constraints "min_ram_mb_per_node":{}'
339             ).format(
340                 self.label,
341                 int(used_mib),
342                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
343
344     def _recommend_keep_cache(self):
345         """Recommend increasing keep cache if utilization < 80%"""
346         if self.job_tot['net:keep0']['rx'] == 0:
347             return
348         utilization = (float(self.job_tot['blkio:0:0']['read']) /
349                        float(self.job_tot['net:keep0']['rx']))
350         asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
351
352         if utilization < 0.8:
353             yield (
354                 '#!! {} Keep cache utilization was {:.2f}% -- '
355                 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
356             ).format(
357                 self.label,
358                 utilization * 100.0,
359                 asked_mib*2)
360
361
362     def _format(self, val):
363         """Return a string representation of a stat.
364
365         {:.2f} for floats, default format for everything else."""
366         if isinstance(val, float):
367             return '{:.2f}'.format(val)
368         else:
369             return '{}'.format(val)
370
371
372 class CollectionSummarizer(Summarizer):
373     def __init__(self, collection_id, **kwargs):
374         super(CollectionSummarizer, self).__init__(
375             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
376         self.label = collection_id
377
378
379 class JobSummarizer(Summarizer):
380     def __init__(self, job, **kwargs):
381         arv = arvados.api('v1')
382         if isinstance(job, basestring):
383             self.job = arv.jobs().get(uuid=job).execute()
384         else:
385             self.job = job
386         rdr = None
387         if self.job.get('log'):
388             try:
389                 rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
390             except arvados.errors.NotFoundError as e:
391                 logger.warning("Trying event logs after failing to read "
392                                "log collection %s: %s", self.job['log'], e)
393             else:
394                 label = self.job['uuid']
395         if rdr is None:
396             rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
397             label = self.job['uuid'] + ' (partial)'
398         super(JobSummarizer, self).__init__(rdr, **kwargs)
399         self.label = label
400         self.existing_constraints = self.job.get('runtime_constraints', {})
401
402
403 class PipelineSummarizer(object):
404     def __init__(self, pipeline_instance_uuid, **kwargs):
405         arv = arvados.api('v1', model=OrderedJsonModel())
406         instance = arv.pipeline_instances().get(
407             uuid=pipeline_instance_uuid).execute()
408         self.summarizers = collections.OrderedDict()
409         for cname, component in instance['components'].iteritems():
410             if 'job' not in component:
411                 logger.warning(
412                     "%s: skipping component with no job assigned", cname)
413             else:
414                 logger.info(
415                     "%s: job %s", cname, component['job']['uuid'])
416                 summarizer = JobSummarizer(component['job'], **kwargs)
417                 summarizer.label = '{} {}'.format(
418                     cname, component['job']['uuid'])
419                 self.summarizers[cname] = summarizer
420         self.label = pipeline_instance_uuid
421
422     def run(self):
423         threads = []
424         for summarizer in self.summarizers.itervalues():
425             t = threading.Thread(target=summarizer.run)
426             t.daemon = True
427             t.start()
428             threads.append(t)
429         for t in threads:
430             t.join()
431
432     def text_report(self):
433         txt = ''
434         for cname, summarizer in self.summarizers.iteritems():
435             txt += '### Summary for {} ({})\n'.format(
436                 cname, summarizer.job['uuid'])
437             txt += summarizer.text_report()
438             txt += '\n'
439         return txt
440
441     def html_report(self):
442         return crunchstat_summary.chartjs.ChartJS(
443             self.label, self.summarizers.itervalues()).html()