Merge branch '8784-dir-listings'
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import print_function
6
7 import arvados
8 import collections
9 import crunchstat_summary.chartjs
10 import crunchstat_summary.reader
11 import datetime
12 import functools
13 import itertools
14 import math
15 import re
16 import sys
17 import threading
18 import _strptime
19
20 from arvados.api import OrderedJsonModel
21 from crunchstat_summary import logger
22
23 # Recommend memory constraints that are this multiple of an integral
24 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
25 # that have amounts like 7.5 GiB according to the kernel.)
26 AVAILABLE_RAM_RATIO = 0.95
27
28
29 # Workaround datetime.datetime.strptime() thread-safety bug by calling
30 # it once before starting threads.  https://bugs.python.org/issue7980
31 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
32
33
34 class Task(object):
35     def __init__(self):
36         self.starttime = None
37         self.series = collections.defaultdict(list)
38
39
40 class Summarizer(object):
41     def __init__(self, logdata, label=None, skip_child_jobs=False):
42         self._logdata = logdata
43
44         self.label = label
45         self.starttime = None
46         self.finishtime = None
47         self._skip_child_jobs = skip_child_jobs
48
49         # stats_max: {category: {stat: val}}
50         self.stats_max = collections.defaultdict(
51             functools.partial(collections.defaultdict, lambda: 0))
52         # task_stats: {task_id: {category: {stat: val}}}
53         self.task_stats = collections.defaultdict(
54             functools.partial(collections.defaultdict, dict))
55
56         self.seq_to_uuid = {}
57         self.tasks = collections.defaultdict(Task)
58
59         # We won't bother recommending new runtime constraints if the
60         # constraints given when running the job are known to us and
61         # are already suitable.  If applicable, the subclass
62         # constructor will overwrite this with something useful.
63         self.existing_constraints = {}
64
65         logger.debug("%s: logdata %s", self.label, logdata)
66
67     def run(self):
68         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
69         for line in self._logdata:
70             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
71             if m:
72                 seq = int(m.group('seq'))
73                 uuid = m.group('task_uuid')
74                 self.seq_to_uuid[seq] = uuid
75                 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
76                 continue
77
78             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
79             if m:
80                 task_id = self.seq_to_uuid[int(m.group('seq'))]
81                 elapsed = int(m.group('elapsed'))
82                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
83                 if elapsed > self.stats_max['time']['elapsed']:
84                     self.stats_max['time']['elapsed'] = elapsed
85                 continue
86
87             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
88             if m:
89                 uuid = m.group('uuid')
90                 if self._skip_child_jobs:
91                     logger.warning('%s: omitting stats from child job %s'
92                                    ' because --skip-child-jobs flag is on',
93                                    self.label, uuid)
94                     continue
95                 logger.debug('%s: follow %s', self.label, uuid)
96                 child_summarizer = JobSummarizer(uuid)
97                 child_summarizer.stats_max = self.stats_max
98                 child_summarizer.task_stats = self.task_stats
99                 child_summarizer.tasks = self.tasks
100                 child_summarizer.starttime = self.starttime
101                 child_summarizer.run()
102                 logger.debug('%s: done %s', self.label, uuid)
103                 continue
104
105             m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
106             if not m:
107                 continue
108
109             try:
110                 if self.label is None:
111                     self.label = m.group('job_uuid')
112                     logger.debug('%s: using job uuid as label', self.label)
113                 if m.group('category').endswith(':'):
114                     # "stderr crunchstat: notice: ..."
115                     continue
116                 elif m.group('category') in ('error', 'caught'):
117                     continue
118                 elif m.group('category') == 'read':
119                     # "stderr crunchstat: read /proc/1234/net/dev: ..."
120                     # (crunchstat formatting fixed, but old logs still say this)
121                     continue
122                 task_id = self.seq_to_uuid[int(m.group('seq'))]
123                 task = self.tasks[task_id]
124
125                 # Use the first and last crunchstat timestamps as
126                 # approximations of starttime and finishtime.
127                 timestamp = datetime.datetime.strptime(
128                     m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
129                 if not task.starttime:
130                     task.starttime = timestamp
131                     logger.debug('%s: task %s starttime %s',
132                                  self.label, task_id, timestamp)
133                 task.finishtime = timestamp
134
135                 if not self.starttime:
136                     self.starttime = timestamp
137                 self.finishtime = timestamp
138
139                 this_interval_s = None
140                 for group in ['current', 'interval']:
141                     if not m.group(group):
142                         continue
143                     category = m.group('category')
144                     words = m.group(group).split(' ')
145                     stats = {}
146                     for val, stat in zip(words[::2], words[1::2]):
147                         try:
148                             if '.' in val:
149                                 stats[stat] = float(val)
150                             else:
151                                 stats[stat] = int(val)
152                         except ValueError as e:
153                             raise ValueError(
154                                 'Error parsing {} stat: {!r}'.format(
155                                     stat, e))
156                     if 'user' in stats or 'sys' in stats:
157                         stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
158                     if 'tx' in stats or 'rx' in stats:
159                         stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
160                     for stat, val in stats.iteritems():
161                         if group == 'interval':
162                             if stat == 'seconds':
163                                 this_interval_s = val
164                                 continue
165                             elif not (this_interval_s > 0):
166                                 logger.error(
167                                     "BUG? interval stat given with duration {!r}".
168                                     format(this_interval_s))
169                                 continue
170                             else:
171                                 stat = stat + '__rate'
172                                 val = val / this_interval_s
173                                 if stat in ['user+sys__rate', 'tx+rx__rate']:
174                                     task.series[category, stat].append(
175                                         (timestamp - self.starttime, val))
176                         else:
177                             if stat in ['rss']:
178                                 task.series[category, stat].append(
179                                     (timestamp - self.starttime, val))
180                             self.task_stats[task_id][category][stat] = val
181                         if val > self.stats_max[category][stat]:
182                             self.stats_max[category][stat] = val
183             except Exception as e:
184                 logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
185         logger.debug('%s: done parsing', self.label)
186
187         self.job_tot = collections.defaultdict(
188             functools.partial(collections.defaultdict, int))
189         for task_id, task_stat in self.task_stats.iteritems():
190             for category, stat_last in task_stat.iteritems():
191                 for stat, val in stat_last.iteritems():
192                     if stat in ['cpus', 'cache', 'swap', 'rss']:
193                         # meaningless stats like 16 cpu cores x 5 tasks = 80
194                         continue
195                     self.job_tot[category][stat] += val
196         logger.debug('%s: done totals', self.label)
197
198     def long_label(self):
199         label = self.label
200         if self.finishtime:
201             label += ' -- elapsed time '
202             s = (self.finishtime - self.starttime).total_seconds()
203             if s > 86400:
204                 label += '{}d'.format(int(s/86400))
205             if s > 3600:
206                 label += '{}h'.format(int(s/3600) % 24)
207             if s > 60:
208                 label += '{}m'.format(int(s/60) % 60)
209             label += '{}s'.format(int(s) % 60)
210         return label
211
212     def text_report(self):
213         if not self.tasks:
214             return "(no report generated)\n"
215         return "\n".join(itertools.chain(
216             self._text_report_gen(),
217             self._recommend_gen())) + "\n"
218
219     def html_report(self):
220         return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
221
222     def _text_report_gen(self):
223         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
224         for category, stat_max in sorted(self.stats_max.iteritems()):
225             for stat, val in sorted(stat_max.iteritems()):
226                 if stat.endswith('__rate'):
227                     continue
228                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
229                 val = self._format(val)
230                 tot = self._format(self.job_tot[category].get(stat, '-'))
231                 yield "\t".join([category, stat, str(val), max_rate, tot])
232         for args in (
233                 ('Number of tasks: {}',
234                  len(self.tasks),
235                  None),
236                 ('Max CPU time spent by a single task: {}s',
237                  self.stats_max['cpu']['user+sys'],
238                  None),
239                 ('Max CPU usage in a single interval: {}%',
240                  self.stats_max['cpu']['user+sys__rate'],
241                  lambda x: x * 100),
242                 ('Overall CPU usage: {}%',
243                  self.job_tot['cpu']['user+sys'] /
244                  self.job_tot['time']['elapsed']
245                  if self.job_tot['time']['elapsed'] > 0 else 0,
246                  lambda x: x * 100),
247                 ('Max memory used by a single task: {}GB',
248                  self.stats_max['mem']['rss'],
249                  lambda x: x / 1e9),
250                 ('Max network traffic in a single task: {}GB',
251                  self.stats_max['net:eth0']['tx+rx'] +
252                  self.stats_max['net:keep0']['tx+rx'],
253                  lambda x: x / 1e9),
254                 ('Max network speed in a single interval: {}MB/s',
255                  self.stats_max['net:eth0']['tx+rx__rate'] +
256                  self.stats_max['net:keep0']['tx+rx__rate'],
257                  lambda x: x / 1e6),
258                 ('Keep cache miss rate {}%',
259                  (float(self.job_tot['keepcache']['miss']) /
260                  float(self.job_tot['keepcalls']['get']))
261                  if self.job_tot['keepcalls']['get'] > 0 else 0,
262                  lambda x: x * 100.0),
263                 ('Keep cache utilization {}%',
264                  (float(self.job_tot['blkio:0:0']['read']) /
265                  float(self.job_tot['net:keep0']['rx']))
266                  if self.job_tot['net:keep0']['rx'] > 0 else 0,
267                  lambda x: x * 100.0)):
268             format_string, val, transform = args
269             if val == float('-Inf'):
270                 continue
271             if transform:
272                 val = transform(val)
273             yield "# "+format_string.format(self._format(val))
274
275     def _recommend_gen(self):
276         return itertools.chain(
277             self._recommend_cpu(),
278             self._recommend_ram(),
279             self._recommend_keep_cache())
280
281     def _recommend_cpu(self):
282         """Recommend asking for 4 cores if max CPU usage was 333%"""
283
284         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
285         if cpu_max_rate == float('-Inf'):
286             logger.warning('%s: no CPU usage data', self.label)
287             return
288         used_cores = max(1, int(math.ceil(cpu_max_rate)))
289         asked_cores = self.existing_constraints.get('min_cores_per_node')
290         if asked_cores is None or used_cores < asked_cores:
291             yield (
292                 '#!! {} max CPU usage was {}% -- '
293                 'try runtime_constraints "min_cores_per_node":{}'
294             ).format(
295                 self.label,
296                 int(math.ceil(cpu_max_rate*100)),
297                 int(used_cores))
298
299     def _recommend_ram(self):
300         """Recommend an economical RAM constraint for this job.
301
302         Nodes that are advertised as "8 gibibytes" actually have what
303         we might call "8 nearlygibs" of memory available for jobs.
304         Here, we calculate a whole number of nearlygibs that would
305         have sufficed to run the job, then recommend requesting a node
306         with that number of nearlygibs (expressed as mebibytes).
307
308         Requesting a node with "nearly 8 gibibytes" is our best hope
309         of getting a node that actually has nearly 8 gibibytes
310         available.  If the node manager is smart enough to account for
311         the discrepancy itself when choosing/creating a node, we'll
312         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
313         advertised size of the next-size-smaller node (say, 6 GiB)
314         will be too low to satisfy our request, so we will effectively
315         get rounded up to 8 GiB.
316
317         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
318         we will generally get a node that is advertised as "8 GiB" and
319         has at least 7500 MiB available.  However, asking for 8192 MiB
320         would either result in an unnecessarily expensive 12 GiB node
321         (if node manager knows about the discrepancy), or an 8 GiB
322         node which has less than 8192 MiB available and is therefore
323         considered by crunch-dispatch to be too small to meet our
324         constraint.
325
326         When node manager learns how to predict the available memory
327         for each node type such that crunch-dispatch always agrees
328         that a node is big enough to run the job it was brought up
329         for, all this will be unnecessary.  We'll just ask for exactly
330         the memory we want -- even if that happens to be 8192 MiB.
331         """
332
333         used_bytes = self.stats_max['mem']['rss']
334         if used_bytes == float('-Inf'):
335             logger.warning('%s: no memory usage data', self.label)
336             return
337         used_mib = math.ceil(float(used_bytes) / 1048576)
338         asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
339
340         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
341         if asked_mib is None or (
342                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
343             yield (
344                 '#!! {} max RSS was {} MiB -- '
345                 'try runtime_constraints "min_ram_mb_per_node":{}'
346             ).format(
347                 self.label,
348                 int(used_mib),
349                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
350
351     def _recommend_keep_cache(self):
352         """Recommend increasing keep cache if utilization < 80%"""
353         if self.job_tot['net:keep0']['rx'] == 0:
354             return
355         utilization = (float(self.job_tot['blkio:0:0']['read']) /
356                        float(self.job_tot['net:keep0']['rx']))
357         asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
358
359         if utilization < 0.8:
360             yield (
361                 '#!! {} Keep cache utilization was {:.2f}% -- '
362                 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
363             ).format(
364                 self.label,
365                 utilization * 100.0,
366                 asked_mib*2)
367
368
369     def _format(self, val):
370         """Return a string representation of a stat.
371
372         {:.2f} for floats, default format for everything else."""
373         if isinstance(val, float):
374             return '{:.2f}'.format(val)
375         else:
376             return '{}'.format(val)
377
378
379 class CollectionSummarizer(Summarizer):
380     def __init__(self, collection_id, **kwargs):
381         super(CollectionSummarizer, self).__init__(
382             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
383         self.label = collection_id
384
385
386 class JobSummarizer(Summarizer):
387     def __init__(self, job, **kwargs):
388         arv = arvados.api('v1')
389         if isinstance(job, basestring):
390             self.job = arv.jobs().get(uuid=job).execute()
391         else:
392             self.job = job
393         rdr = None
394         if self.job.get('log'):
395             try:
396                 rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
397             except arvados.errors.NotFoundError as e:
398                 logger.warning("Trying event logs after failing to read "
399                                "log collection %s: %s", self.job['log'], e)
400             else:
401                 label = self.job['uuid']
402         if rdr is None:
403             rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
404             label = self.job['uuid'] + ' (partial)'
405         super(JobSummarizer, self).__init__(rdr, **kwargs)
406         self.label = label
407         self.existing_constraints = self.job.get('runtime_constraints', {})
408
409
410 class PipelineSummarizer(object):
411     def __init__(self, pipeline_instance_uuid, **kwargs):
412         arv = arvados.api('v1', model=OrderedJsonModel())
413         instance = arv.pipeline_instances().get(
414             uuid=pipeline_instance_uuid).execute()
415         self.summarizers = collections.OrderedDict()
416         for cname, component in instance['components'].iteritems():
417             if 'job' not in component:
418                 logger.warning(
419                     "%s: skipping component with no job assigned", cname)
420             else:
421                 logger.info(
422                     "%s: job %s", cname, component['job']['uuid'])
423                 summarizer = JobSummarizer(component['job'], **kwargs)
424                 summarizer.label = '{} {}'.format(
425                     cname, component['job']['uuid'])
426                 self.summarizers[cname] = summarizer
427         self.label = pipeline_instance_uuid
428
429     def run(self):
430         threads = []
431         for summarizer in self.summarizers.itervalues():
432             t = threading.Thread(target=summarizer.run)
433             t.daemon = True
434             t.start()
435             threads.append(t)
436         for t in threads:
437             t.join()
438
439     def text_report(self):
440         txt = ''
441         for cname, summarizer in self.summarizers.iteritems():
442             txt += '### Summary for {} ({})\n'.format(
443                 cname, summarizer.job['uuid'])
444             txt += summarizer.text_report()
445             txt += '\n'
446         return txt
447
448     def html_report(self):
449         return crunchstat_summary.chartjs.ChartJS(
450             self.label, self.summarizers.itervalues()).html()