11309: Accommodate crunch2 crunchstat log format.
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import print_function
6
7 import arvados
8 import collections
9 import crunchstat_summary.dygraphs
10 import crunchstat_summary.reader
11 import datetime
12 import functools
13 import itertools
14 import math
15 import re
16 import sys
17 import threading
18 import _strptime
19
20 from arvados.api import OrderedJsonModel
21 from crunchstat_summary import logger
22
23 # Recommend memory constraints that are this multiple of an integral
24 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
25 # that have amounts like 7.5 GiB according to the kernel.)
26 AVAILABLE_RAM_RATIO = 0.95
27
28
29 # Workaround datetime.datetime.strptime() thread-safety bug by calling
30 # it once before starting threads.  https://bugs.python.org/issue7980
31 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
32
33
34 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
35
36
37 class Task(object):
38     def __init__(self):
39         self.starttime = None
40         self.series = collections.defaultdict(list)
41
42
43 class Summarizer(object):
44     def __init__(self, logdata, label=None, skip_child_jobs=False):
45         self._logdata = logdata
46
47         self.label = label
48         self.starttime = None
49         self.finishtime = None
50         self._skip_child_jobs = skip_child_jobs
51
52         # stats_max: {category: {stat: val}}
53         self.stats_max = collections.defaultdict(
54             functools.partial(collections.defaultdict, lambda: 0))
55         # task_stats: {task_id: {category: {stat: val}}}
56         self.task_stats = collections.defaultdict(
57             functools.partial(collections.defaultdict, dict))
58
59         self.seq_to_uuid = {}
60         self.tasks = collections.defaultdict(Task)
61
62         # We won't bother recommending new runtime constraints if the
63         # constraints given when running the job are known to us and
64         # are already suitable.  If applicable, the subclass
65         # constructor will overwrite this with something useful.
66         self.existing_constraints = {}
67
68         logger.debug("%s: logdata %s", self.label, logdata)
69
70     def run(self):
71         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
72         for line in self._logdata:
73             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
74             if m:
75                 seq = int(m.group('seq'))
76                 uuid = m.group('task_uuid')
77                 self.seq_to_uuid[seq] = uuid
78                 logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
79                 continue
80
81             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
82             if m:
83                 task_id = self.seq_to_uuid[int(m.group('seq'))]
84                 elapsed = int(m.group('elapsed'))
85                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
86                 if elapsed > self.stats_max['time']['elapsed']:
87                     self.stats_max['time']['elapsed'] = elapsed
88                 continue
89
90             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
91             if m:
92                 uuid = m.group('uuid')
93                 if self._skip_child_jobs:
94                     logger.warning('%s: omitting stats from child job %s'
95                                    ' because --skip-child-jobs flag is on',
96                                    self.label, uuid)
97                     continue
98                 logger.debug('%s: follow %s', self.label, uuid)
99                 child_summarizer = ProcessSummarizer(uuid)
100                 child_summarizer.stats_max = self.stats_max
101                 child_summarizer.task_stats = self.task_stats
102                 child_summarizer.tasks = self.tasks
103                 child_summarizer.starttime = self.starttime
104                 child_summarizer.run()
105                 logger.debug('%s: done %s', self.label, uuid)
106                 continue
107
108             m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
109             if m:
110                 # crunch1 job
111                 task_id = self.seq_to_uuid[int(m.group('seq'))]
112             else:
113                 m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
114                 if m:
115                     # crunch2 container (seq/task don't apply)
116                     task_id = 'container'
117                 else:
118                     # not a crunchstat log
119                     continue
120             task = self.tasks[task_id]
121
122             try:
123                 if self.label is None:
124                     self.label = m.group('job_uuid')
125                     logger.debug('%s: using job uuid as label', self.label)
126                 if m.group('category').endswith(':'):
127                     # "stderr crunchstat: notice: ..."
128                     continue
129                 elif m.group('category') in ('error', 'caught'):
130                     continue
131                 elif m.group('category') == 'read':
132                     # "stderr crunchstat: read /proc/1234/net/dev: ..."
133                     # (crunchstat formatting fixed, but old logs still say this)
134                     continue
135
136                 # Use the first and last crunchstat timestamps as
137                 # approximations of starttime and finishtime.
138                 timestamp = m.group('timestamp')
139                 if timestamp[10:11] == '_':
140                     timestamp = datetime.datetime.strptime(
141                         timestamp, '%Y-%m-%d_%H:%M:%S')
142                 elif timestamp[10:11] == 'T':
143                     timestamp = datetime.datetime.strptime(
144                         timestamp[:19], '%Y-%m-%dT%H:%M:%S')
145                 else:
146                     raise ValueError("Cannot parse timestamp {!r}".format(
147                         timestamp))
148
149                 if not task.starttime:
150                     task.starttime = timestamp
151                     logger.debug('%s: task %s starttime %s',
152                                  self.label, task_id, timestamp)
153                 task.finishtime = timestamp
154
155                 if not self.starttime:
156                     self.starttime = timestamp
157                 self.finishtime = timestamp
158
159                 this_interval_s = None
160                 for group in ['current', 'interval']:
161                     if not m.group(group):
162                         continue
163                     category = m.group('category')
164                     words = m.group(group).split(' ')
165                     stats = {}
166                     for val, stat in zip(words[::2], words[1::2]):
167                         try:
168                             if '.' in val:
169                                 stats[stat] = float(val)
170                             else:
171                                 stats[stat] = int(val)
172                         except ValueError as e:
173                             raise ValueError(
174                                 'Error parsing {} stat: {!r}'.format(
175                                     stat, e))
176                     if 'user' in stats or 'sys' in stats:
177                         stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
178                     if 'tx' in stats or 'rx' in stats:
179                         stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
180                     for stat, val in stats.iteritems():
181                         if group == 'interval':
182                             if stat == 'seconds':
183                                 this_interval_s = val
184                                 continue
185                             elif not (this_interval_s > 0):
186                                 logger.error(
187                                     "BUG? interval stat given with duration {!r}".
188                                     format(this_interval_s))
189                                 continue
190                             else:
191                                 stat = stat + '__rate'
192                                 val = val / this_interval_s
193                                 if stat in ['user+sys__rate', 'tx+rx__rate']:
194                                     task.series[category, stat].append(
195                                         (timestamp - self.starttime, val))
196                         else:
197                             if stat in ['rss']:
198                                 task.series[category, stat].append(
199                                     (timestamp - self.starttime, val))
200                             self.task_stats[task_id][category][stat] = val
201                         if val > self.stats_max[category][stat]:
202                             self.stats_max[category][stat] = val
203             except Exception as e:
204                 logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
205         logger.debug('%s: done parsing', self.label)
206
207         self.job_tot = collections.defaultdict(
208             functools.partial(collections.defaultdict, int))
209         for task_id, task_stat in self.task_stats.iteritems():
210             for category, stat_last in task_stat.iteritems():
211                 for stat, val in stat_last.iteritems():
212                     if stat in ['cpus', 'cache', 'swap', 'rss']:
213                         # meaningless stats like 16 cpu cores x 5 tasks = 80
214                         continue
215                     self.job_tot[category][stat] += val
216         logger.debug('%s: done totals', self.label)
217
218     def long_label(self):
219         label = self.label
220         if self.finishtime:
221             label += ' -- elapsed time '
222             s = (self.finishtime - self.starttime).total_seconds()
223             if s > 86400:
224                 label += '{}d'.format(int(s/86400))
225             if s > 3600:
226                 label += '{}h'.format(int(s/3600) % 24)
227             if s > 60:
228                 label += '{}m'.format(int(s/60) % 60)
229             label += '{}s'.format(int(s) % 60)
230         return label
231
232     def text_report(self):
233         if not self.tasks:
234             return "(no report generated)\n"
235         return "\n".join(itertools.chain(
236             self._text_report_gen(),
237             self._recommend_gen())) + "\n"
238
239     def html_report(self):
240         return WEBCHART_CLASS(self.label, [self]).html()
241
242     def _text_report_gen(self):
243         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
244         for category, stat_max in sorted(self.stats_max.iteritems()):
245             for stat, val in sorted(stat_max.iteritems()):
246                 if stat.endswith('__rate'):
247                     continue
248                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
249                 val = self._format(val)
250                 tot = self._format(self.job_tot[category].get(stat, '-'))
251                 yield "\t".join([category, stat, str(val), max_rate, tot])
252         for args in (
253                 ('Number of tasks: {}',
254                  len(self.tasks),
255                  None),
256                 ('Max CPU time spent by a single task: {}s',
257                  self.stats_max['cpu']['user+sys'],
258                  None),
259                 ('Max CPU usage in a single interval: {}%',
260                  self.stats_max['cpu']['user+sys__rate'],
261                  lambda x: x * 100),
262                 ('Overall CPU usage: {}%',
263                  self.job_tot['cpu']['user+sys'] /
264                  self.job_tot['time']['elapsed']
265                  if self.job_tot['time']['elapsed'] > 0 else 0,
266                  lambda x: x * 100),
267                 ('Max memory used by a single task: {}GB',
268                  self.stats_max['mem']['rss'],
269                  lambda x: x / 1e9),
270                 ('Max network traffic in a single task: {}GB',
271                  self.stats_max['net:eth0']['tx+rx'] +
272                  self.stats_max['net:keep0']['tx+rx'],
273                  lambda x: x / 1e9),
274                 ('Max network speed in a single interval: {}MB/s',
275                  self.stats_max['net:eth0']['tx+rx__rate'] +
276                  self.stats_max['net:keep0']['tx+rx__rate'],
277                  lambda x: x / 1e6),
278                 ('Keep cache miss rate {}%',
279                  (float(self.job_tot['keepcache']['miss']) /
280                  float(self.job_tot['keepcalls']['get']))
281                  if self.job_tot['keepcalls']['get'] > 0 else 0,
282                  lambda x: x * 100.0),
283                 ('Keep cache utilization {}%',
284                  (float(self.job_tot['blkio:0:0']['read']) /
285                  float(self.job_tot['net:keep0']['rx']))
286                  if self.job_tot['net:keep0']['rx'] > 0 else 0,
287                  lambda x: x * 100.0)):
288             format_string, val, transform = args
289             if val == float('-Inf'):
290                 continue
291             if transform:
292                 val = transform(val)
293             yield "# "+format_string.format(self._format(val))
294
295     def _recommend_gen(self):
296         return itertools.chain(
297             self._recommend_cpu(),
298             self._recommend_ram(),
299             self._recommend_keep_cache())
300
301     def _recommend_cpu(self):
302         """Recommend asking for 4 cores if max CPU usage was 333%"""
303
304         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
305         if cpu_max_rate == float('-Inf'):
306             logger.warning('%s: no CPU usage data', self.label)
307             return
308         used_cores = max(1, int(math.ceil(cpu_max_rate)))
309         asked_cores = self.existing_constraints.get('min_cores_per_node')
310         if asked_cores is None or used_cores < asked_cores:
311             yield (
312                 '#!! {} max CPU usage was {}% -- '
313                 'try runtime_constraints "min_cores_per_node":{}'
314             ).format(
315                 self.label,
316                 int(math.ceil(cpu_max_rate*100)),
317                 int(used_cores))
318
319     def _recommend_ram(self):
320         """Recommend an economical RAM constraint for this job.
321
322         Nodes that are advertised as "8 gibibytes" actually have what
323         we might call "8 nearlygibs" of memory available for jobs.
324         Here, we calculate a whole number of nearlygibs that would
325         have sufficed to run the job, then recommend requesting a node
326         with that number of nearlygibs (expressed as mebibytes).
327
328         Requesting a node with "nearly 8 gibibytes" is our best hope
329         of getting a node that actually has nearly 8 gibibytes
330         available.  If the node manager is smart enough to account for
331         the discrepancy itself when choosing/creating a node, we'll
332         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
333         advertised size of the next-size-smaller node (say, 6 GiB)
334         will be too low to satisfy our request, so we will effectively
335         get rounded up to 8 GiB.
336
337         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
338         we will generally get a node that is advertised as "8 GiB" and
339         has at least 7500 MiB available.  However, asking for 8192 MiB
340         would either result in an unnecessarily expensive 12 GiB node
341         (if node manager knows about the discrepancy), or an 8 GiB
342         node which has less than 8192 MiB available and is therefore
343         considered by crunch-dispatch to be too small to meet our
344         constraint.
345
346         When node manager learns how to predict the available memory
347         for each node type such that crunch-dispatch always agrees
348         that a node is big enough to run the job it was brought up
349         for, all this will be unnecessary.  We'll just ask for exactly
350         the memory we want -- even if that happens to be 8192 MiB.
351         """
352
353         used_bytes = self.stats_max['mem']['rss']
354         if used_bytes == float('-Inf'):
355             logger.warning('%s: no memory usage data', self.label)
356             return
357         used_mib = math.ceil(float(used_bytes) / 1048576)
358         asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
359
360         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
361         if asked_mib is None or (
362                 math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
363             yield (
364                 '#!! {} max RSS was {} MiB -- '
365                 'try runtime_constraints "min_ram_mb_per_node":{}'
366             ).format(
367                 self.label,
368                 int(used_mib),
369                 int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
370
371     def _recommend_keep_cache(self):
372         """Recommend increasing keep cache if utilization < 80%"""
373         if self.job_tot['net:keep0']['rx'] == 0:
374             return
375         utilization = (float(self.job_tot['blkio:0:0']['read']) /
376                        float(self.job_tot['net:keep0']['rx']))
377         asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
378
379         if utilization < 0.8:
380             yield (
381                 '#!! {} Keep cache utilization was {:.2f}% -- '
382                 'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
383             ).format(
384                 self.label,
385                 utilization * 100.0,
386                 asked_mib*2)
387
388
389     def _format(self, val):
390         """Return a string representation of a stat.
391
392         {:.2f} for floats, default format for everything else."""
393         if isinstance(val, float):
394             return '{:.2f}'.format(val)
395         else:
396             return '{}'.format(val)
397
398
399 class CollectionSummarizer(Summarizer):
400     def __init__(self, collection_id, **kwargs):
401         super(CollectionSummarizer, self).__init__(
402             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
403         self.label = collection_id
404
405
406 def NewSummarizer(process, **kwargs):
407     """Construct with the appropriate subclass for this uuid/object."""
408
409     if not isinstance(process, dict):
410         uuid = process
411         process = None
412         arv = arvados.api('v1', model=OrderedJsonModel())
413
414     if re.search('-dz642-', uuid):
415         if process is None:
416             process = arv.containers().get(uuid=uuid).execute()
417         return ContainerSummarizer(process, **kwargs)
418     elif re.search('-xvhdp-', uuid):
419         if process is None:
420             ctrReq = arv.container_requests().get(uuid=uuid).execute()
421             ctrUUID = ctrReq['container_uuid']
422             process = arv.containers().get(uuid=ctrUUID).execute()
423         return ContainerSummarizer(process, **kwargs)
424     elif re.search('-8i9sb-', uuid):
425         if process is None:
426             process = arv.jobs().get(uuid=uuid).execute()
427         return JobSummarizer(process, **kwargs)
428     elif re.search('-d1hrv-', uuid):
429         if process is None:
430             process = arv.pipeline_instances().get(uuid=uuid).execute()
431         return PipelineSummarizer(process, **kwargs)
432     else:
433         raise ArgumentError("Unrecognized uuid %s", uuid)
434
435
436 class ProcessSummarizer(Summarizer):
437     """Process is a job, pipeline, container, or container request."""
438
439     def __init__(self, process, **kwargs):
440         rdr = None
441         self.process = process
442         if self.process.get('log'):
443             try:
444                 rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
445             except arvados.errors.NotFoundError as e:
446                 logger.warning("Trying event logs after failing to read "
447                                "log collection %s: %s", self.process['log'], e)
448             else:
449                 label = self.process['uuid']
450         if rdr is None:
451             rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
452             label = self.process['uuid'] + ' (partial)'
453         super(ProcessSummarizer, self).__init__(rdr, **kwargs)
454         self.label = label
455         self.existing_constraints = self.process.get('runtime_constraints', {})
456
457
458 class JobSummarizer(ProcessSummarizer):
459     pass
460
461
462 class ContainerSummarizer(ProcessSummarizer):
463     pass
464
465
466 class PipelineSummarizer(object):
467     def __init__(self, instance, **kwargs):
468         self.summarizers = collections.OrderedDict()
469         for cname, component in instance['components'].iteritems():
470             if 'job' not in component:
471                 logger.warning(
472                     "%s: skipping component with no job assigned", cname)
473             else:
474                 logger.info(
475                     "%s: job %s", cname, component['job']['uuid'])
476                 summarizer = JobSummarizer(component['job'], **kwargs)
477                 summarizer.label = '{} {}'.format(
478                     cname, component['job']['uuid'])
479                 self.summarizers[cname] = summarizer
480         self.label = instance['uuid']
481
482     def run(self):
483         threads = []
484         for summarizer in self.summarizers.itervalues():
485             t = threading.Thread(target=summarizer.run)
486             t.daemon = True
487             t.start()
488             threads.append(t)
489         for t in threads:
490             t.join()
491
492     def text_report(self):
493         txt = ''
494         for cname, summarizer in self.summarizers.iteritems():
495             txt += '### Summary for {} ({})\n'.format(
496                 cname, summarizer.process['uuid'])
497             txt += summarizer.text_report()
498             txt += '\n'
499         return txt
500
501     def html_report(self):
502         return WEBCHART_CLASS(self.label, self.summarizers.itervalues()).html()