# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

from __future__ import print_function

import arvados
import collections
import crunchstat_summary.dygraphs
import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re
import sys
import threading
import _strptime

from arvados.api import OrderedJsonModel
from crunchstat_summary import logger

# Recommend memory constraints that are this fraction of an integral
# number of GiB.  (Nodes tend to be sold in sizes like 8 GiB, but the
# kernel typically reports something closer to 7.5 GiB as available.)
AVAILABLE_RAM_RATIO = 0.95
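# (Illustrative arithmetic, not part of the runtime logic: one
# "nearlygib" is 0.95 GiB = 972.8 MiB, so a task that used 7500 MiB of
# RSS needs math.ceil(7500 / 972.8) == 8 nearlygibs.  See
# _recommend_ram() below.)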


# Work around a datetime.datetime.strptime() thread-safety bug by
# calling it once before starting threads.  https://bugs.python.org/issue7980
datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')


WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart


class Task(object):
    def __init__(self):
        self.starttime = None
        self.series = collections.defaultdict(list)


class Summarizer(object):
    def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
        self._logdata = logdata

        self.uuid = uuid
        self.label = label
        self.starttime = None
        self.finishtime = None
        self._skip_child_jobs = skip_child_jobs

        # stats_max: {category: {stat: val}}
        self.stats_max = collections.defaultdict(
            functools.partial(collections.defaultdict, lambda: 0))
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))

        self.seq_to_uuid = {}
        self.tasks = collections.defaultdict(Task)

        # We won't bother recommending new runtime constraints if the
        # constraints given when running the job are known to us and
        # are already suitable.  If applicable, the subclass
        # constructor will overwrite this with something useful.
        self.existing_constraints = {}

        logger.debug("%s: logdata %s", self.label, logdata)

    def run(self):
        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
        self.detected_crunch1 = False
        for line in self._logdata:
            if not self.detected_crunch1 and '-8i9sb-' in line:
                self.detected_crunch1 = True

            if self.detected_crunch1:
                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
                if m:
                    seq = int(m.group('seq'))
                    uuid = m.group('task_uuid')
                    self.seq_to_uuid[seq] = uuid
                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
                    continue

                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
                if m:
                    task_id = self.seq_to_uuid[int(m.group('seq'))]
                    elapsed = int(m.group('elapsed'))
                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                    if elapsed > self.stats_max['time']['elapsed']:
                        self.stats_max['time']['elapsed'] = elapsed
                    continue

                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
                if m:
                    uuid = m.group('uuid')
                    if self._skip_child_jobs:
                        logger.warning('%s: omitting stats from child job %s'
                                       ' because --skip-child-jobs flag is on',
                                       self.label, uuid)
                        continue
                    logger.debug('%s: follow %s', self.label, uuid)
                    child_summarizer = NewSummarizer(uuid)
                    child_summarizer.stats_max = self.stats_max
                    child_summarizer.task_stats = self.task_stats
                    child_summarizer.tasks = self.tasks
                    child_summarizer.starttime = self.starttime
                    child_summarizer.run()
                    logger.debug('%s: done %s', self.label, uuid)
                    continue

                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                if not m:
                    continue
            else:
                # crunch2
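                # An illustrative example of the line format assumed by
                # the regexp below (timestamp, category, then
                # value/name pairs):
                #   2017-05-25T15:09:20.123456Z mem 0 cache 0 swap 2752512 rss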
                m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
                if not m:
                    continue

            if self.label is None:
                try:
                    self.label = m.group('job_uuid')
                except IndexError:
                    self.label = 'container'
            if m.group('category').endswith(':'):
                # "stderr crunchstat: notice: ..."
                continue
            elif m.group('category') in ('error', 'caught', 'Running'):
                continue
            elif m.group('category') in ['read', 'open', 'cgroup', 'CID']:
                # "stderr crunchstat: read /proc/1234/net/dev: ..."
                # (old logs are less careful with unprefixed error messages)
                continue

            if self.detected_crunch1:
                task_id = self.seq_to_uuid[int(m.group('seq'))]
            else:
                task_id = 'container'
            task = self.tasks[task_id]

            # Use the first and last crunchstat timestamps as
            # approximations of starttime and finishtime.
            timestamp = m.group('timestamp')
            if timestamp[10:11] == '_':
                timestamp = datetime.datetime.strptime(
                    timestamp, '%Y-%m-%d_%H:%M:%S')
            elif timestamp[10:11] == 'T':
                timestamp = datetime.datetime.strptime(
                    timestamp[:19], '%Y-%m-%dT%H:%M:%S')
            else:
                raise ValueError("Cannot parse timestamp {!r}".format(
                    timestamp))

            if not task.starttime:
                task.starttime = timestamp
                logger.debug('%s: task %s starttime %s',
                             self.label, task_id, timestamp)
            task.finishtime = timestamp

            if not self.starttime:
                self.starttime = timestamp
            self.finishtime = timestamp

            this_interval_s = None
            for group in ['current', 'interval']:
                if not m.group(group):
                    continue
                category = m.group('category')
                words = m.group(group).split(' ')
                stats = {}
                try:
                    for val, stat in zip(words[::2], words[1::2]):
                        if '.' in val:
                            stats[stat] = float(val)
                        else:
                            stats[stat] = int(val)
                except ValueError as e:
                    logger.warning('Error parsing {} stat: {!r}'.format(
                        stat, e))
                    continue
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
                for stat, val in stats.iteritems():
                    if group == 'interval':
                        if stat == 'seconds':
                            this_interval_s = val
                            continue
                        elif not (this_interval_s > 0):
                            logger.error(
                                "BUG? interval stat given with duration {!r}".
                                format(this_interval_s))
                            continue
                        else:
                            stat = stat + '__rate'
                            val = val / this_interval_s
                            if stat in ['user+sys__rate', 'tx+rx__rate']:
                                task.series[category, stat].append(
                                    (timestamp - self.starttime, val))
                    else:
                        if stat in ['rss']:
                            task.series[category, stat].append(
                                (timestamp - self.starttime, val))
                        self.task_stats[task_id][category][stat] = val
                    if val > self.stats_max[category][stat]:
                        self.stats_max[category][stat] = val
        logger.debug('%s: done parsing', self.label)

        self.job_tot = collections.defaultdict(
            functools.partial(collections.defaultdict, int))
        for task_id, task_stat in self.task_stats.iteritems():
            for category, stat_last in task_stat.iteritems():
                for stat, val in stat_last.iteritems():
                    if stat in ['cpus', 'cache', 'swap', 'rss']:
                        # Skip stats that are meaningless when summed
                        # across tasks (e.g. 16 cpu cores x 5 tasks = 80).
                        continue
                    self.job_tot[category][stat] += val
        logger.debug('%s: done totals', self.label)

    def long_label(self):
        label = self.label
        if self.finishtime:
            label += ' -- elapsed time '
            s = (self.finishtime - self.starttime).total_seconds()
            if s > 86400:
                label += '{}d'.format(int(s/86400))
            if s > 3600:
                label += '{}h'.format(int(s/3600) % 24)
            if s > 60:
                label += '{}m'.format(int(s/60) % 60)
            label += '{}s'.format(int(s) % 60)
        return label

    def text_report(self):
        if not self.tasks:
            return "(no report generated)\n"
        return "\n".join(itertools.chain(
            self._text_report_gen(),
            self._recommend_gen())) + "\n"

    def html_report(self):
        return WEBCHART_CLASS(self.label, [self]).html()

    def _text_report_gen(self):
        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
        for category, stat_max in sorted(self.stats_max.iteritems()):
            for stat, val in sorted(stat_max.iteritems()):
                if stat.endswith('__rate'):
                    continue
                max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                val = self._format(val)
                tot = self._format(self.job_tot[category].get(stat, '-'))
                yield "\t".join([category, stat, str(val), max_rate, tot])
        for args in (
                ('Number of tasks: {}',
                 len(self.tasks),
                 None),
                ('Max CPU time spent by a single task: {}s',
                 self.stats_max['cpu']['user+sys'],
                 None),
                ('Max CPU usage in a single interval: {}%',
                 self.stats_max['cpu']['user+sys__rate'],
                 lambda x: x * 100),
                ('Overall CPU usage: {}%',
                 self.job_tot['cpu']['user+sys'] /
                 self.job_tot['time']['elapsed']
                 if self.job_tot['time']['elapsed'] > 0 else 0,
                 lambda x: x * 100),
                ('Max memory used by a single task: {}GB',
                 self.stats_max['mem']['rss'],
                 lambda x: x / 1e9),
                ('Max network traffic in a single task: {}GB',
                 self.stats_max['net:eth0']['tx+rx'] +
                 self.stats_max['net:keep0']['tx+rx'],
                 lambda x: x / 1e9),
                ('Max network speed in a single interval: {}MB/s',
                 self.stats_max['net:eth0']['tx+rx__rate'] +
                 self.stats_max['net:keep0']['tx+rx__rate'],
                 lambda x: x / 1e6),
                ('Keep cache miss rate {}%',
                 (float(self.job_tot['keepcache']['miss']) /
                 float(self.job_tot['keepcalls']['get']))
                 if self.job_tot['keepcalls']['get'] > 0 else 0,
                 lambda x: x * 100.0),
                ('Keep cache utilization {}%',
                 (float(self.job_tot['blkio:0:0']['read']) /
                 float(self.job_tot['net:keep0']['rx']))
                 if self.job_tot['net:keep0']['rx'] > 0 else 0,
                 lambda x: x * 100.0)):
            format_string, val, transform = args
            if val == float('-Inf'):
                continue
            if transform:
                val = transform(val)
            yield "# "+format_string.format(self._format(val))

    def _recommend_gen(self):
        return itertools.chain(
            self._recommend_cpu(),
            self._recommend_ram(),
            self._recommend_keep_cache())

    def _recommend_cpu(self):
        """Recommend asking for 4 cores if max CPU usage was 333%"""
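        # e.g. a cpu_max_rate of 3.33 (i.e. 333%) gives used_cores ==
        # max(1, int(math.ceil(3.33))) == 4.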

        constraint_key = self._map_runtime_constraint('vcpus')
        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
        if cpu_max_rate == float('-Inf'):
            logger.warning('%s: no CPU usage data', self.label)
            return
        used_cores = max(1, int(math.ceil(cpu_max_rate)))
        asked_cores = self.existing_constraints.get(constraint_key)
        if asked_cores is None or used_cores < asked_cores:
            yield (
                '#!! {} max CPU usage was {}% -- '
                'try runtime_constraints "{}":{}'
            ).format(
                self.label,
                int(math.ceil(cpu_max_rate*100)),
                constraint_key,
                int(used_cores))

    def _recommend_ram(self):
        """Recommend an economical RAM constraint for this job.

        Nodes that are advertised as "8 gibibytes" actually have what
        we might call "8 nearlygibs" of memory available for jobs.
        Here, we calculate a whole number of nearlygibs that would
        have sufficed to run the job, then recommend requesting a node
        with that number of nearlygibs (expressed as mebibytes).

        Requesting a node with "nearly 8 gibibytes" is our best hope
        of getting a node that actually has nearly 8 gibibytes
        available.  If the node manager is smart enough to account for
        the discrepancy itself when choosing/creating a node, we'll
        get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
        advertised size of the next-size-smaller node (say, 6 GiB)
        will be too low to satisfy our request, so we will effectively
        get rounded up to 8 GiB.

        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
        we will generally get a node that is advertised as "8 GiB" and
        has at least 7500 MiB available.  However, asking for 8192 MiB
        would either result in an unnecessarily expensive 12 GiB node
        (if node manager knows about the discrepancy), or an 8 GiB
        node which has less than 8192 MiB available and is therefore
        considered by crunch-dispatch to be too small to meet our
        constraint.

        When node manager learns how to predict the available memory
        for each node type such that crunch-dispatch always agrees
        that a node is big enough to run the job it was brought up
        for, all this will be unnecessary.  We'll just ask for exactly
        the memory we want -- even if that happens to be 8192 MiB.
        """
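        # Worked example (illustrative figures): used_bytes = 7864320000
        # gives used_mib = 7500; nearlygibs(7500) is about 7.71, which
        # rounds up to 8, so the suggested constraint is
        # int(8 * AVAILABLE_RAM_RATIO * 1024) == 7782 MiB for crunch1
        # jobs, or the equivalent number of bytes for containers.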

        constraint_key = self._map_runtime_constraint('ram')
        used_bytes = self.stats_max['mem']['rss']
        if used_bytes == float('-Inf'):
            logger.warning('%s: no memory usage data', self.label)
            return
        used_mib = math.ceil(float(used_bytes) / 1048576)
        asked_mib = self.existing_constraints.get(constraint_key)

        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
        if asked_mib is None or (
                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
            yield (
                '#!! {} max RSS was {} MiB -- '
                'try runtime_constraints "{}":{}'
            ).format(
                self.label,
                int(used_mib),
                constraint_key,
                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(2**20)/self._runtime_constraint_mem_unit()))

    def _recommend_keep_cache(self):
        """Recommend increasing keep cache if utilization < 80%"""
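        # e.g. (illustrative figures) reading 4 GiB from the local
        # cache while fetching 8 GiB from Keep servers is 50%
        # utilization; being under 80%, the code suggests doubling the
        # current keep cache size (asked_mib * 2).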
        constraint_key = self._map_runtime_constraint('keep_cache_ram')
        if self.job_tot['net:keep0']['rx'] == 0:
            return
        utilization = (float(self.job_tot['blkio:0:0']['read']) /
                       float(self.job_tot['net:keep0']['rx']))
        asked_mib = self.existing_constraints.get(constraint_key, 256)

        if utilization < 0.8:
            yield (
                '#!! {} Keep cache utilization was {:.2f}% -- '
                'try runtime_constraints "{}":{} (or more)'
            ).format(
                self.label,
                utilization * 100.0,
                constraint_key,
                asked_mib*2*(2**20)/self._runtime_constraint_mem_unit())


    def _format(self, val):
        """Return a string representation of a stat.

        {:.2f} for floats, default format for everything else."""
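        # e.g. _format(1.2345) returns '1.23'; _format(7) returns '7'.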
        if isinstance(val, float):
            return '{:.2f}'.format(val)
        else:
            return '{}'.format(val)

    def _runtime_constraint_mem_unit(self):
        if hasattr(self, 'runtime_constraint_mem_unit'):
            return self.runtime_constraint_mem_unit
        elif self.detected_crunch1:
            return JobSummarizer.runtime_constraint_mem_unit
        else:
            return ContainerSummarizer.runtime_constraint_mem_unit

    def _map_runtime_constraint(self, key):
        if hasattr(self, 'map_runtime_constraint'):
            return self.map_runtime_constraint[key]
        elif self.detected_crunch1:
            return JobSummarizer.map_runtime_constraint[key]
        else:
            return key


class CollectionSummarizer(Summarizer):
    def __init__(self, collection_id, **kwargs):
        super(CollectionSummarizer, self).__init__(
            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
        self.label = collection_id


def NewSummarizer(process_or_uuid, **kwargs):
    """Construct a summarizer of the appropriate subclass for this uuid/object."""
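    # Example (hypothetical UUID): an argument such as
    # 'zzzzz-xvhdp-0123456789abcde' is treated as a container request
    # and handled by ContainerTreeSummarizer below.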

    if isinstance(process_or_uuid, dict):
        process = process_or_uuid
        uuid = process['uuid']
    else:
        uuid = process_or_uuid
        process = None
        arv = arvados.api('v1', model=OrderedJsonModel())

    if '-dz642-' in uuid:
        if process is None:
            process = arv.containers().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-xvhdp-' in uuid:
        if process is None:
            process = arv.container_requests().get(uuid=uuid).execute()
        klass = ContainerTreeSummarizer
    elif '-8i9sb-' in uuid:
        if process is None:
            process = arv.jobs().get(uuid=uuid).execute()
        klass = JobSummarizer
    elif '-d1hrv-' in uuid:
        if process is None:
            process = arv.pipeline_instances().get(uuid=uuid).execute()
        klass = PipelineSummarizer
    elif '-4zz18-' in uuid:
        return CollectionSummarizer(collection_id=uuid)
    else:
        raise ValueError("Unrecognized uuid %s" % uuid)
    return klass(process, uuid=uuid, **kwargs)


class ProcessSummarizer(Summarizer):
    """Process is a job, pipeline, container, or container request."""

    def __init__(self, process, label=None, **kwargs):
        rdr = None
        self.process = process
        if label is None:
            label = self.process.get('name', self.process['uuid'])
        if self.process.get('log'):
            try:
                rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
            except arvados.errors.NotFoundError as e:
                logger.warning("Trying event logs after failing to read "
                               "log collection %s: %s", self.process['log'], e)
        if rdr is None:
            rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
            label = label + ' (partial)'
        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
        self.existing_constraints = self.process.get('runtime_constraints', {})


class JobSummarizer(ProcessSummarizer):
    runtime_constraint_mem_unit = 1048576
    map_runtime_constraint = {
        'keep_cache_ram': 'keep_cache_mb_per_task',
        'ram': 'min_ram_mb_per_node',
        'vcpus': 'min_cores_per_node',
    }


class ContainerSummarizer(ProcessSummarizer):
    runtime_constraint_mem_unit = 1


class MultiSummarizer(object):
    def __init__(self, children={}, label=None, threads=1, **kwargs):
        self.throttle = threading.Semaphore(threads)
        self.children = children
        self.label = label

    def run_and_release(self, target, *args, **kwargs):
        try:
            return target(*args, **kwargs)
        finally:
            self.throttle.release()

    def run(self):
        threads = []
        for child in self.children.itervalues():
            self.throttle.acquire()
            t = threading.Thread(target=self.run_and_release, args=(child.run, ))
            t.daemon = True
            t.start()
            threads.append(t)
        for t in threads:
            t.join()

    def text_report(self):
        txt = ''
        for cname, child in self.children.iteritems():
            if len(self.children) > 1:
                txt += '### Summary for {} ({})\n'.format(
                    cname, child.process['uuid'])
            txt += child.text_report()
            txt += '\n'
        return txt

    def html_report(self):
        return WEBCHART_CLASS(self.label, self.children.itervalues()).html()

class PipelineSummarizer(MultiSummarizer):
    def __init__(self, instance, **kwargs):
        children = collections.OrderedDict()
        for cname, component in instance['components'].iteritems():
            if 'job' not in component:
                logger.warning(
                    "%s: skipping component with no job assigned", cname)
            else:
                logger.info(
                    "%s: job %s", cname, component['job']['uuid'])
                summarizer = JobSummarizer(component['job'], **kwargs)
                summarizer.label = '{} {}'.format(
                    cname, component['job']['uuid'])
                children[cname] = summarizer
        super(PipelineSummarizer, self).__init__(
            children=children,
            label=instance['uuid'],
            **kwargs)


class ContainerTreeSummarizer(MultiSummarizer):
    def __init__(self, root, **kwargs):
        arv = arvados.api('v1', model=OrderedJsonModel())

        label = kwargs.pop('label', None) or root.get('name') or root['uuid']
        root['name'] = label

        children = collections.OrderedDict()
        todo = collections.deque((root, ))
        while len(todo) > 0:
            current = todo.popleft()
            label = current['name']
            sort_key = current['created_at']
            if current['uuid'].find('-xvhdp-') > 0:
                current = arv.containers().get(uuid=current['container_uuid']).execute()

            summer = ContainerSummarizer(current, label=label, **kwargs)
            summer.sort_key = sort_key
            children[current['uuid']] = summer

            page_filters = []
            while True:
                items = arv.container_requests().index(
                    order=['uuid asc'],
                    filters=page_filters+[
                        ['requesting_container_uuid', '=', current['uuid']]],
                ).execute()['items']
                if not items:
                    break
                page_filters = [['uuid', '>', items[-1]['uuid']]]
                for cr in items:
                    if cr['container_uuid']:
                        logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
                        cr['name'] = cr.get('name') or cr['uuid']
                        todo.append(cr)
        sorted_children = collections.OrderedDict()
        for uuid in sorted(children.keys(), key=lambda uuid: children[uuid].sort_key):
            sorted_children[uuid] = children[uuid]
        super(ContainerTreeSummarizer, self).__init__(
            children=sorted_children,
            label=root['name'],
            **kwargs)