a67b72f89cdc84dc4736c77cff49ec3a0f142353
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import arvados
6 import collections
7 import crunchstat_summary.dygraphs
8 import crunchstat_summary.reader
9 import datetime
10 import functools
11 import itertools
12 import math
13 import re
14 import sys
15 import _strptime
16 import arvados.util
17
18 from concurrent.futures import ThreadPoolExecutor
19
20 from crunchstat_summary import logger
21
22 # Recommend memory constraints that are this multiple of an integral
23 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
24 # that have amounts like 7.5 GiB according to the kernel.)
25 AVAILABLE_RAM_RATIO = 0.90
26 MB=2**20
27
28 # Workaround datetime.datetime.strptime() thread-safety bug by calling
29 # it once before starting threads.  https://bugs.python.org/issue7980
30 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
31
32
33 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
34
35
36 class Task(object):
37     def __init__(self):
38         self.starttime = None
39         self.finishtime = None
40         self.series = collections.defaultdict(list)
41
42
43 class Summarizer(object):
44     def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
45         self._logdata = logdata
46
47         self.uuid = uuid
48         self.label = label
49         self.starttime = None
50         self.finishtime = None
51         self._skip_child_jobs = skip_child_jobs
52
53         # stats_max: {category: {stat: val}}
54         self.stats_max = collections.defaultdict(
55             functools.partial(collections.defaultdict, lambda: 0))
56         # task_stats: {task_id: {category: {stat: val}}}
57         self.task_stats = collections.defaultdict(
58             functools.partial(collections.defaultdict, dict))
59
60         self.seq_to_uuid = {}
61         self.tasks = collections.defaultdict(Task)
62
63         # We won't bother recommending new runtime constraints if the
64         # constraints given when running the job are known to us and
65         # are already suitable.  If applicable, the subclass
66         # constructor will overwrite this with something useful.
67         self.existing_constraints = {}
68         self.node_info = {}
69
70         logger.info("%s: logdata %s", self.label, logdata)
71
72     def run(self):
73         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
74         with self._logdata as logdata:
75             self._run(logdata)
76
77     def _run(self, logdata):
78         self.detected_crunch1 = False
79
80         if not self.node_info:
81             self.node_info = logdata.node_info()
82
83         for line in logdata:
84             if not self.detected_crunch1 and '-8i9sb-' in line:
85                 self.detected_crunch1 = True
86
87             if self.detected_crunch1:
88                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
89                 if m:
90                     seq = int(m.group('seq'))
91                     uuid = m.group('task_uuid')
92                     self.seq_to_uuid[seq] = uuid
93                     logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
94                     continue
95
96                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
97                 if m:
98                     task_id = self.seq_to_uuid[int(m.group('seq'))]
99                     elapsed = int(m.group('elapsed'))
100                     self.task_stats[task_id]['time'] = {'elapsed': elapsed}
101                     if elapsed > self.stats_max['time']['elapsed']:
102                         self.stats_max['time']['elapsed'] = elapsed
103                     continue
104
105                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
106                 if m:
107                     uuid = m.group('uuid')
108                     if self._skip_child_jobs:
109                         logger.warning('%s: omitting stats from child job %s'
110                                        ' because --skip-child-jobs flag is on',
111                                        self.label, uuid)
112                         continue
113                     logger.debug('%s: follow %s', self.label, uuid)
114                     child_summarizer = NewSummarizer(uuid)
115                     child_summarizer.stats_max = self.stats_max
116                     child_summarizer.task_stats = self.task_stats
117                     child_summarizer.tasks = self.tasks
118                     child_summarizer.starttime = self.starttime
119                     child_summarizer.run()
120                     logger.debug('%s: done %s', self.label, uuid)
121                     continue
122
123                 # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
124                 m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
125                 if not m:
126                     continue
127             else:
128                 # crunch2
129                 # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
130                 m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
131                 if not m:
132                     continue
133
134             if self.label is None:
135                 try:
136                     self.label = m.group('job_uuid')
137                 except IndexError:
138                     self.label = 'label #1'
139             category = m.group('category')
140             if category.endswith(':'):
141                 # "stderr crunchstat: notice: ..."
142                 continue
143             elif category in ('error', 'caught'):
144                 continue
145             elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
146                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
147                 # (old logs are less careful with unprefixed error messages)
148                 continue
149
150             if self.detected_crunch1:
151                 task_id = self.seq_to_uuid[int(m.group('seq'))]
152             else:
153                 task_id = 'container'
154             task = self.tasks[task_id]
155
156             # Use the first and last crunchstat timestamps as
157             # approximations of starttime and finishtime.
158             timestamp = m.group('timestamp')
159             if timestamp[10:11] == '_':
160                 timestamp = datetime.datetime.strptime(
161                     timestamp, '%Y-%m-%d_%H:%M:%S')
162             elif timestamp[10:11] == 'T':
163                 timestamp = datetime.datetime.strptime(
164                     timestamp[:19], '%Y-%m-%dT%H:%M:%S')
165             else:
166                 raise ValueError("Cannot parse timestamp {!r}".format(
167                     timestamp))
168
169             if task.starttime is None:
170                 logger.debug('%s: task %s starttime %s',
171                              self.label, task_id, timestamp)
172             if task.starttime is None or timestamp < task.starttime:
173                 task.starttime = timestamp
174             if task.finishtime is None or timestamp > task.finishtime:
175                 task.finishtime = timestamp
176
177             if self.starttime is None or timestamp < self.starttime:
178                 self.starttime = timestamp
179             if self.finishtime is None or timestamp > self.finishtime:
180                 self.finishtime = timestamp
181
182             if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
183                 elapsed = (task.finishtime - task.starttime).seconds
184                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
185                 if elapsed > self.stats_max['time']['elapsed']:
186                     self.stats_max['time']['elapsed'] = elapsed
187
188             this_interval_s = None
189             for group in ['current', 'interval']:
190                 if not m.group(group):
191                     continue
192                 category = m.group('category')
193                 words = m.group(group).split(' ')
194                 stats = {}
195                 try:
196                     for val, stat in zip(words[::2], words[1::2]):
197                         if '.' in val:
198                             stats[stat] = float(val)
199                         else:
200                             stats[stat] = int(val)
201                 except ValueError as e:
202                     # If the line doesn't start with 'crunchstat:' we
203                     # might have mistaken an error message for a
204                     # structured crunchstat line.
205                     if m.group("crunchstat") is None or m.group("category") == "crunchstat":
206                         logger.warning("%s: log contains message\n  %s", self.label, line)
207                     else:
208                         logger.warning(
209                             '%s: Error parsing value %r (stat %r, category %r): %r',
210                             self.label, val, stat, category, e)
211                         logger.warning('%s', line)
212                     continue
213                 if 'user' in stats or 'sys' in stats:
214                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
215                 if 'tx' in stats or 'rx' in stats:
216                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
217                 if group == 'interval':
218                     if 'seconds' in stats:
219                         this_interval_s = stats.get('seconds',0)
220                         del stats['seconds']
221                         if this_interval_s <= 0:
222                             logger.error(
223                                 "BUG? interval stat given with duration {!r}".
224                                 format(this_interval_s))
225                     else:
226                         logger.error('BUG? interval stat missing duration')
227                 for stat, val in stats.items():
228                     if group == 'interval' and this_interval_s:
229                             stat = stat + '__rate'
230                             val = val / this_interval_s
231                             if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
232                                 task.series[category, stat].append(
233                                     (timestamp - self.starttime, val))
234                     else:
235                         if stat in ['rss','used','total']:
236                             task.series[category, stat].append(
237                                 (timestamp - self.starttime, val))
238                         self.task_stats[task_id][category][stat] = val
239                     if val > self.stats_max[category][stat]:
240                         self.stats_max[category][stat] = val
241         logger.debug('%s: done parsing', self.label)
242
243         self.job_tot = collections.defaultdict(
244             functools.partial(collections.defaultdict, int))
245         for task_id, task_stat in self.task_stats.items():
246             for category, stat_last in task_stat.items():
247                 for stat, val in stat_last.items():
248                     if stat in ['cpus', 'cache', 'swap', 'rss']:
249                         # meaningless stats like 16 cpu cores x 5 tasks = 80
250                         continue
251                     self.job_tot[category][stat] += val
252         logger.debug('%s: done totals', self.label)
253
254         missing_category = {
255             'cpu': 'CPU',
256             'mem': 'memory',
257             'net:': 'network I/O',
258             'statfs': 'storage space',
259         }
260         for task_stat in self.task_stats.values():
261             for category in task_stat.keys():
262                 for checkcat in missing_category:
263                     if checkcat.endswith(':'):
264                         if category.startswith(checkcat):
265                             missing_category.pop(checkcat)
266                             break
267                     else:
268                         if category == checkcat:
269                             missing_category.pop(checkcat)
270                             break
271         for catlabel in missing_category.values():
272             logger.warning('%s: %s stats are missing -- possible cluster configuration issue',
273                         self.label, catlabel)
274
275     def long_label(self):
276         label = self.label
277         if hasattr(self, 'process') and self.process['uuid'] not in label:
278             label = '{} ({})'.format(label, self.process['uuid'])
279         return label
280
281     def elapsed_time(self):
282         if not self.finishtime:
283             return ""
284         label = ""
285         s = (self.finishtime - self.starttime).total_seconds()
286         if s > 86400:
287             label += '{}d '.format(int(s/86400))
288         if s > 3600:
289             label += '{}h '.format(int(s/3600) % 24)
290         if s > 60:
291             label += '{}m '.format(int(s/60) % 60)
292         label += '{}s'.format(int(s) % 60)
293         return label
294
295     def text_report(self):
296         if not self.tasks:
297             return "(no report generated)\n"
298         return "\n".join(itertools.chain(
299             self._text_report_table_gen(lambda x: "\t".join(x),
300                                   lambda x: "\t".join(x)),
301             self._text_report_agg_gen(lambda x: "# {}: {}{}".format(x[0], x[1], x[2])),
302             self._recommend_gen(lambda x: "#!! "+x))) + "\n"
303
304     def html_report(self):
305         return WEBCHART_CLASS(self.label, [self]).html()
306
307     def _text_report_table_gen(self, headerformat, rowformat):
308         yield headerformat(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
309         for category, stat_max in sorted(self.stats_max.items()):
310             for stat, val in sorted(stat_max.items()):
311                 if stat.endswith('__rate'):
312                     continue
313                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
314                 val = self._format(val)
315                 tot = self._format(self.job_tot[category].get(stat, '-'))
316                 yield rowformat([category, stat, str(val), max_rate, tot])
317
318     def _text_report_agg_gen(self, aggformat):
319         by_single_task = ""
320         if len(self.tasks) > 1:
321             by_single_task = " by a single task"
322
323         metrics = [
324             ('Elapsed time',
325              self.elapsed_time(),
326              None,
327              ''),
328
329             ('Estimated cost',
330              '${:.3f}'.format(self.cost),
331              None,
332              '') if self.cost > 0 else None,
333
334             ('Assigned instance type',
335              self.node_info.get('ProviderType'),
336              None,
337              '') if self.node_info.get('ProviderType') else None,
338
339             ('Instance hourly price',
340              '${:.3f}'.format(self.node_info.get('Price')),
341              None,
342              '') if self.node_info.get('Price') else None,
343
344             ('Max CPU usage in a single interval',
345              self.stats_max['cpu']['user+sys__rate'],
346              lambda x: x * 100,
347              '%'),
348
349             ('Overall CPU usage',
350              float(self.job_tot['cpu']['user+sys']) /
351              self.job_tot['time']['elapsed']
352              if self.job_tot['time']['elapsed'] > 0 else 0,
353              lambda x: x * 100,
354              '%'),
355
356             ('Requested CPU cores',
357              self.existing_constraints.get(self._map_runtime_constraint('vcpus')),
358              None,
359              ''),
360
361             ('Instance VCPUs',
362              self.node_info.get('VCPUs'),
363              None,
364              '') if self.node_info.get('VCPUs') else None,
365
366             ('Max memory used{}'.format(by_single_task),
367              self.stats_max['mem']['rss'],
368              lambda x: x / 2**20,
369              'MB'),
370
371             ('Requested RAM',
372              self.existing_constraints.get(self._map_runtime_constraint('ram')),
373              lambda x: x / 2**20,
374              'MB'),
375
376             ('Maximum RAM request for this instance type',
377              (self.node_info.get('RAM') - self.arv_config.get('Containers', {}).get('ReserveExtraRAM', {}))*.95,
378              lambda x: x / 2**20,
379              'MB'),
380
381             ('Max network traffic{}'.format(by_single_task),
382              self.stats_max['net:eth0']['tx+rx'] +
383              self.stats_max['net:keep0']['tx+rx'],
384              lambda x: x / 1e9,
385              'GB'),
386
387             ('Max network speed in a single interval',
388              self.stats_max['net:eth0']['tx+rx__rate'] +
389              self.stats_max['net:keep0']['tx+rx__rate'],
390              lambda x: x / 1e6,
391              'MB/s'),
392
393             ('Keep cache miss rate',
394              (float(self.job_tot['keepcache']['miss']) /
395               float(self.job_tot['keepcalls']['get']))
396              if self.job_tot['keepcalls']['get'] > 0 else 0,
397              lambda x: x * 100.0,
398              '%'),
399
400             ('Keep cache utilization',
401              (float(self.job_tot['blkio:0:0']['read']) /
402               float(self.job_tot['net:keep0']['rx']))
403              if self.job_tot['net:keep0']['rx'] > 0 else 0,
404              lambda x: x * 100.0,
405              '%'),
406
407             ('Temp disk utilization',
408              (float(self.job_tot['statfs']['used']) /
409               float(self.job_tot['statfs']['total']))
410              if self.job_tot['statfs']['total'] > 0 else 0,
411              lambda x: x * 100.0,
412              '%'),
413         ]
414
415         if len(self.tasks) > 1:
416             metrics.insert(0, ('Number of tasks',
417                  len(self.tasks),
418                  None,
419                  ''))
420         for args in metrics:
421             if args is None:
422                 continue
423             format_string, val, transform, suffix = args
424             if val == float('-Inf'):
425                 continue
426             if transform:
427                 val = transform(val)
428             yield aggformat((format_string, self._format(val), suffix))
429
430     def _recommend_gen(self, recommendformat):
431         # TODO recommend fixing job granularity if elapsed time is too short
432         return itertools.chain(
433             self._recommend_cpu(recommendformat),
434             self._recommend_ram(recommendformat),
435             self._recommend_keep_cache(recommendformat),
436             self._recommend_temp_disk(recommendformat),
437             )
438
439     def _recommend_cpu(self, recommendformat):
440         """Recommend asking for 4 cores if max CPU usage was 333%"""
441
442         constraint_key = self._map_runtime_constraint('vcpus')
443         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
444         if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
445             logger.warning('%s: no CPU usage data', self.label)
446             return
447         # TODO Don't necessarily want to recommend on isolated max peak
448         # take average CPU usage into account as well or % time at max
449         used_cores = max(1, int(math.ceil(cpu_max_rate)))
450         asked_cores = self.existing_constraints.get(constraint_key)
451         if asked_cores is None:
452             asked_cores = 1
453         # TODO: This should be more nuanced in cases where max >> avg
454         if used_cores < asked_cores:
455             yield recommendformat(
456                 '{} max CPU usage was {}% -- '
457                 'try reducing runtime_constraints to "{}":{}'
458             ).format(
459                 self.label,
460                 math.ceil(cpu_max_rate*100),
461                 constraint_key,
462                 int(used_cores))
463
464     # FIXME: This needs to be updated to account for current a-d-c algorithms
465     def _recommend_ram(self, recommendformat):
466         """Recommend an economical RAM constraint for this job.
467
468         Nodes that are advertised as "8 gibibytes" actually have what
469         we might call "8 nearlygibs" of memory available for jobs.
470         Here, we calculate a whole number of nearlygibs that would
471         have sufficed to run the job, then recommend requesting a node
472         with that number of nearlygibs (expressed as mebibytes).
473
474         Requesting a node with "nearly 8 gibibytes" is our best hope
475         of getting a node that actually has nearly 8 gibibytes
476         available.  If the node manager is smart enough to account for
477         the discrepancy itself when choosing/creating a node, we'll
478         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
479         advertised size of the next-size-smaller node (say, 6 GiB)
480         will be too low to satisfy our request, so we will effectively
481         get rounded up to 8 GiB.
482
483         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
484         we will generally get a node that is advertised as "8 GiB" and
485         has at least 7500 MiB available.  However, asking for 8192 MiB
486         would either result in an unnecessarily expensive 12 GiB node
487         (if node manager knows about the discrepancy), or an 8 GiB
488         node which has less than 8192 MiB available and is therefore
489         considered by crunch-dispatch to be too small to meet our
490         constraint.
491
492         When node manager learns how to predict the available memory
493         for each node type such that crunch-dispatch always agrees
494         that a node is big enough to run the job it was brought up
495         for, all this will be unnecessary.  We'll just ask for exactly
496         the memory we want -- even if that happens to be 8192 MiB.
497         """
498
499         constraint_key = self._map_runtime_constraint('ram')
500         used_bytes = self.stats_max['mem']['rss']
501         if used_bytes == float('-Inf'):
502             logger.warning('%s: no memory usage data', self.label)
503             return
504         used_mib = math.ceil(float(used_bytes) / MB)
505         asked_mib = self.existing_constraints.get(constraint_key) / MB
506
507         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
508         ratio = 0.5
509         recommend_mib = int(math.ceil(nearlygibs(used_mib/ratio))*AVAILABLE_RAM_RATIO*1024)
510         if used_mib > 0 and (used_mib / asked_mib) < ratio and asked_mib > recommend_mib:
511             yield recommendformat(
512                 '{} requested {} MiB of RAM but actual RAM usage was below {}% at {} MiB -- '
513                 'suggest reducing RAM request to {} MiB'
514             ).format(
515                 self.label,
516                 int(asked_mib),
517                 int(100*ratio),
518                 int(used_mib),
519                 recommend_mib)
520
521     def _recommend_keep_cache(self, recommendformat):
522         """Recommend increasing keep cache if utilization < 80%"""
523         constraint_key = self._map_runtime_constraint('keep_cache_ram')
524         if self.job_tot['net:keep0']['rx'] == 0:
525             return
526         utilization = (float(self.job_tot['blkio:0:0']['read']) /
527                        float(self.job_tot['net:keep0']['rx']))
528         # FIXME: the default on this get won't work correctly
529         asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
530
531         if utilization < 0.8:
532             yield recommendformat(
533                 '{} Keep cache utilization was {:.2f}% -- '
534                 'try doubling runtime_constraints to "{}":{} (or more)'
535             ).format(
536                 self.label,
537                 utilization * 100.0,
538                 constraint_key,
539                 math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
540
541
542     def _recommend_temp_disk(self, recommendformat):
543         """Recommend decreasing temp disk if utilization < 50%"""
544         total = float(self.job_tot['statfs']['total'])
545         utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0
546
547         if utilization < 50.8 and total > 0:
548             yield recommendformat(
549                 '{} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
550                 'consider reducing "tmpdirMin" and/or "outdirMin"'
551             ).format(
552                 self.label,
553                 utilization * 100.0,
554                 total / MB)
555
556
557     def _format(self, val):
558         """Return a string representation of a stat.
559
560         {:.2f} for floats, default format for everything else."""
561         if isinstance(val, float):
562             return '{:.2f}'.format(val)
563         else:
564             return '{}'.format(val)
565
566     def _runtime_constraint_mem_unit(self):
567         if hasattr(self, 'runtime_constraint_mem_unit'):
568             return self.runtime_constraint_mem_unit
569         elif self.detected_crunch1:
570             return JobSummarizer.runtime_constraint_mem_unit
571         else:
572             return ContainerRequestSummarizer.runtime_constraint_mem_unit
573
574     def _map_runtime_constraint(self, key):
575         if hasattr(self, 'map_runtime_constraint'):
576             return self.map_runtime_constraint[key]
577         elif self.detected_crunch1:
578             return JobSummarizer.map_runtime_constraint[key]
579         else:
580             return key
581
582
583 class CollectionSummarizer(Summarizer):
584     def __init__(self, collection_id, **kwargs):
585         super(CollectionSummarizer, self).__init__(
586             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
587         self.label = collection_id
588
589
590 def NewSummarizer(process_or_uuid, **kwargs):
591     """Construct with the appropriate subclass for this uuid/object."""
592
593     if isinstance(process_or_uuid, dict):
594         process = process_or_uuid
595         uuid = process['uuid']
596     else:
597         uuid = process_or_uuid
598         process = None
599         arv = kwargs.get("arv") or arvados.api('v1')
600
601     if '-dz642-' in uuid:
602         if process is None:
603             # Get the associated CR. Doesn't matter which since they all have the same logs
604             crs = arv.container_requests().list(filters=[['container_uuid','=',uuid]],limit=1).execute()['items']
605             if len(crs) > 0:
606                 process = crs[0]
607         klass = ContainerRequestTreeSummarizer
608     elif '-xvhdp-' in uuid:
609         if process is None:
610             process = arv.container_requests().get(uuid=uuid).execute()
611         klass = ContainerRequestTreeSummarizer
612     elif '-8i9sb-' in uuid:
613         if process is None:
614             process = arv.jobs().get(uuid=uuid).execute()
615         klass = JobTreeSummarizer
616     elif '-d1hrv-' in uuid:
617         if process is None:
618             process = arv.pipeline_instances().get(uuid=uuid).execute()
619         klass = PipelineSummarizer
620     elif '-4zz18-' in uuid:
621         return CollectionSummarizer(collection_id=uuid)
622     else:
623         raise ArgumentError("Unrecognized uuid %s", uuid)
624     return klass(process, uuid=uuid, **kwargs)
625
626
627 class ProcessSummarizer(Summarizer):
628     """Process is a job, pipeline, or container request."""
629
630     def __init__(self, process, label=None, **kwargs):
631         rdr = None
632         self.process = process
633         arv = kwargs.get("arv") or arvados.api('v1')
634         if label is None:
635             label = self.process.get('name', self.process['uuid'])
636         # Pre-Arvados v1.4 everything is in 'log'
637         # For 1.4+ containers have no logs and container_requests have them in 'log_uuid', not 'log'
638         log_collection = self.process.get('log', self.process.get('log_uuid'))
639         if log_collection and self.process.get('state') != 'Uncommitted': # arvados.util.CR_UNCOMMITTED:
640             try:
641                 rdr = crunchstat_summary.reader.CollectionReader(log_collection, api_client=arv)
642             except arvados.errors.NotFoundError as e:
643                 logger.warning("Trying event logs after failing to read "
644                                "log collection %s: %s", self.process['log'], e)
645         if rdr is None:
646             uuid = self.process.get('container_uuid', self.process.get('uuid'))
647             rdr = crunchstat_summary.reader.LiveLogReader(uuid)
648             label = label + ' (partial)'
649
650         super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
651         self.existing_constraints = self.process.get('runtime_constraints', {})
652         self.arv_config = arv.config()
653         self.cost = self.process.get('cost', 0)
654
655
656
657 class JobSummarizer(ProcessSummarizer):
658     runtime_constraint_mem_unit = MB
659     map_runtime_constraint = {
660         'keep_cache_ram': 'keep_cache_mb_per_task',
661         'ram': 'min_ram_mb_per_node',
662         'vcpus': 'min_cores_per_node',
663     }
664
665
666 class ContainerRequestSummarizer(ProcessSummarizer):
667     runtime_constraint_mem_unit = 1
668
669
670 class MultiSummarizer(object):
671     def __init__(self, children={}, label=None, threads=1, **kwargs):
672         self.children = children
673         self.label = label
674         self.threadcount = threads
675
676     def run(self):
677         if self.threadcount > 1 and len(self.children) > 1:
678             completed = 0
679             def run_and_progress(child):
680                 try:
681                     child.run()
682                 except Exception as e:
683                     logger.exception("parse error")
684                 completed += 1
685                 logger.info("%s/%s summarized %s", completed, len(self.children), child.label)
686             with ThreadPoolExecutor(max_workers=self.threadcount) as tpe:
687                 for child in self.children.values():
688                     tpe.submit(run_and_progress, child)
689         else:
690             for child in self.children.values():
691                 child.run()
692
693     def text_report(self):
694         txt = ''
695         d = self._descendants()
696         for child in d.values():
697             if len(d) > 1:
698                 txt += '### Summary for {} ({})\n'.format(
699                     child.label, child.process['uuid'])
700             txt += child.text_report()
701             txt += '\n'
702         return txt
703
704     def _descendants(self):
705         """Dict of self and all descendants.
706
707         Nodes with nothing of their own to report (like
708         MultiSummarizers) are omitted.
709         """
710         d = collections.OrderedDict()
711         for key, child in self.children.items():
712             if isinstance(child, Summarizer):
713                 d[key] = child
714             if isinstance(child, MultiSummarizer):
715                 d.update(child._descendants())
716         return d
717
718     def html_report(self):
719         tophtml = ""
720         bottomhtml = ""
721         label = self.label
722         if len(self._descendants()) == 1:
723             summarizer = next(iter(self._descendants().values()))
724             tophtml = """{}\n<table class='aggtable'><tbody>{}</tbody></table>\n""".format(
725                 "\n".join(summarizer._recommend_gen(lambda x: "<p>{}</p>".format(x))),
726                 "\n".join(summarizer._text_report_agg_gen(lambda x: "<tr><th>{}</th><td>{}{}</td></tr>".format(*x))))
727
728             bottomhtml = """<table class='metricstable'><tbody>{}</tbody></table>\n""".format(
729                 "\n".join(summarizer._text_report_table_gen(lambda x: "<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>".format(*x),
730                                                             lambda x: "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(*x))))
731             label = summarizer.long_label()
732
733         return WEBCHART_CLASS(label, iter(self._descendants().values())).html(tophtml, bottomhtml)
734
735
736 class JobTreeSummarizer(MultiSummarizer):
737     """Summarizes a job and all children listed in its components field."""
738     def __init__(self, job, label=None, **kwargs):
739         arv = kwargs.get("arv") or arvados.api('v1')
740         label = label or job.get('name', job['uuid'])
741         children = collections.OrderedDict()
742         children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
743         if job.get('components', None):
744             preloaded = {}
745             for j in arv.jobs().index(
746                     limit=len(job['components']),
747                     filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
748                 preloaded[j['uuid']] = j
749             for cname in sorted(job['components'].keys()):
750                 child_uuid = job['components'][cname]
751                 j = (preloaded.get(child_uuid) or
752                      arv.jobs().get(uuid=child_uuid).execute())
753                 children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
754
755         super(JobTreeSummarizer, self).__init__(
756             children=children,
757             label=label,
758             **kwargs)
759
760
761 class PipelineSummarizer(MultiSummarizer):
762     def __init__(self, instance, **kwargs):
763         children = collections.OrderedDict()
764         for cname, component in instance['components'].items():
765             if 'job' not in component:
766                 logger.warning(
767                     "%s: skipping component with no job assigned", cname)
768             else:
769                 logger.info(
770                     "%s: job %s", cname, component['job']['uuid'])
771                 summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
772                 summarizer.label = '{} {}'.format(
773                     cname, component['job']['uuid'])
774                 children[cname] = summarizer
775         super(PipelineSummarizer, self).__init__(
776             children=children,
777             label=instance['uuid'],
778             **kwargs)
779
780
781 class ContainerRequestTreeSummarizer(MultiSummarizer):
782     def __init__(self, root, skip_child_jobs=False, **kwargs):
783         arv = kwargs.get("arv") or arvados.api('v1')
784
785         label = kwargs.pop('label', None) or root.get('name') or root['uuid']
786         root['name'] = label
787
788         children = collections.OrderedDict()
789         todo = collections.deque((root, ))
790         while len(todo) > 0:
791             current = todo.popleft()
792             label = current['name']
793             sort_key = current['created_at']
794
795             summer = ContainerRequestSummarizer(current, label=label, **kwargs)
796             summer.sort_key = sort_key
797             children[current['uuid']] = summer
798
799             if skip_child_jobs:
800                 child_crs = arv.container_requests().list(filters=[['requesting_container_uuid', '=', current['container_uuid']]],
801                                                           limit=0).execute()
802                 logger.warning('%s: omitting stats from child containers'
803                                ' because --skip-child-jobs flag is on',
804                                label, child_crs['items_available'])
805             else:
806                 for cr in arvados.util.keyset_list_all(arv.container_requests().list,
807                                                        filters=[['requesting_container_uuid', '=', current['container_uuid']]]):
808                     if cr['container_uuid']:
809                         logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
810                         cr['name'] = cr.get('name') or cr['uuid']
811                         todo.append(cr)
812         sorted_children = collections.OrderedDict()
813         for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
814             sorted_children[uuid] = children[uuid]
815         super(ContainerRequestTreeSummarizer, self).__init__(
816             children=sorted_children,
817             label=root['name'],
818             **kwargs)