7475: Merge branch 'master' into 7475-nodemgr-unsatisfiable-job-comms
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import print_function
6
7 import arvados
8 import collections
9 import crunchstat_summary.dygraphs
10 import crunchstat_summary.reader
11 import datetime
12 import functools
13 import itertools
14 import math
15 import re
16 import sys
17 import threading
18 import _strptime
19
20 from arvados.api import OrderedJsonModel
21 from crunchstat_summary import logger
22
23 # Recommend memory constraints that are this multiple of an integral
24 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
25 # that have amounts like 7.5 GiB according to the kernel.)
26 AVAILABLE_RAM_RATIO = 0.95
27
28
29 # Workaround datetime.datetime.strptime() thread-safety bug by calling
30 # it once before starting threads.  https://bugs.python.org/issue7980
31 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
32
33
34 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
35
36
37 class Task(object):
38     def __init__(self):
39         self.starttime = None
40         self.series = collections.defaultdict(list)
41
42
43 class Summarizer(object):
44     def __init__(self, logdata, label=None, skip_child_jobs=False):
45         self._logdata = logdata
46
47         self.label = label
48         self.starttime = None
49         self.finishtime = None
50         self._skip_child_jobs = skip_child_jobs
51
52         # stats_max: {category: {stat: val}}
53         self.stats_max = collections.defaultdict(
54             functools.partial(collections.defaultdict, lambda: 0))
55         # task_stats: {task_id: {category: {stat: val}}}
56         self.task_stats = collections.defaultdict(
57             functools.partial(collections.defaultdict, dict))
58
59         self.seq_to_uuid = {}
60         self.tasks = collections.defaultdict(Task)
61
62         # We won't bother recommending new runtime constraints if the
63         # constraints given when running the job are known to us and
64         # are already suitable.  If applicable, the subclass
65         # constructor will overwrite this with something useful.
66         self.existing_constraints = {}
67
68         logger.debug("%s: logdata %s", self.label, logdata)
69
70     def run(self):
71         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
72         for line in self._logdata:
73             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
74             if m:
75                 seq = int(m.group('seq'))
76                 uuid = m.group('task_uuid')
77                 self.seq_to_uuid[seq] = uuid
78                 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
79                 continue
80
81             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
82             if m:
83                 task_id = self.seq_to_uuid[int(m.group('seq'))]
84                 elapsed = int(m.group('elapsed'))
85                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
86                 if elapsed > self.stats_max['time']['elapsed']:
87                     self.stats_max['time']['elapsed'] = elapsed
88                 continue
89
90             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
91             if m:
92                 uuid = m.group('uuid')
93                 if self._skip_child_jobs:
94                     logger.warning('%s: omitting stats from child job %s'
95                                    ' because --skip-child-jobs flag is on',
96                                    self.label, uuid)
97                     continue
98                 logger.debug('%s: follow %s', self.label, uuid)
99                 child_summarizer = JobSummarizer(uuid)
100                 child_summarizer.stats_max = self.stats_max
101                 child_summarizer.task_stats = self.task_stats
102                 child_summarizer.tasks = self.tasks
103                 child_summarizer.starttime = self.starttime
104                 child_summarizer.run()
105                 logger.debug('%s: done %s', self.label, uuid)
106                 continue
107
108             m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
109             if not m:
110                 continue
111
112             try:
113                 if self.label is None:
114                     self.label = m.group('job_uuid')
115                     logger.debug('%s: using job uuid as label', self.label)
116                 if m.group('category').endswith(':'):
117                     # "stderr crunchstat: notice: ..."
118                     continue
119                 elif m.group('category') in ('error', 'caught'):
120                     continue
121                 elif m.group('category') == 'read':
122                     # "stderr crunchstat: read /proc/1234/net/dev: ..."
123                     # (crunchstat formatting fixed, but old logs still say this)
124                     continue
125                 task_id = self.seq_to_uuid[int(m.group('seq'))]
126                 task = self.tasks[task_id]
127
128                 # Use the first and last crunchstat timestamps as
129                 # approximations of starttime and finishtime.
130                 timestamp = datetime.datetime.strptime(
131                     m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
132                 if not task.starttime:
133                     task.starttime = timestamp
134                     logger.debug('%s: task %s starttime %s',
135                                  self.label, task_id, timestamp)
136                 task.finishtime = timestamp
137
138                 if not self.starttime:
139                     self.starttime = timestamp
140                 self.finishtime = timestamp
141
142                 this_interval_s = None
143                 for group in ['current', 'interval']:
144                     if not m.group(group):
145                         continue
146                     category = m.group('category')
147                     words = m.group(group).split(' ')
148                     stats = {}
149                     for val, stat in zip(words[::2], words[1::2]):
150                         try:
151                             if '.' in val:
152                                 stats[stat] = float(val)
153                             else:
154                                 stats[stat] = int(val)
155                         except ValueError as e:
156                             raise ValueError(
157                                 'Error parsing {} stat: {!r}'.format(
158                                     stat, e))
159                     if 'user' in stats or 'sys' in stats:
160                         stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
161                     if 'tx' in stats or 'rx' in stats:
162                         stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
163                     for stat, val in stats.iteritems():
164                         if group == 'interval':
165                             if stat == 'seconds':
166                                 this_interval_s = val
167                                 continue
168                             elif not (this_interval_s > 0):
169                                 logger.error(
170                                     "BUG? interval stat given with duration {!r}".
171                                     format(this_interval_s))
172                                 continue
173                             else:
174                                 stat = stat + '__rate'
175                                 val = val / this_interval_s
176                                 if stat in ['user+sys__rate', 'tx+rx__rate']:
177                                     task.series[category, stat].append(
178                                         (timestamp - self.starttime, val))
179                         else:
180                             if stat in ['rss']:
181                                 task.series[category, stat].append(
182                                     (timestamp - self.starttime, val))
183                             self.task_stats[task_id][category][stat] = val
184                         if val > self.stats_max[category][stat]:
185                             self.stats_max[category][stat] = val
186             except Exception as e:
187                 logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
188         logger.debug('%s: done parsing', self.label)
189
190         self.job_tot = collections.defaultdict(
191             functools.partial(collections.defaultdict, int))
192         for task_id, task_stat in self.task_stats.iteritems():
193             for category, stat_last in task_stat.iteritems():
194                 for stat, val in stat_last.iteritems():
195                     if stat in ['cpus', 'cache', 'swap', 'rss']:
196                         # meaningless stats like 16 cpu cores x 5 tasks = 80
197                         continue
198                     self.job_tot[category][stat] += val
199         logger.debug('%s: done totals', self.label)
200
201     def long_label(self):
202         label = self.label
203         if self.finishtime:
204             label += ' -- elapsed time '
205             s = (self.finishtime - self.starttime).total_seconds()
206             if s > 86400:
207                 label += '{}d'.format(int(s/86400))
208             if s > 3600:
209                 label += '{}h'.format(int(s/3600) % 24)
210             if s > 60:
211                 label += '{}m'.format(int(s/60) % 60)
212             label += '{}s'.format(int(s) % 60)
213         return label
214
215     def text_report(self):
216         if not self.tasks:
217             return "(no report generated)\n"
218         return "\n".join(itertools.chain(
219             self._text_report_gen(),
220             self._recommend_gen())) + "\n"
221
222     def html_report(self):
223         return WEBCHART_CLASS(self.label, [self]).html()
224
225     def _text_report_gen(self):
226         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
227         for category, stat_max in sorted(self.stats_max.iteritems()):
228             for stat, val in sorted(stat_max.iteritems()):
229                 if stat.endswith('__rate'):
230                     continue
231                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
232                 val = self._format(val)
233                 tot = self._format(self.job_tot[category].get(stat, '-'))
234                 yield "\t".join([category, stat, str(val), max_rate, tot])
235         for args in (
236                 ('Number of tasks: {}',
237                  len(self.tasks),
238                  None),
239                 ('Max CPU time spent by a single task: {}s',
240                  self.stats_max['cpu']['user+sys'],
241                  None),
242                 ('Max CPU usage in a single interval: {}%',
243                  self.stats_max['cpu']['user+sys__rate'],
244                  lambda x: x * 100),
245                 ('Overall CPU usage: {}%',
246                  self.job_tot['cpu']['user+sys'] /
247                  self.job_tot['time']['elapsed']
248                  if self.job_tot['time']['elapsed'] > 0 else 0,
249                  lambda x: x * 100),
250                 ('Max memory used by a single task: {}GB',
251                  self.stats_max['mem']['rss'],
252                  lambda x: x / 1e9),
253                 ('Max network traffic in a single task: {}GB',
254                  self.stats_max['net:eth0']['tx+rx'] +
255                  self.stats_max['net:keep0']['tx+rx'],
256                  lambda x: x / 1e9),
257                 ('Max network speed in a single interval: {}MB/s',
258                  self.stats_max['net:eth0']['tx+rx__rate'] +
259                  self.stats_max['net:keep0']['tx+rx__rate'],
260                  lambda x: x / 1e6),
261                 ('Keep cache miss rate {}%',
262                  (float(self.job_tot['keepcache']['miss']) /
263                  float(self.job_tot['keepcalls']['get']))
264                  if self.job_tot['keepcalls']['get'] > 0 else 0,
265                  lambda x: x * 100.0),
266                 ('Keep cache utilization {}%',
267                  (float(self.job_tot['blkio:0:0']['read']) /
268                  float(self.job_tot['net:keep0']['rx']))
269                  if self.job_tot['net:keep0']['rx'] > 0 else 0,
270                  lambda x: x * 100.0)):
271             format_string, val, transform = args
272             if val == float('-Inf'):
273                 continue
274             if transform:
275                 val = transform(val)
276             yield "# "+format_string.format(self._format(val))
277
278     def _recommend_gen(self):
279         return itertools.chain(
280             self._recommend_cpu(),
281             self._recommend_ram(),
282             self._recommend_keep_cache())
283
284     def _recommend_cpu(self):
285         """Recommend asking for 4 cores if max CPU usage was 333%"""
286
287         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
288         if cpu_max_rate == float('-Inf'):
289             logger.warning('%s: no CPU usage data', self.label)
290             return
291         used_cores = max(1, int(math.ceil(cpu_max_rate)))
292         asked_cores = self.existing_constraints.get('min_cores_per_node')
293         if asked_cores is None or used_cores < asked_cores:
294             yield (
295                 '#!! {} max CPU usage was {}% -- '
296                 'try runtime_constraints "min_cores_per_node":{}'
297             ).format(
298                 self.label,
299                 int(math.ceil(cpu_max_rate*100)),
300                 int(used_cores))
301
302     def _recommend_ram(self):
303         """Recommend an economical RAM constraint for this job.
304
305         Nodes that are advertised as "8 gibibytes" actually have what
306         we might call "8 nearlygibs" of memory available for jobs.
307         Here, we calculate a whole number of nearlygibs that would
308         have sufficed to run the job, then recommend requesting a node
309         with that number of nearlygibs (expressed as mebibytes).
310
311         Requesting a node with "nearly 8 gibibytes" is our best hope
312         of getting a node that actually has nearly 8 gibibytes
313         available.  If the node manager is smart enough to account for
314         the discrepancy itself when choosing/creating a node, we'll
315         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
316         advertised size of the next-size-smaller node (say, 6 GiB)
317         will be too low to satisfy our request, so we will effectively
318         get rounded up to 8 GiB.
319
320         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
321         we will generally get a node that is advertised as "8 GiB" and
322         has at least 7500 MiB available.  However, asking for 8192 MiB
323         would either result in an unnecessarily expensive 12 GiB node
324         (if node manager knows about the discrepancy), or an 8 GiB
325         node which has less than 8192 MiB available and is therefore
326         considered by crunch-dispatch to be too small to meet our
327         constraint.
328
329         When node manager learns how to predict the available memory
330         for each node type such that crunch-dispatch always agrees
331         that a node is big enough to run the job it was brought up
332         for, all this will be unnecessary.  We'll just ask for exactly
333         the memory we want -- even if that happens to be 8192 MiB.
334         """
335
336         used_bytes = self.stats_max['mem']['rss']
337         if used_bytes == float('-Inf'):
338             logger.warning('%s: no memory usage data', self.label)
339             return
340         used_mib = math.ceil(float(used_bytes) / 1048576)
341         asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
342
343         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
344         if asked_mib is None or (
345                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
346             yield (
347                 '#!! {} max RSS was {} MiB -- '
348                 'try runtime_constraints "min_ram_mb_per_node":{}'
349             ).format(
350                 self.label,
351                 int(used_mib),
352                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
353
354     def _recommend_keep_cache(self):
355         """Recommend increasing keep cache if utilization < 80%"""
356         if self.job_tot['net:keep0']['rx'] == 0:
357             return
358         utilization = (float(self.job_tot['blkio:0:0']['read']) /
359                        float(self.job_tot['net:keep0']['rx']))
360         asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
361
362         if utilization < 0.8:
363             yield (
364                 '#!! {} Keep cache utilization was {:.2f}% -- '
365                 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
366             ).format(
367                 self.label,
368                 utilization * 100.0,
369                 asked_mib*2)
370
371
372     def _format(self, val):
373         """Return a string representation of a stat.
374
375         {:.2f} for floats, default format for everything else."""
376         if isinstance(val, float):
377             return '{:.2f}'.format(val)
378         else:
379             return '{}'.format(val)
380
381
382 class CollectionSummarizer(Summarizer):
383     def __init__(self, collection_id, **kwargs):
384         super(CollectionSummarizer, self).__init__(
385             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
386         self.label = collection_id
387
388
389 class JobSummarizer(Summarizer):
390     def __init__(self, job, **kwargs):
391         arv = arvados.api('v1')
392         if isinstance(job, basestring):
393             self.job = arv.jobs().get(uuid=job).execute()
394         else:
395             self.job = job
396         rdr = None
397         if self.job.get('log'):
398             try:
399                 rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
400             except arvados.errors.NotFoundError as e:
401                 logger.warning("Trying event logs after failing to read "
402                                "log collection %s: %s", self.job['log'], e)
403             else:
404                 label = self.job['uuid']
405         if rdr is None:
406             rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
407             label = self.job['uuid'] + ' (partial)'
408         super(JobSummarizer, self).__init__(rdr, **kwargs)
409         self.label = label
410         self.existing_constraints = self.job.get('runtime_constraints', {})
411
412
413 class PipelineSummarizer(object):
414     def __init__(self, pipeline_instance_uuid, **kwargs):
415         arv = arvados.api('v1', model=OrderedJsonModel())
416         instance = arv.pipeline_instances().get(
417             uuid=pipeline_instance_uuid).execute()
418         self.summarizers = collections.OrderedDict()
419         for cname, component in instance['components'].iteritems():
420             if 'job' not in component:
421                 logger.warning(
422                     "%s: skipping component with no job assigned", cname)
423             else:
424                 logger.info(
425                     "%s: job %s", cname, component['job']['uuid'])
426                 summarizer = JobSummarizer(component['job'], **kwargs)
427                 summarizer.label = '{} {}'.format(
428                     cname, component['job']['uuid'])
429                 self.summarizers[cname] = summarizer
430         self.label = pipeline_instance_uuid
431
432     def run(self):
433         threads = []
434         for summarizer in self.summarizers.itervalues():
435             t = threading.Thread(target=summarizer.run)
436             t.daemon = True
437             t.start()
438             threads.append(t)
439         for t in threads:
440             t.join()
441
442     def text_report(self):
443         txt = ''
444         for cname, summarizer in self.summarizers.iteritems():
445             txt += '### Summary for {} ({})\n'.format(
446                 cname, summarizer.job['uuid'])
447             txt += summarizer.text_report()
448             txt += '\n'
449         return txt
450
451     def html_report(self):
452         return WEBCHART_CLASS(self.label, self.summarizers.itervalues()).html()