19744: Don't warn about missing metrics when the elapsed time is short
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import arvados
6 import collections
7 import crunchstat_summary.dygraphs
8 import crunchstat_summary.reader
9 import datetime
10 import functools
11 import itertools
12 import math
13 import re
14 import sys
15 import _strptime
16 import arvados.util
17
18 from concurrent.futures import ThreadPoolExecutor
19
20 from crunchstat_summary import logger
21
22 # Recommend memory constraints that are this multiple of an integral
23 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
24 # that have amounts like 7.5 GiB according to the kernel.)
25 AVAILABLE_RAM_RATIO = 0.90
26 MB=2**20
27
28 # Workaround datetime.datetime.strptime() thread-safety bug by calling
29 # it once before starting threads.  https://bugs.python.org/issue7980
30 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
31
32
33 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
34
35
36 class Task(object):
37     def __init__(self):
38         self.starttime = None
39         self.finishtime = None
40         self.series = collections.defaultdict(list)
41
42
43 class Summarizer(object):
44     def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
45         self._logdata = logdata
46
47         self.uuid = uuid
48         self.label = label
49         self.starttime = None
50         self.finishtime = None
51         self._skip_child_jobs = skip_child_jobs
52
53         # stats_max: {category: {stat: val}}
54         self.stats_max = collections.defaultdict(
55             functools.partial(collections.defaultdict, lambda: 0))
56         # task_stats: {task_id: {category: {stat: val}}}
57         self.task_stats = collections.defaultdict(
58             functools.partial(collections.defaultdict, dict))
59
60         self.seq_to_uuid = {}
61         self.tasks = collections.defaultdict(Task)
62
63         # We won't bother recommending new runtime constraints if the
64         # constraints given when running the job are known to us and
65         # are already suitable.  If applicable, the subclass
66         # constructor will overwrite this with something useful.
67         self.existing_constraints = {}
68         self.node_info = {}
69
70         logger.info("%s: logdata %s", self.label, logdata)
71
72     def run(self):
73         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
74         with self._logdata as logdata:
75             self._run(logdata)
76
77     def _run(self, logdata):
78         self.detected_crunch1 = False
79
80         if not self.node_info:
81             self.node_info = logdata.node_info()
82
83         for line in logdata:
84             if not self.detected_crunch1 and '-8i9sb-' in line:
85                 self.detected_crunch1 = True
86
87             if self.detected_crunch1:
88                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
89                 if m:
90                     seq = int(m.group('seq'))
91                     uuid = m.group('task_uuid')
92                     self.seq_to_uuid[seq] = uuid
93                     logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
94                     continue
95
96                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
97                 if m:
98                     task_id = self.seq_to_uuid[int(m.group('seq'))]
99                     elapsed = int(m.group('elapsed'))
100                     self.task_stats[task_id]['time'] = {'elapsed': elapsed}
101                     if elapsed > self.stats_max['time']['elapsed']:
102                         self.stats_max['time']['elapsed'] = elapsed
103                     continue
104
105                 m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
106                 if m:
107                     uuid = m.group('uuid')
108                     if self._skip_child_jobs:
109                         logger.warning('%s: omitting stats from child job %s'
110                                        ' because --skip-child-jobs flag is on',
111                                        self.label, uuid)
112                         continue
113                     logger.debug('%s: follow %s', self.label, uuid)
114                     child_summarizer = NewSummarizer(uuid)
115                     child_summarizer.stats_max = self.stats_max
116                     child_summarizer.task_stats = self.task_stats
117                     child_summarizer.tasks = self.tasks
118                     child_summarizer.starttime = self.starttime
119                     child_summarizer.run()
120                     logger.debug('%s: done %s', self.label, uuid)
121                     continue
122
123                 # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
124                 m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
125                 if not m:
126                     continue
127             else:
128                 # crunch2
129                 # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
130                 m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
131                 if not m:
132                     continue
133
134             if self.label is None:
135                 try:
136                     self.label = m.group('job_uuid')
137                 except IndexError:
138                     self.label = 'label #1'
139             category = m.group('category')
140             if category.endswith(':'):
141                 # "stderr crunchstat: notice: ..."
142                 continue
143             elif category in ('error', 'caught'):
144                 continue
145             elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
146                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
147                 # (old logs are less careful with unprefixed error messages)
148                 continue
149
150             if self.detected_crunch1:
151                 task_id = self.seq_to_uuid[int(m.group('seq'))]
152             else:
153                 task_id = 'container'
154             task = self.tasks[task_id]
155
156             # Use the first and last crunchstat timestamps as
157             # approximations of starttime and finishtime.
158             timestamp = m.group('timestamp')
159             if timestamp[10:11] == '_':
160                 timestamp = datetime.datetime.strptime(
161                     timestamp, '%Y-%m-%d_%H:%M:%S')
162             elif timestamp[10:11] == 'T':
163                 timestamp = datetime.datetime.strptime(
164                     timestamp[:19], '%Y-%m-%dT%H:%M:%S')
165             else:
166                 raise ValueError("Cannot parse timestamp {!r}".format(
167                     timestamp))
168
169             if task.starttime is None:
170                 logger.debug('%s: task %s starttime %s',
171                              self.label, task_id, timestamp)
172             if task.starttime is None or timestamp < task.starttime:
173                 task.starttime = timestamp
174             if task.finishtime is None or timestamp > task.finishtime:
175                 task.finishtime = timestamp
176
177             if self.starttime is None or timestamp < self.starttime:
178                 self.starttime = timestamp
179             if self.finishtime is None or timestamp > self.finishtime:
180                 self.finishtime = timestamp
181
182             if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
183                 elapsed = (task.finishtime - task.starttime).seconds
184                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
185                 if elapsed > self.stats_max['time']['elapsed']:
186                     self.stats_max['time']['elapsed'] = elapsed
187
188             this_interval_s = None
189             for group in ['current', 'interval']:
190                 if not m.group(group):
191                     continue
192                 category = m.group('category')
193                 words = m.group(group).split(' ')
194                 stats = {}
195                 try:
196                     for val, stat in zip(words[::2], words[1::2]):
197                         if '.' in val:
198                             stats[stat] = float(val)
199                         else:
200                             stats[stat] = int(val)
201                 except ValueError as e:
202                     # If the line doesn't start with 'crunchstat:' we
203                     # might have mistaken an error message for a
204                     # structured crunchstat line.
205                     if m.group("crunchstat") is None or m.group("category") == "crunchstat":
206                         logger.warning("%s: log contains message\n  %s", self.label, line)
207                     else:
208                         logger.warning(
209                             '%s: Error parsing value %r (stat %r, category %r): %r',
210                             self.label, val, stat, category, e)
211                         logger.warning('%s', line)
212                     continue
213                 if 'user' in stats or 'sys' in stats:
214                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
215                 if 'tx' in stats or 'rx' in stats:
216                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
217                 if group == 'interval':
218                     if 'seconds' in stats:
219                         this_interval_s = stats.get('seconds',0)
220                         del stats['seconds']
221                         if this_interval_s <= 0:
222                             logger.error(
223                                 "BUG? interval stat given with duration {!r}".
224                                 format(this_interval_s))
225                     else:
226                         logger.error('BUG? interval stat missing duration')
227                 for stat, val in stats.items():
228                     if group == 'interval' and this_interval_s:
229                             stat = stat + '__rate'
230                             val = val / this_interval_s
231                             if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
232                                 task.series[category, stat].append(
233                                     (timestamp - self.starttime, val))
234                     else:
235                         if stat in ['rss','used','total']:
236                             task.series[category, stat].append(
237                                 (timestamp - self.starttime, val))
238                         self.task_stats[task_id][category][stat] = val
239                     if val > self.stats_max[category][stat]:
240                         self.stats_max[category][stat] = val
241         logger.debug('%s: done parsing', self.label)
242
243         self.job_tot = collections.defaultdict(
244             functools.partial(collections.defaultdict, int))
245         for task_id, task_stat in self.task_stats.items():
246             for category, stat_last in task_stat.items():
247                 for stat, val in stat_last.items():
248                     if stat in ['cpus', 'cache', 'swap', 'rss']:
249                         # meaningless stats like 16 cpu cores x 5 tasks = 80
250                         continue
251                     self.job_tot[category][stat] += val
252         logger.debug('%s: done totals', self.label)
253
254         if self.stats_max['time'].get('elapsed', 0) > 20:
255             # needs to have executed for at least 20 seconds or we may
256             # not have collected any metrics and these warnings are duds.
257             missing_category = {
258                 'cpu': 'CPU',
259                 'mem': 'memory',
260                 'net:': 'network I/O',
261                 'statfs': 'storage space',
262             }
263             for task_stat in self.task_stats.values():
264                 for category in task_stat.keys():
265                     for checkcat in missing_category:
266                         if checkcat.endswith(':'):
267                             if category.startswith(checkcat):
268                                 missing_category.pop(checkcat)
269                                 break
270                         else:
271                             if category == checkcat:
272                                 missing_category.pop(checkcat)
273                                 break
274             for catlabel in missing_category.values():
275                 logger.warning('%s: %s stats are missing -- possible cluster configuration issue',
276                             self.label, catlabel)
277
278     def long_label(self):
279         label = self.label
280         if hasattr(self, 'process') and self.process['uuid'] not in label:
281             label = '{} ({})'.format(label, self.process['uuid'])
282         return label
283
284     def elapsed_time(self):
285         if not self.finishtime:
286             return ""
287         label = ""
288         s = (self.finishtime - self.starttime).total_seconds()
289         if s > 86400:
290             label += '{}d '.format(int(s/86400))
291         if s > 3600:
292             label += '{}h '.format(int(s/3600) % 24)
293         if s > 60:
294             label += '{}m '.format(int(s/60) % 60)
295         label += '{}s'.format(int(s) % 60)
296         return label
297
298     def text_report(self):
299         if not self.tasks:
300             return "(no report generated)\n"
301         return "\n".join(itertools.chain(
302             self._text_report_table_gen(lambda x: "\t".join(x),
303                                   lambda x: "\t".join(x)),
304             self._text_report_agg_gen(lambda x: "# {}: {}{}".format(x[0], x[1], x[2])),
305             self._recommend_gen(lambda x: "#!! "+x))) + "\n"
306
307     def html_report(self):
308         return WEBCHART_CLASS(self.label, [self]).html()
309
310     def _text_report_table_gen(self, headerformat, rowformat):
311         yield headerformat(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
312         for category, stat_max in sorted(self.stats_max.items()):
313             for stat, val in sorted(stat_max.items()):
314                 if stat.endswith('__rate'):
315                     continue
316                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
317                 val = self._format(val)
318                 tot = self._format(self.job_tot[category].get(stat, '-'))
319                 yield rowformat([category, stat, str(val), max_rate, tot])
320
321     def _text_report_agg_gen(self, aggformat):
322         by_single_task = ""
323         if len(self.tasks) > 1:
324             by_single_task = " by a single task"
325
326         metrics = [
327             ('Elapsed time',
328              self.elapsed_time(),
329              None,
330              ''),
331
332             ('Estimated cost',
333              '${:.3f}'.format(self.cost),
334              None,
335              '') if self.cost > 0 else None,
336
337             ('Assigned instance type',
338              self.node_info.get('ProviderType'),
339              None,
340              '') if self.node_info.get('ProviderType') else None,
341
342             ('Instance hourly price',
343              '${:.3f}'.format(self.node_info.get('Price')),
344              None,
345              '') if self.node_info.get('Price') else None,
346
347             ('Max CPU usage in a single interval',
348              self.stats_max['cpu']['user+sys__rate'],
349              lambda x: x * 100,
350              '%'),
351
352             ('Overall CPU usage',
353              float(self.job_tot['cpu']['user+sys']) /
354              self.job_tot['time']['elapsed']
355              if self.job_tot['time']['elapsed'] > 0 else 0,
356              lambda x: x * 100,
357              '%'),
358
359             ('Requested CPU cores',
360              self.existing_constraints.get(self._map_runtime_constraint('vcpus')),
361              None,
362              ''),
363
364             ('Instance VCPUs',
365              self.node_info.get('VCPUs'),
366              None,
367              '') if self.node_info.get('VCPUs') else None,
368
369             ('Max memory used{}'.format(by_single_task),
370              self.stats_max['mem']['rss'],
371              lambda x: x / 2**20,
372              'MB'),
373
374             ('Requested RAM',
375              self.existing_constraints.get(self._map_runtime_constraint('ram')),
376              lambda x: x / 2**20,
377              'MB'),
378
379             ('Maximum RAM request for this instance type',
380              (self.node_info.get('RAM') - self.arv_config.get('Containers', {}).get('ReserveExtraRAM', {}))*.95,
381              lambda x: x / 2**20,
382              'MB'),
383
384             ('Max network traffic{}'.format(by_single_task),
385              self.stats_max['net:eth0']['tx+rx'] +
386              self.stats_max['net:keep0']['tx+rx'],
387              lambda x: x / 1e9,
388              'GB'),
389
390             ('Max network speed in a single interval',
391              self.stats_max['net:eth0']['tx+rx__rate'] +
392              self.stats_max['net:keep0']['tx+rx__rate'],
393              lambda x: x / 1e6,
394              'MB/s'),
395
396             ('Keep cache miss rate',
397              (float(self.job_tot['keepcache']['miss']) /
398               float(self.job_tot['keepcalls']['get']))
399              if self.job_tot['keepcalls']['get'] > 0 else 0,
400              lambda x: x * 100.0,
401              '%'),
402
403             ('Keep cache utilization',
404              (float(self.job_tot['blkio:0:0']['read']) /
405               float(self.job_tot['net:keep0']['rx']))
406              if self.job_tot['net:keep0']['rx'] > 0 else 0,
407              lambda x: x * 100.0,
408              '%'),
409
410             ('Temp disk utilization',
411              (float(self.job_tot['statfs']['used']) /
412               float(self.job_tot['statfs']['total']))
413              if self.job_tot['statfs']['total'] > 0 else 0,
414              lambda x: x * 100.0,
415              '%'),
416         ]
417
418         if len(self.tasks) > 1:
419             metrics.insert(0, ('Number of tasks',
420                  len(self.tasks),
421                  None,
422                  ''))
423         for args in metrics:
424             if args is None:
425                 continue
426             format_string, val, transform, suffix = args
427             if val == float('-Inf'):
428                 continue
429             if transform:
430                 val = transform(val)
431             yield aggformat((format_string, self._format(val), suffix))
432
433     def _recommend_gen(self, recommendformat):
434         # TODO recommend fixing job granularity if elapsed time is too short
435         return itertools.chain(
436             self._recommend_cpu(recommendformat),
437             self._recommend_ram(recommendformat),
438             self._recommend_keep_cache(recommendformat),
439             self._recommend_temp_disk(recommendformat),
440             )
441
442     def _recommend_cpu(self, recommendformat):
443         """Recommend asking for 4 cores if max CPU usage was 333%"""
444
445         constraint_key = self._map_runtime_constraint('vcpus')
446         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
447         if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
448             logger.warning('%s: no CPU usage data', self.label)
449             return
450         # TODO Don't necessarily want to recommend on isolated max peak
451         # take average CPU usage into account as well or % time at max
452         used_cores = max(1, int(math.ceil(cpu_max_rate)))
453         asked_cores = self.existing_constraints.get(constraint_key)
454         if asked_cores is None:
455             asked_cores = 1
456         # TODO: This should be more nuanced in cases where max >> avg
457         if used_cores < asked_cores:
458             yield recommendformat(
459                 '{} max CPU usage was {}% -- '
460                 'try reducing runtime_constraints to "{}":{}'
461             ).format(
462                 self.label,
463                 math.ceil(cpu_max_rate*100),
464                 constraint_key,
465                 int(used_cores))
466
467     # FIXME: This needs to be updated to account for current a-d-c algorithms
468     def _recommend_ram(self, recommendformat):
469         """Recommend an economical RAM constraint for this job.
470
471         Nodes that are advertised as "8 gibibytes" actually have what
472         we might call "8 nearlygibs" of memory available for jobs.
473         Here, we calculate a whole number of nearlygibs that would
474         have sufficed to run the job, then recommend requesting a node
475         with that number of nearlygibs (expressed as mebibytes).
476
477         Requesting a node with "nearly 8 gibibytes" is our best hope
478         of getting a node that actually has nearly 8 gibibytes
479         available.  If the node manager is smart enough to account for
480         the discrepancy itself when choosing/creating a node, we'll
481         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
482         advertised size of the next-size-smaller node (say, 6 GiB)
483         will be too low to satisfy our request, so we will effectively
484         get rounded up to 8 GiB.
485
486         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
487         we will generally get a node that is advertised as "8 GiB" and
488         has at least 7500 MiB available.  However, asking for 8192 MiB
489         would either result in an unnecessarily expensive 12 GiB node
490         (if node manager knows about the discrepancy), or an 8 GiB
491         node which has less than 8192 MiB available and is therefore
492         considered by crunch-dispatch to be too small to meet our
493         constraint.
494
495         When node manager learns how to predict the available memory
496         for each node type such that crunch-dispatch always agrees
497         that a node is big enough to run the job it was brought up
498         for, all this will be unnecessary.  We'll just ask for exactly
499         the memory we want -- even if that happens to be 8192 MiB.
500         """
501
502         constraint_key = self._map_runtime_constraint('ram')
503         used_bytes = self.stats_max['mem']['rss']
504         if used_bytes == float('-Inf'):
505             logger.warning('%s: no memory usage data', self.label)
506             return
507         used_mib = math.ceil(float(used_bytes) / MB)
508         asked_mib = self.existing_constraints.get(constraint_key) / MB
509
510         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
511         ratio = 0.5
512         recommend_mib = int(math.ceil(nearlygibs(used_mib/ratio))*AVAILABLE_RAM_RATIO*1024)
513         if used_mib > 0 and (used_mib / asked_mib) < ratio and asked_mib > recommend_mib:
514             yield recommendformat(
515                 '{} requested {} MiB of RAM but actual RAM usage was below {}% at {} MiB -- '
516                 'suggest reducing RAM request to {} MiB'
517             ).format(
518                 self.label,
519                 int(asked_mib),
520                 int(100*ratio),
521                 int(used_mib),
522                 recommend_mib)
523
524     def _recommend_keep_cache(self, recommendformat):
525         """Recommend increasing keep cache if utilization < 80%"""
526         constraint_key = self._map_runtime_constraint('keep_cache_ram')
527         if self.job_tot['net:keep0']['rx'] == 0:
528             return
529         utilization = (float(self.job_tot['blkio:0:0']['read']) /
530                        float(self.job_tot['net:keep0']['rx']))
531         # FIXME: the default on this get won't work correctly
532         asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
533
534         if utilization < 0.8:
535             yield recommendformat(
536                 '{} Keep cache utilization was {:.2f}% -- '
537                 'try doubling runtime_constraints to "{}":{} (or more)'
538             ).format(
539                 self.label,
540                 utilization * 100.0,
541                 constraint_key,
542                 math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
543
544
545     def _recommend_temp_disk(self, recommendformat):
546         """Recommend decreasing temp disk if utilization < 50%"""
547         total = float(self.job_tot['statfs']['total'])
548         utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0
549
550         if utilization < 50.8 and total > 0:
551             yield recommendformat(
552                 '{} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
553                 'consider reducing "tmpdirMin" and/or "outdirMin"'
554             ).format(
555                 self.label,
556                 utilization * 100.0,
557                 total / MB)
558
559
560     def _format(self, val):
561         """Return a string representation of a stat.
562
563         {:.2f} for floats, default format for everything else."""
564         if isinstance(val, float):
565             return '{:.2f}'.format(val)
566         else:
567             return '{}'.format(val)
568
569     def _runtime_constraint_mem_unit(self):
570         if hasattr(self, 'runtime_constraint_mem_unit'):
571             return self.runtime_constraint_mem_unit
572         elif self.detected_crunch1:
573             return JobSummarizer.runtime_constraint_mem_unit
574         else:
575             return ContainerRequestSummarizer.runtime_constraint_mem_unit
576
577     def _map_runtime_constraint(self, key):
578         if hasattr(self, 'map_runtime_constraint'):
579             return self.map_runtime_constraint[key]
580         elif self.detected_crunch1:
581             return JobSummarizer.map_runtime_constraint[key]
582         else:
583             return key
584
585
586 class CollectionSummarizer(Summarizer):
587     def __init__(self, collection_id, **kwargs):
588         super(CollectionSummarizer, self).__init__(
589             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
590         self.label = collection_id
591
592
593 def NewSummarizer(process_or_uuid, **kwargs):
594     """Construct with the appropriate subclass for this uuid/object."""
595
596     if isinstance(process_or_uuid, dict):
597         process = process_or_uuid
598         uuid = process['uuid']
599     else:
600         uuid = process_or_uuid
601         process = None
602         arv = kwargs.get("arv") or arvados.api('v1')
603
604     if '-dz642-' in uuid:
605         if process is None:
606             # Get the associated CR. Doesn't matter which since they all have the same logs
607             crs = arv.container_requests().list(filters=[['container_uuid','=',uuid]],limit=1).execute()['items']
608             if len(crs) > 0:
609                 process = crs[0]
610         klass = ContainerRequestTreeSummarizer
611     elif '-xvhdp-' in uuid:
612         if process is None:
613             process = arv.container_requests().get(uuid=uuid).execute()
614         klass = ContainerRequestTreeSummarizer
615     elif '-8i9sb-' in uuid:
616         if process is None:
617             process = arv.jobs().get(uuid=uuid).execute()
618         klass = JobTreeSummarizer
619     elif '-d1hrv-' in uuid:
620         if process is None:
621             process = arv.pipeline_instances().get(uuid=uuid).execute()
622         klass = PipelineSummarizer
623     elif '-4zz18-' in uuid:
624         return CollectionSummarizer(collection_id=uuid)
625     else:
626         raise ArgumentError("Unrecognized uuid %s", uuid)
627     return klass(process, uuid=uuid, **kwargs)
628
629
630 class ProcessSummarizer(Summarizer):
631     """Process is a job, pipeline, or container request."""
632
633     def __init__(self, process, label=None, **kwargs):
634         rdr = None
635         self.process = process
636         arv = kwargs.get("arv") or arvados.api('v1')
637         if label is None:
638             label = self.process.get('name', self.process['uuid'])
639         # Pre-Arvados v1.4 everything is in 'log'
640         # For 1.4+ containers have no logs and container_requests have them in 'log_uuid', not 'log'
641         log_collection = self.process.get('log', self.process.get('log_uuid'))
642         if log_collection and self.process.get('state') != 'Uncommitted': # arvados.util.CR_UNCOMMITTED:
643             try:
644                 rdr = crunchstat_summary.reader.CollectionReader(log_collection, api_client=arv)
645             except arvados.errors.NotFoundError as e:
646                 logger.warning("Trying event logs after failing to read "
647                                "log collection %s: %s", self.process['log'], e)
648         if rdr is None:
649             uuid = self.process.get('container_uuid', self.process.get('uuid'))
650             rdr = crunchstat_summary.reader.LiveLogReader(uuid)
651             label = label + ' (partial)'
652
653         super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
654         self.existing_constraints = self.process.get('runtime_constraints', {})
655         self.arv_config = arv.config()
656         self.cost = self.process.get('cost', 0)
657
658
659
660 class JobSummarizer(ProcessSummarizer):
661     runtime_constraint_mem_unit = MB
662     map_runtime_constraint = {
663         'keep_cache_ram': 'keep_cache_mb_per_task',
664         'ram': 'min_ram_mb_per_node',
665         'vcpus': 'min_cores_per_node',
666     }
667
668
669 class ContainerRequestSummarizer(ProcessSummarizer):
670     runtime_constraint_mem_unit = 1
671
672
673 class MultiSummarizer(object):
674     def __init__(self, children={}, label=None, threads=1, **kwargs):
675         self.children = children
676         self.label = label
677         self.threadcount = threads
678
679     def run(self):
680         if self.threadcount > 1 and len(self.children) > 1:
681             completed = 0
682             def run_and_progress(child):
683                 try:
684                     child.run()
685                 except Exception as e:
686                     logger.exception("parse error")
687                 completed += 1
688                 logger.info("%s/%s summarized %s", completed, len(self.children), child.label)
689             with ThreadPoolExecutor(max_workers=self.threadcount) as tpe:
690                 for child in self.children.values():
691                     tpe.submit(run_and_progress, child)
692         else:
693             for child in self.children.values():
694                 child.run()
695
696     def text_report(self):
697         txt = ''
698         d = self._descendants()
699         for child in d.values():
700             if len(d) > 1:
701                 txt += '### Summary for {} ({})\n'.format(
702                     child.label, child.process['uuid'])
703             txt += child.text_report()
704             txt += '\n'
705         return txt
706
707     def _descendants(self):
708         """Dict of self and all descendants.
709
710         Nodes with nothing of their own to report (like
711         MultiSummarizers) are omitted.
712         """
713         d = collections.OrderedDict()
714         for key, child in self.children.items():
715             if isinstance(child, Summarizer):
716                 d[key] = child
717             if isinstance(child, MultiSummarizer):
718                 d.update(child._descendants())
719         return d
720
721     def html_report(self):
722         tophtml = ""
723         bottomhtml = ""
724         label = self.label
725         if len(self._descendants()) == 1:
726             summarizer = next(iter(self._descendants().values()))
727             tophtml = """{}\n<table class='aggtable'><tbody>{}</tbody></table>\n""".format(
728                 "\n".join(summarizer._recommend_gen(lambda x: "<p>{}</p>".format(x))),
729                 "\n".join(summarizer._text_report_agg_gen(lambda x: "<tr><th>{}</th><td>{}{}</td></tr>".format(*x))))
730
731             bottomhtml = """<table class='metricstable'><tbody>{}</tbody></table>\n""".format(
732                 "\n".join(summarizer._text_report_table_gen(lambda x: "<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>".format(*x),
733                                                             lambda x: "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(*x))))
734             label = summarizer.long_label()
735
736         return WEBCHART_CLASS(label, iter(self._descendants().values())).html(tophtml, bottomhtml)
737
738
739 class JobTreeSummarizer(MultiSummarizer):
740     """Summarizes a job and all children listed in its components field."""
741     def __init__(self, job, label=None, **kwargs):
742         arv = kwargs.get("arv") or arvados.api('v1')
743         label = label or job.get('name', job['uuid'])
744         children = collections.OrderedDict()
745         children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
746         if job.get('components', None):
747             preloaded = {}
748             for j in arv.jobs().index(
749                     limit=len(job['components']),
750                     filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
751                 preloaded[j['uuid']] = j
752             for cname in sorted(job['components'].keys()):
753                 child_uuid = job['components'][cname]
754                 j = (preloaded.get(child_uuid) or
755                      arv.jobs().get(uuid=child_uuid).execute())
756                 children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
757
758         super(JobTreeSummarizer, self).__init__(
759             children=children,
760             label=label,
761             **kwargs)
762
763
764 class PipelineSummarizer(MultiSummarizer):
765     def __init__(self, instance, **kwargs):
766         children = collections.OrderedDict()
767         for cname, component in instance['components'].items():
768             if 'job' not in component:
769                 logger.warning(
770                     "%s: skipping component with no job assigned", cname)
771             else:
772                 logger.info(
773                     "%s: job %s", cname, component['job']['uuid'])
774                 summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
775                 summarizer.label = '{} {}'.format(
776                     cname, component['job']['uuid'])
777                 children[cname] = summarizer
778         super(PipelineSummarizer, self).__init__(
779             children=children,
780             label=instance['uuid'],
781             **kwargs)
782
783
784 class ContainerRequestTreeSummarizer(MultiSummarizer):
785     def __init__(self, root, skip_child_jobs=False, **kwargs):
786         arv = kwargs.get("arv") or arvados.api('v1')
787
788         label = kwargs.pop('label', None) or root.get('name') or root['uuid']
789         root['name'] = label
790
791         children = collections.OrderedDict()
792         todo = collections.deque((root, ))
793         while len(todo) > 0:
794             current = todo.popleft()
795             label = current['name']
796             sort_key = current['created_at']
797
798             summer = ContainerRequestSummarizer(current, label=label, **kwargs)
799             summer.sort_key = sort_key
800             children[current['uuid']] = summer
801
802             if skip_child_jobs:
803                 child_crs = arv.container_requests().list(filters=[['requesting_container_uuid', '=', current['container_uuid']]],
804                                                           limit=0).execute()
805                 logger.warning('%s: omitting stats from child containers'
806                                ' because --skip-child-jobs flag is on',
807                                label, child_crs['items_available'])
808             else:
809                 for cr in arvados.util.keyset_list_all(arv.container_requests().list,
810                                                        filters=[['requesting_container_uuid', '=', current['container_uuid']]]):
811                     if cr['container_uuid']:
812                         logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
813                         cr['name'] = cr.get('name') or cr['uuid']
814                         todo.append(cr)
815         sorted_children = collections.OrderedDict()
816         for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
817             sorted_children[uuid] = children[uuid]
818         super(ContainerRequestTreeSummarizer, self).__init__(
819             children=sorted_children,
820             label=root['name'],
821             **kwargs)