19744: Adjust some of the recommendations
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import arvados
6 import collections
7 import crunchstat_summary.dygraphs
8 import crunchstat_summary.reader
9 import datetime
10 import functools
11 import itertools
12 import math
13 import re
14 import sys
15 import _strptime
16 import arvados.util
17
18 from concurrent.futures import ThreadPoolExecutor
19
20 from crunchstat_summary import logger
21
22 # Recommend memory constraints that are this multiple of an integral
23 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
24 # that have amounts like 7.5 GiB according to the kernel.)
25 AVAILABLE_RAM_RATIO = 0.90
26 MB=2**20
27
28 # Workaround datetime.datetime.strptime() thread-safety bug by calling
29 # it once before starting threads.  https://bugs.python.org/issue7980
30 datetime.datetime.strptime('1999-12-31_23:59:59', '%Y-%m-%d_%H:%M:%S')
31
32
33 WEBCHART_CLASS = crunchstat_summary.dygraphs.DygraphsChart
34
35
36 class Task(object):
37     def __init__(self):
38         self.starttime = None
39         self.finishtime = None
40         self.series = collections.defaultdict(list)
41
42
43 class Summarizer(object):
44     def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
45         self._logdata = logdata
46
47         self.uuid = uuid
48         self.label = label
49         self.starttime = None
50         self.finishtime = None
51         self._skip_child_jobs = skip_child_jobs
52
53         # stats_max: {category: {stat: val}}
54         self.stats_max = collections.defaultdict(
55             functools.partial(collections.defaultdict, lambda: 0))
56         # task_stats: {task_id: {category: {stat: val}}}
57         self.task_stats = collections.defaultdict(
58             functools.partial(collections.defaultdict, dict))
59
60         self.seq_to_uuid = {}
61         self.tasks = collections.defaultdict(Task)
62
63         # We won't bother recommending new runtime constraints if the
64         # constraints given when running the job are known to us and
65         # are already suitable.  If applicable, the subclass
66         # constructor will overwrite this with something useful.
67         self.existing_constraints = {}
68         self.node_info = {}
69         self.cost = 0
70         self.arv_config = {}
71
72         logger.info("%s: logdata %s", self.label, logdata)
73
74     def run(self):
75         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
76         with self._logdata as logdata:
77             self._run(logdata)
78
79     def _run(self, logdata):
80         if not self.node_info:
81             self.node_info = logdata.node_info()
82
83         for line in logdata:
84             # crunch2
85             # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
86             m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
87             if not m:
88                 continue
89
90             if self.label is None:
91                 try:
92                     self.label = m.group('job_uuid')
93                 except IndexError:
94                     self.label = 'label #1'
95
96             task_id = 'container'
97             task = self.tasks[task_id]
98
99             # Use the first and last crunchstat timestamps as
100             # approximations of starttime and finishtime.
101             timestamp = m.group('timestamp')
102             if timestamp[10:11] == '_':
103                 timestamp = datetime.datetime.strptime(
104                     timestamp, '%Y-%m-%d_%H:%M:%S')
105             elif timestamp[10:11] == 'T':
106                 timestamp = datetime.datetime.strptime(
107                     timestamp[:19], '%Y-%m-%dT%H:%M:%S')
108             else:
109                 raise ValueError("Cannot parse timestamp {!r}".format(
110                     timestamp))
111
112             if task.starttime is None:
113                 logger.debug('%s: task %s starttime %s',
114                              self.label, task_id, timestamp)
115             if task.starttime is None or timestamp < task.starttime:
116                 task.starttime = timestamp
117             if task.finishtime is None or timestamp > task.finishtime:
118                 task.finishtime = timestamp
119
120             if self.starttime is None or timestamp < self.starttime:
121                 self.starttime = timestamp
122             if self.finishtime is None or timestamp > self.finishtime:
123                 self.finishtime = timestamp
124
125             if task.starttime is not None and task.finishtime is not None:
126                 elapsed = (task.finishtime - task.starttime).seconds
127                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
128                 if elapsed > self.stats_max['time']['elapsed']:
129                     self.stats_max['time']['elapsed'] = elapsed
130
131             category = m.group('category')
132             if category.endswith(':'):
133                 # "stderr crunchstat: notice: ..."
134                 continue
135             elif category in ('error', 'caught'):
136                 continue
137             elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
138                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
139                 # (old logs are less careful with unprefixed error messages)
140                 continue
141
142             this_interval_s = None
143             for group in ['current', 'interval']:
144                 if not m.group(group):
145                     continue
146                 category = m.group('category')
147                 words = m.group(group).split(' ')
148                 stats = {}
149                 try:
150                     for val, stat in zip(words[::2], words[1::2]):
151                         if '.' in val:
152                             stats[stat] = float(val)
153                         else:
154                             stats[stat] = int(val)
155                 except ValueError as e:
156                     # If the line doesn't start with 'crunchstat:' we
157                     # might have mistaken an error message for a
158                     # structured crunchstat line.
159                     if m.group("crunchstat") is None or m.group("category") == "crunchstat":
160                         logger.warning("%s: log contains message\n  %s", self.label, line)
161                     else:
162                         logger.warning(
163                             '%s: Error parsing value %r (stat %r, category %r): %r',
164                             self.label, val, stat, category, e)
165                         logger.warning('%s', line)
166                     continue
167                 if 'user' in stats or 'sys' in stats:
168                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
169                 if 'tx' in stats or 'rx' in stats:
170                     stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
171                 if group == 'interval':
172                     if 'seconds' in stats:
173                         this_interval_s = stats.get('seconds',0)
174                         del stats['seconds']
175                         if this_interval_s <= 0:
176                             logger.error(
177                                 "BUG? interval stat given with duration {!r}".
178                                 format(this_interval_s))
179                     else:
180                         logger.error('BUG? interval stat missing duration')
181                 for stat, val in stats.items():
182                     if group == 'interval' and this_interval_s:
183                             stat = stat + '__rate'
184                             val = val / this_interval_s
185                             if stat in ['user+sys__rate', 'user__rate', 'sys__rate', 'tx+rx__rate', 'rx__rate', 'tx__rate']:
186                                 task.series[category, stat].append(
187                                     (timestamp - self.starttime, val))
188                     else:
189                         if stat in ['rss','used','total']:
190                             task.series[category, stat].append(
191                                 (timestamp - self.starttime, val))
192                         self.task_stats[task_id][category][stat] = val
193                     if val > self.stats_max[category][stat]:
194                         self.stats_max[category][stat] = val
195         logger.debug('%s: done parsing', self.label)
196
197         self.job_tot = collections.defaultdict(
198             functools.partial(collections.defaultdict, int))
199         for task_id, task_stat in self.task_stats.items():
200             for category, stat_last in task_stat.items():
201                 for stat, val in stat_last.items():
202                     if stat in ['cpus', 'cache', 'swap', 'rss']:
203                         # meaningless stats like 16 cpu cores x 5 tasks = 80
204                         continue
205                     self.job_tot[category][stat] += val
206         logger.debug('%s: done totals', self.label)
207
208         if self.stats_max['time'].get('elapsed', 0) > 20:
209             # needs to have executed for at least 20 seconds or we may
210             # not have collected any metrics and these warnings are duds.
211             missing_category = {
212                 'cpu': 'CPU',
213                 'mem': 'memory',
214                 'net:': 'network I/O',
215                 'statfs': 'storage space',
216             }
217             for task_stat in self.task_stats.values():
218                 for category in task_stat.keys():
219                     for checkcat in missing_category:
220                         if checkcat.endswith(':'):
221                             if category.startswith(checkcat):
222                                 missing_category.pop(checkcat)
223                                 break
224                         else:
225                             if category == checkcat:
226                                 missing_category.pop(checkcat)
227                                 break
228             for catlabel in missing_category.values():
229                 logger.warning('%s: %s stats are missing -- possible cluster configuration issue',
230                             self.label, catlabel)
231
232     def long_label(self):
233         label = self.label
234         if hasattr(self, 'process') and self.process['uuid'] not in label:
235             label = '{} ({})'.format(label, self.process['uuid'])
236         return label
237
238     def elapsed_time(self):
239         if not self.finishtime:
240             return ""
241         label = ""
242         s = (self.finishtime - self.starttime).total_seconds()
243         if s > 86400:
244             label += '{}d '.format(int(s/86400))
245         if s > 3600:
246             label += '{}h '.format(int(s/3600) % 24)
247         if s > 60:
248             label += '{}m '.format(int(s/60) % 60)
249         label += '{}s'.format(int(s) % 60)
250         return label
251
252     def text_report(self):
253         if not self.tasks:
254             return "(no report generated)\n"
255         return "\n".join(itertools.chain(
256             self._text_report_table_gen(lambda x: "\t".join(x),
257                                   lambda x: "\t".join(x)),
258             self._text_report_agg_gen(lambda x: "# {}: {}{}".format(x[0], x[1], x[2])),
259             self._recommend_gen(lambda x: "#!! "+x))) + "\n"
260
261     def html_report(self):
262         tophtml = """{}\n<table class='aggtable'><tbody>{}</tbody></table>\n""".format(
263             "\n".join(self._recommend_gen(lambda x: "<p>{}</p>".format(x))),
264             "\n".join(self._text_report_agg_gen(lambda x: "<tr><th>{}</th><td>{}{}</td></tr>".format(*x))))
265
266         bottomhtml = """<table class='metricstable'><tbody>{}</tbody></table>\n""".format(
267             "\n".join(self._text_report_table_gen(lambda x: "<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>".format(*x),
268                                                         lambda x: "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(*x))))
269         label = self.long_label()
270
271         return WEBCHART_CLASS(label, [self]).html(tophtml, bottomhtml)
272
273     def _text_report_table_gen(self, headerformat, rowformat):
274         yield headerformat(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
275         for category, stat_max in sorted(self.stats_max.items()):
276             for stat, val in sorted(stat_max.items()):
277                 if stat.endswith('__rate'):
278                     continue
279                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
280                 val = self._format(val)
281                 tot = self._format(self.job_tot[category].get(stat, '-'))
282                 yield rowformat([category, stat, str(val), max_rate, tot])
283
284     def _text_report_agg_gen(self, aggformat):
285         by_single_task = ""
286         if len(self.tasks) > 1:
287             by_single_task = " by a single task"
288
289         metrics = [
290             ('Elapsed time',
291              self.elapsed_time(),
292              None,
293              ''),
294
295             ('Estimated cost',
296              '${:.3f}'.format(self.cost),
297              None,
298              '') if self.cost > 0 else None,
299
300             ('Assigned instance type',
301              self.node_info.get('ProviderType'),
302              None,
303              '') if self.node_info.get('ProviderType') else None,
304
305             ('Instance hourly price',
306              '${:.3f}'.format(self.node_info.get('Price')),
307              None,
308              '') if self.node_info.get('Price') else None,
309
310             ('Max CPU usage in a single interval',
311              self.stats_max['cpu']['user+sys__rate'],
312              lambda x: x * 100,
313              '%'),
314
315             ('Overall CPU usage',
316              float(self.job_tot['cpu']['user+sys']) /
317              self.job_tot['time']['elapsed']
318              if self.job_tot['time']['elapsed'] > 0 else 0,
319              lambda x: x * 100,
320              '%'),
321
322             ('Requested CPU cores',
323              self.existing_constraints.get(self._map_runtime_constraint('vcpus')),
324              None,
325              '') if self.existing_constraints.get(self._map_runtime_constraint('vcpus')) else None,
326
327             ('Instance VCPUs',
328              self.node_info.get('VCPUs'),
329              None,
330              '') if self.node_info.get('VCPUs') else None,
331
332             ('Max memory used{}'.format(by_single_task),
333              self.stats_max['mem']['rss'],
334              lambda x: x / 2**20,
335              'MB'),
336
337             ('Requested RAM',
338              self.existing_constraints.get(self._map_runtime_constraint('ram')),
339              lambda x: x / 2**20,
340              'MB') if self.existing_constraints.get(self._map_runtime_constraint('ram')) else None,
341
342             ('Maximum RAM request for this instance type',
343              (self.node_info.get('RAM') - self.arv_config.get('Containers', {}).get('ReserveExtraRAM', 0))*.95,
344              lambda x: x / 2**20,
345              'MB') if self.node_info.get('RAM') else None,
346
347             ('Max network traffic{}'.format(by_single_task),
348              self.stats_max['net:eth0']['tx+rx'] +
349              self.stats_max['net:keep0']['tx+rx'],
350              lambda x: x / 1e9,
351              'GB'),
352
353             ('Max network speed in a single interval',
354              self.stats_max['net:eth0']['tx+rx__rate'] +
355              self.stats_max['net:keep0']['tx+rx__rate'],
356              lambda x: x / 1e6,
357              'MB/s'),
358
359             ('Keep cache miss rate',
360              (float(self.job_tot['keepcache']['miss']) /
361               float(self.job_tot['keepcalls']['get']))
362              if self.job_tot['keepcalls']['get'] > 0 else 0,
363              lambda x: x * 100.0,
364              '%'),
365
366             ('Keep cache utilization',
367              (float(self.job_tot['blkio:0:0']['read']) /
368               float(self.job_tot['net:keep0']['rx']))
369              if self.job_tot['net:keep0']['rx'] > 0 else 0,
370              lambda x: x * 100.0,
371              '%'),
372
373             ('Temp disk utilization',
374              (float(self.job_tot['statfs']['used']) /
375               float(self.job_tot['statfs']['total']))
376              if self.job_tot['statfs']['total'] > 0 else 0,
377              lambda x: x * 100.0,
378              '%'),
379         ]
380
381         if len(self.tasks) > 1:
382             metrics.insert(0, ('Number of tasks',
383                  len(self.tasks),
384                  None,
385                  ''))
386         for args in metrics:
387             if args is None:
388                 continue
389             format_string, val, transform, suffix = args
390             if val == float('-Inf'):
391                 continue
392             if transform:
393                 val = transform(val)
394             yield aggformat((format_string, self._format(val), suffix))
395
396     def _recommend_gen(self, recommendformat):
397         # TODO recommend fixing job granularity if elapsed time is too short
398
399         if self.stats_max['time'].get('elapsed', 0) <= 20:
400             # Not enough data
401             return []
402
403         return itertools.chain(
404             self._recommend_cpu(recommendformat),
405             self._recommend_ram(recommendformat),
406             self._recommend_keep_cache(recommendformat),
407             self._recommend_temp_disk(recommendformat),
408             )
409
410     def _recommend_cpu(self, recommendformat):
411         """Recommend asking for 4 cores if max CPU usage was 333%"""
412
413         constraint_key = self._map_runtime_constraint('vcpus')
414         cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
415         if cpu_max_rate == float('-Inf') or cpu_max_rate == 0.0:
416             logger.warning('%s: no CPU usage data', self.label)
417             return
418         # TODO Don't necessarily want to recommend on isolated max peak
419         # take average CPU usage into account as well or % time at max
420         used_cores = max(1, int(math.ceil(cpu_max_rate)))
421         asked_cores = self.existing_constraints.get(constraint_key)
422         if asked_cores is None:
423             asked_cores = 1
424         # TODO: This should be more nuanced in cases where max >> avg
425         if used_cores < asked_cores:
426             yield recommendformat(
427                 '{} max CPU usage was {}% -- '
428                 'try reducing runtime_constraints to "{}":{}'
429             ).format(
430                 self.label,
431                 math.ceil(cpu_max_rate*100),
432                 constraint_key,
433                 int(used_cores))
434
435     # FIXME: This needs to be updated to account for current a-d-c algorithms
436     def _recommend_ram(self, recommendformat):
437         """Recommend an economical RAM constraint for this job.
438
439         Nodes that are advertised as "8 gibibytes" actually have what
440         we might call "8 nearlygibs" of memory available for jobs.
441         Here, we calculate a whole number of nearlygibs that would
442         have sufficed to run the job, then recommend requesting a node
443         with that number of nearlygibs (expressed as mebibytes).
444
445         Requesting a node with "nearly 8 gibibytes" is our best hope
446         of getting a node that actually has nearly 8 gibibytes
447         available.  If the node manager is smart enough to account for
448         the discrepancy itself when choosing/creating a node, we'll
449         get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
450         advertised size of the next-size-smaller node (say, 6 GiB)
451         will be too low to satisfy our request, so we will effectively
452         get rounded up to 8 GiB.
453
454         For example, if we need 7500 MiB, we can ask for 7500 MiB, and
455         we will generally get a node that is advertised as "8 GiB" and
456         has at least 7500 MiB available.  However, asking for 8192 MiB
457         would either result in an unnecessarily expensive 12 GiB node
458         (if node manager knows about the discrepancy), or an 8 GiB
459         node which has less than 8192 MiB available and is therefore
460         considered by crunch-dispatch to be too small to meet our
461         constraint.
462
463         When node manager learns how to predict the available memory
464         for each node type such that crunch-dispatch always agrees
465         that a node is big enough to run the job it was brought up
466         for, all this will be unnecessary.  We'll just ask for exactly
467         the memory we want -- even if that happens to be 8192 MiB.
468         """
469
470         constraint_key = self._map_runtime_constraint('ram')
471         used_bytes = self.stats_max['mem']['rss']
472         if used_bytes == float('-Inf'):
473             logger.warning('%s: no memory usage data', self.label)
474             return
475         if not self.existing_constraints.get(constraint_key):
476             return
477         used_mib = math.ceil(float(used_bytes) / MB)
478         asked_mib = self.existing_constraints.get(constraint_key) / MB
479
480         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
481         ratio = 0.5
482         recommend_mib = int(math.ceil(nearlygibs(used_mib/ratio))*AVAILABLE_RAM_RATIO*1024)
483         if used_mib > 0 and (used_mib / asked_mib) < ratio and asked_mib > recommend_mib:
484             yield recommendformat(
485                 '{} requested {} MiB of RAM but actual RAM usage was below {}% at {} MiB -- '
486                 'suggest reducing RAM request to {} MiB'
487             ).format(
488                 self.label,
489                 int(asked_mib),
490                 int(100*ratio),
491                 int(used_mib),
492                 recommend_mib)
493
494     def _recommend_keep_cache(self, recommendformat):
495         """Recommend increasing keep cache if utilization < 50%.
496
497         This means the amount of data returned to the program is less
498         than 50% of the amount of data actually downloaded by
499         arv-mount.
500         """
501
502         constraint_key = self._map_runtime_constraint('keep_cache_ram')
503         if self.job_tot['net:keep0']['rx'] == 0:
504             return
505         utilization = (float(self.job_tot['blkio:0:0']['read']) /
506                        float(self.job_tot['net:keep0']['rx']))
507         # FIXME: the default on this get won't work correctly
508         asked_cache = self.existing_constraints.get('keep_cache_ram') or self.existing_constraints.get('keep_cache_disk')
509
510         if utilization < 0.5:
511             yield recommendformat(
512                 '{} Keep cache utilization was {:.2f}% -- '
513                 'try increasing keep_cache to {} MB'
514             ).format(
515                 self.label,
516                 utilization * 100.0,
517                 math.ceil((asked_cache * 2) / (1024*1024)))
518
519
520     def _recommend_temp_disk(self, recommendformat):
521         """Recommend decreasing temp disk if utilization < 50%.
522
523         This recommendation is disabled for the time being.  It uses
524         the total disk on the node and not the amount of disk
525         requested, so it triggers a false positive basically every
526         time.  To get the amount of disk requested we need to fish it
527         out of the mounts, which is extra work I don't want do right
528         now.
529         """
530
531         return []
532
533         # total = float(self.job_tot['statfs']['total'])
534         # utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0
535
536         # if utilization < 50.0 and total > 0:
537         #     yield recommendformat(
538         #         '{} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
539         #         'consider reducing "tmpdirMin" and/or "outdirMin"'
540         #     ).format(
541         #         self.label,
542         #         utilization * 100.0,
543         #         total / MB)
544
545
546     def _format(self, val):
547         """Return a string representation of a stat.
548
549         {:.2f} for floats, default format for everything else."""
550         if isinstance(val, float):
551             return '{:.2f}'.format(val)
552         else:
553             return '{}'.format(val)
554
555     def _runtime_constraint_mem_unit(self):
556         if hasattr(self, 'runtime_constraint_mem_unit'):
557             return self.runtime_constraint_mem_unit
558         else:
559             return ContainerRequestSummarizer.runtime_constraint_mem_unit
560
561     def _map_runtime_constraint(self, key):
562         return key
563
564
565 class CollectionSummarizer(Summarizer):
566     def __init__(self, collection_id, **kwargs):
567         super(CollectionSummarizer, self).__init__(
568             crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
569         self.label = collection_id
570
571
572 def NewSummarizer(process_or_uuid, **kwargs):
573     """Construct with the appropriate subclass for this uuid/object."""
574
575     if isinstance(process_or_uuid, dict):
576         process = process_or_uuid
577         uuid = process['uuid']
578     else:
579         uuid = process_or_uuid
580         process = None
581         arv = kwargs.get("arv") or arvados.api('v1')
582
583     if '-dz642-' in uuid:
584         if process is None:
585             # Get the associated CR. Doesn't matter which since they all have the same logs
586             crs = arv.container_requests().list(filters=[['container_uuid','=',uuid]],limit=1).execute()['items']
587             if len(crs) > 0:
588                 process = crs[0]
589         klass = ContainerRequestTreeSummarizer
590     elif '-xvhdp-' in uuid:
591         if process is None:
592             process = arv.container_requests().get(uuid=uuid).execute()
593         klass = ContainerRequestTreeSummarizer
594     elif '-4zz18-' in uuid:
595         return CollectionSummarizer(collection_id=uuid)
596     else:
597         raise ArgumentError("Unrecognized uuid %s", uuid)
598     return klass(process, uuid=uuid, **kwargs)
599
600
601 class ProcessSummarizer(Summarizer):
602     """Process is a job, pipeline, or container request."""
603
604     def __init__(self, process, label=None, **kwargs):
605         rdr = None
606         self.process = process
607         arv = kwargs.get("arv") or arvados.api('v1')
608         if label is None:
609             label = self.process.get('name', self.process['uuid'])
610         # Pre-Arvados v1.4 everything is in 'log'
611         # For 1.4+ containers have no logs and container_requests have them in 'log_uuid', not 'log'
612         log_collection = self.process.get('log', self.process.get('log_uuid'))
613         if log_collection and self.process.get('state') != 'Uncommitted': # arvados.util.CR_UNCOMMITTED:
614             try:
615                 rdr = crunchstat_summary.reader.CollectionReader(
616                     log_collection,
617                     api_client=arv,
618                     collection_object=kwargs.get("collection_object"))
619             except arvados.errors.NotFoundError as e:
620                 logger.warning("Trying event logs after failing to read "
621                                "log collection %s: %s", self.process['log'], e)
622         if rdr is None:
623             uuid = self.process.get('container_uuid', self.process.get('uuid'))
624             rdr = crunchstat_summary.reader.LiveLogReader(uuid)
625             label = label + ' (partial)'
626
627         super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
628         self.existing_constraints = self.process.get('runtime_constraints', {})
629         self.arv_config = arv.config()
630         self.cost = self.process.get('cost', 0)
631
632
633 class ContainerRequestSummarizer(ProcessSummarizer):
634     runtime_constraint_mem_unit = 1
635
636
637 class MultiSummarizer(object):
638     def __init__(self, children={}, label=None, threads=1, **kwargs):
639         self.children = children
640         self.label = label
641         self.threadcount = threads
642
643     def run(self):
644         if self.threadcount > 1 and len(self.children) > 1:
645             completed = 0
646             def run_and_progress(child):
647                 try:
648                     child.run()
649                 except Exception as e:
650                     logger.exception("parse error")
651                 completed += 1
652                 logger.info("%s/%s summarized %s", completed, len(self.children), child.label)
653             with ThreadPoolExecutor(max_workers=self.threadcount) as tpe:
654                 for child in self.children.values():
655                     tpe.submit(run_and_progress, child)
656         else:
657             for child in self.children.values():
658                 child.run()
659
660     def text_report(self):
661         txt = ''
662         d = self._descendants()
663         for child in d.values():
664             if len(d) > 1:
665                 txt += '### Summary for {} ({})\n'.format(
666                     child.label, child.process['uuid'])
667             txt += child.text_report()
668             txt += '\n'
669         return txt
670
671     def _descendants(self):
672         """Dict of self and all descendants.
673
674         Nodes with nothing of their own to report (like
675         MultiSummarizers) are omitted.
676         """
677         d = collections.OrderedDict()
678         for key, child in self.children.items():
679             if isinstance(child, Summarizer):
680                 d[key] = child
681             if isinstance(child, MultiSummarizer):
682                 d.update(child._descendants())
683         return d
684
685     def html_report(self):
686         tophtml = ""
687         bottomhtml = ""
688         label = self.label
689         if len(self._descendants()) == 1:
690             summarizer = next(iter(self._descendants().values()))
691             tophtml = """{}\n<table class='aggtable'><tbody>{}</tbody></table>\n""".format(
692                 "\n".join(summarizer._recommend_gen(lambda x: "<p>{}</p>".format(x))),
693                 "\n".join(summarizer._text_report_agg_gen(lambda x: "<tr><th>{}</th><td>{}{}</td></tr>".format(*x))))
694
695             bottomhtml = """<table class='metricstable'><tbody>{}</tbody></table>\n""".format(
696                 "\n".join(summarizer._text_report_table_gen(lambda x: "<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>".format(*x),
697                                                             lambda x: "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(*x))))
698             label = summarizer.long_label()
699
700         return WEBCHART_CLASS(label, iter(self._descendants().values())).html(tophtml, bottomhtml)
701
702
703 class ContainerRequestTreeSummarizer(MultiSummarizer):
704     def __init__(self, root, skip_child_jobs=False, **kwargs):
705         arv = kwargs.get("arv") or arvados.api('v1')
706
707         label = kwargs.pop('label', None) or root.get('name') or root['uuid']
708         root['name'] = label
709
710         children = collections.OrderedDict()
711         todo = collections.deque((root, ))
712         while len(todo) > 0:
713             current = todo.popleft()
714             label = current['name']
715             sort_key = current['created_at']
716
717             summer = ContainerRequestSummarizer(current, label=label, **kwargs)
718             summer.sort_key = sort_key
719             children[current['uuid']] = summer
720
721             if skip_child_jobs:
722                 child_crs = arv.container_requests().list(filters=[['requesting_container_uuid', '=', current['container_uuid']]],
723                                                           limit=0).execute()
724                 logger.warning('%s: omitting stats from child containers'
725                                ' because --skip-child-jobs flag is on',
726                                label, child_crs['items_available'])
727             else:
728                 for cr in arvados.util.keyset_list_all(arv.container_requests().list,
729                                                        filters=[['requesting_container_uuid', '=', current['container_uuid']]]):
730                     if cr['container_uuid']:
731                         logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
732                         cr['name'] = cr.get('name') or cr['uuid']
733                         todo.append(cr)
734         sorted_children = collections.OrderedDict()
735         for uuid in sorted(list(children.keys()), key=lambda uuid: children[uuid].sort_key):
736             sorted_children[uuid] = children[uuid]
737         super(ContainerRequestTreeSummarizer, self).__init__(
738             children=sorted_children,
739             label=root['name'],
740             **kwargs)