19744: Report steps with low utilization at end of workflow
tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 463c552c4f1eb5caf0868337858197a747bc8fa8..bc41fdae33272d3df98ad8c998bf5a05db308120 100644
@@ -12,16 +12,17 @@ import itertools
 import math
 import re
 import sys
-import threading
 import _strptime
+import arvados.util
+
+from concurrent.futures import ThreadPoolExecutor
 
-from arvados.api import OrderedJsonModel
 from crunchstat_summary import logger
 
 # Recommend memory constraints that are this multiple of an integral
 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
 # that have amounts like 7.5 GiB according to the kernel.)
-AVAILABLE_RAM_RATIO = 0.95
+AVAILABLE_RAM_RATIO = 0.90
 MB=2**20
 
 # Workaround datetime.datetime.strptime() thread-safety bug by calling
@@ -64,8 +65,11 @@ class Summarizer(object):
         # are already suitable.  If applicable, the subclass
         # constructor will overwrite this with something useful.
         self.existing_constraints = {}
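+        # Filled in later: node_info comes from the log reader in
+        # _run(); cost and arv_config are set by ProcessSummarizer.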
+        self.node_info = {}
+        self.cost = 0
+        self.arv_config = {}
 
-        logger.debug("%s: logdata %s", self.label, logdata)
+        logger.info("%s: logdata %s", self.label, logdata)
 
     def run(self):
         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
@@ -73,78 +77,23 @@ class Summarizer(object):
             self._run(logdata)
 
     def _run(self, logdata):
-        self.detected_crunch1 = False
-        for line in logdata:
-            if not self.detected_crunch1 and '-8i9sb-' in line:
-                self.detected_crunch1 = True
-
-            if self.detected_crunch1:
-                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
-                if m:
-                    seq = int(m.group('seq'))
-                    uuid = m.group('task_uuid')
-                    self.seq_to_uuid[seq] = uuid
-                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
-                    continue
-
-                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
-                if m:
-                    task_id = self.seq_to_uuid[int(m.group('seq'))]
-                    elapsed = int(m.group('elapsed'))
-                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
-                    if elapsed > self.stats_max['time']['elapsed']:
-                        self.stats_max['time']['elapsed'] = elapsed
-                    continue
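+        # Ask the log reader for details about the node this container
+        # ran on (instance type, price, VCPUs, RAM); these feed the
+        # aggregate report and sizing recommendations below.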
+        if not self.node_info:
+            self.node_info = logdata.node_info()
 
-                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
-                if m:
-                    uuid = m.group('uuid')
-                    if self._skip_child_jobs:
-                        logger.warning('%s: omitting stats from child job %s'
-                                       ' because --skip-child-jobs flag is on',
-                                       self.label, uuid)
-                        continue
-                    logger.debug('%s: follow %s', self.label, uuid)
-                    child_summarizer = NewSummarizer(uuid)
-                    child_summarizer.stats_max = self.stats_max
-                    child_summarizer.task_stats = self.task_stats
-                    child_summarizer.tasks = self.tasks
-                    child_summarizer.starttime = self.starttime
-                    child_summarizer.run()
-                    logger.debug('%s: done %s', self.label, uuid)
-                    continue
-
-                # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
-                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
-                if not m:
-                    continue
-            else:
-                # crunch2
-                # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
-                m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
-                if not m:
-                    continue
+        for line in logdata:
+            # crunch2
+            # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
+            m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+            if not m:
+                continue
 
             if self.label is None:
                 try:
                     self.label = m.group('job_uuid')
                 except IndexError:
                     self.label = 'label #1'
-            category = m.group('category')
-            if category.endswith(':'):
-                # "stderr crunchstat: notice: ..."
-                continue
-            elif category in ('error', 'caught'):
-                continue
-            elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
-                # "stderr crunchstat: read /proc/1234/net/dev: ..."
-                # (old logs are less careful with unprefixed error messages)
-                continue
 
-            if self.detected_crunch1:
-                task_id = self.seq_to_uuid[int(m.group('seq'))]
-            else:
-                task_id = 'container'
+            task_id = 'container'
             task = self.tasks[task_id]
 
             # Use the first and last crunchstat timestamps as
@@ -173,12 +122,23 @@ class Summarizer(object):
             if self.finishtime is None or timestamp > self.finishtime:
                 self.finishtime = timestamp
 
-            if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
+            if task.starttime is not None and task.finishtime is not None:
                 elapsed = (task.finishtime - task.starttime).seconds
                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                 if elapsed > self.stats_max['time']['elapsed']:
                     self.stats_max['time']['elapsed'] = elapsed
 
+            category = m.group('category')
+            if category.endswith(':'):
+                # "stderr crunchstat: notice: ..."
+                continue
+            elif category in ('error', 'caught'):
+                continue
+            elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
+                # "stderr crunchstat: read /proc/1234/net/dev: ..."
+                # (old logs are less careful with unprefixed error messages)
+                continue
+
             this_interval_s = None
             for group in ['current', 'interval']:
                 if not m.group(group):
@@ -245,34 +205,73 @@ class Summarizer(object):
                     self.job_tot[category][stat] += val
         logger.debug('%s: done totals', self.label)
 
+        if self.stats_max['time'].get('elapsed', 0) > 20:
+            # needs to have executed for at least 20 seconds or we may
+            # not have collected any metrics and these warnings are duds.
+            missing_category = {
+                'cpu': 'CPU',
+                'mem': 'memory',
+                'net:': 'network I/O',
+                'statfs': 'storage space',
+            }
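+            # Keys ending in ':' (like 'net:') match stat categories by
+            # prefix (e.g. 'net:eth0'); the others must match exactly.
+            # Popping inside the loop is safe because we break right after.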
+            for task_stat in self.task_stats.values():
+                for category in task_stat.keys():
+                    for checkcat in missing_category:
+                        if checkcat.endswith(':'):
+                            if category.startswith(checkcat):
+                                missing_category.pop(checkcat)
+                                break
+                        else:
+                            if category == checkcat:
+                                missing_category.pop(checkcat)
+                                break
+            for catlabel in missing_category.values():
+                logger.warning('%s: %s stats are missing -- possible cluster configuration issue',
+                            self.label, catlabel)
+
     def long_label(self):
         label = self.label
         if hasattr(self, 'process') and self.process['uuid'] not in label:
             label = '{} ({})'.format(label, self.process['uuid'])
-        if self.finishtime:
-            label += ' -- elapsed time '
-            s = (self.finishtime - self.starttime).total_seconds()
-            if s > 86400:
-                label += '{}d'.format(int(s/86400))
-            if s > 3600:
-                label += '{}h'.format(int(s/3600) % 24)
-            if s > 60:
-                label += '{}m'.format(int(s/60) % 60)
-            label += '{}s'.format(int(s) % 60)
+        return label
+
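+    # Split out of long_label() so the elapsed time can also be shown
+    # as its own row in the aggregate metrics report.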
+    def elapsed_time(self):
+        if not self.finishtime:
+            return ""
+        label = ""
+        s = (self.finishtime - self.starttime).total_seconds()
+        if s > 86400:
+            label += '{}d '.format(int(s/86400))
+        if s > 3600:
+            label += '{}h '.format(int(s/3600) % 24)
+        if s > 60:
+            label += '{}m '.format(int(s/60) % 60)
+        label += '{}s'.format(int(s) % 60)
         return label
 
     def text_report(self):
         if not self.tasks:
             return "(no report generated)\n"
         return "\n".join(itertools.chain(
-            self._text_report_gen(),
-            self._recommend_gen())) + "\n"
+            self._text_report_table_gen(lambda x: "\t".join(x),
+                                  lambda x: "\t".join(x)),
+            self._text_report_agg_gen(lambda x: "# {}: {}{}".format(x[0], x[1], x[2])),
+            self._recommend_gen(lambda x: "#!! "+x))) + "\n"
 
     def html_report(self):
-        return WEBCHART_CLASS(self.label, [self]).html()
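+        # tophtml/bottomhtml are handed to the chart page template:
+        # recommendations and aggregate stats above the charts, the
+        # full per-category metrics table below them.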
+        tophtml = """{}\n<table class='aggtable'><tbody>{}</tbody></table>\n""".format(
+            "\n".join(self._recommend_gen(lambda x: "<p>{}</p>".format(x))),
+            "\n".join(self._text_report_agg_gen(lambda x: "<tr><th>{}</th><td>{}{}</td></tr>".format(*x))))
+
+        bottomhtml = """<table class='metricstable'><tbody>{}</tbody></table>\n""".format(
+            "\n".join(self._text_report_table_gen(lambda x: "<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>".format(*x),
+                                                        lambda x: "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(*x))))
+        label = self.long_label()
 
-    def _text_report_gen(self):
-        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
+        return WEBCHART_CLASS(label, [self]).html(tophtml, bottomhtml)
+
+    def _text_report_table_gen(self, headerformat, rowformat):
+        yield headerformat(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
         for category, stat_max in sorted(self.stats_max.items()):
             for stat, val in sorted(stat_max.items()):
                 if stat.endswith('__rate'):
@@ -280,66 +279,135 @@ class Summarizer(object):
                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                 val = self._format(val)
                 tot = self._format(self.job_tot[category].get(stat, '-'))
-                yield "\t".join([category, stat, str(val), max_rate, tot])
-        for args in (
-                ('Number of tasks: {}',
+                yield rowformat([category, stat, str(val), max_rate, tot])
+
+    def _text_report_agg_gen(self, aggformat):
+        by_single_task = ""
+        if len(self.tasks) > 1:
+            by_single_task = " by a single task"
+
+        metrics = [
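+            # Each entry is (label, value, transform to apply to the
+            # value or None, unit suffix); None entries are skipped.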
+            ('Elapsed time',
+             self.elapsed_time(),
+             None,
+             ''),
+
+            ('Estimated cost',
+             '${:.3f}'.format(self.cost),
+             None,
+             '') if self.cost > 0 else None,
+
+            ('Assigned instance type',
+             self.node_info.get('ProviderType'),
+             None,
+             '') if self.node_info.get('ProviderType') else None,
+
+            ('Instance hourly price',
+             '${:.3f}'.format(self.node_info.get('Price')),
+             None,
+             '') if self.node_info.get('Price') else None,
+
+            ('Max CPU usage in a single interval',
+             self.stats_max['cpu']['user+sys__rate'],
+             lambda x: x * 100,
+             '%'),
+
+            ('Overall CPU usage',
+             float(self.job_tot['cpu']['user+sys']) /
+             self.job_tot['time']['elapsed']
+             if self.job_tot['time']['elapsed'] > 0 else 0,
+             lambda x: x * 100,
+             '%'),
+
+            ('Requested CPU cores',
+             self.existing_constraints.get(self._map_runtime_constraint('vcpus')),
+             None,
+             '') if self.existing_constraints.get(self._map_runtime_constraint('vcpus')) else None,
+
+            ('Instance VCPUs',
+             self.node_info.get('VCPUs'),
+             None,
+             '') if self.node_info.get('VCPUs') else None,
+
+            ('Max memory used{}'.format(by_single_task),
+             self.stats_max['mem']['rss'],
+             lambda x: x / 2**20,
+             'MB'),
+
+            ('Requested RAM',
+             self.existing_constraints.get(self._map_runtime_constraint('ram')),
+             lambda x: x / 2**20,
+             'MB') if self.existing_constraints.get(self._map_runtime_constraint('ram')) else None,
+
+            ('Maximum RAM request for this instance type',
+             (self.node_info.get('RAM') - self.arv_config.get('Containers', {}).get('ReserveExtraRAM', 0))*.95,
+             lambda x: x / 2**20,
+             'MB') if self.node_info.get('RAM') else None,
+
+            ('Max network traffic{}'.format(by_single_task),
+             self.stats_max['net:eth0']['tx+rx'] +
+             self.stats_max['net:keep0']['tx+rx'],
+             lambda x: x / 1e9,
+             'GB'),
+
+            ('Max network speed in a single interval',
+             self.stats_max['net:eth0']['tx+rx__rate'] +
+             self.stats_max['net:keep0']['tx+rx__rate'],
+             lambda x: x / 1e6,
+             'MB/s'),
+
+            ('Keep cache miss rate',
+             (float(self.job_tot['keepcache']['miss']) /
+              float(self.job_tot['keepcalls']['get']))
+             if self.job_tot['keepcalls']['get'] > 0 else 0,
+             lambda x: x * 100.0,
+             '%'),
+
+            ('Keep cache utilization',
+             (float(self.job_tot['blkio:0:0']['read']) /
+              float(self.job_tot['net:keep0']['rx']))
+             if self.job_tot['net:keep0']['rx'] > 0 else 0,
+             lambda x: x * 100.0,
+             '%'),
+
+            ('Temp disk utilization',
+             (float(self.job_tot['statfs']['used']) /
+              float(self.job_tot['statfs']['total']))
+             if self.job_tot['statfs']['total'] > 0 else 0,
+             lambda x: x * 100.0,
+             '%'),
+        ]
+
+        if len(self.tasks) > 1:
+            metrics.insert(0, ('Number of tasks',
                  len(self.tasks),
-                 None),
-                ('Max CPU time spent by a single task: {}s',
-                 self.stats_max['cpu']['user+sys'],
-                 None),
-                ('Max CPU usage in a single interval: {}%',
-                 self.stats_max['cpu']['user+sys__rate'],
-                 lambda x: x * 100),
-                ('Overall CPU usage: {}%',
-                 float(self.job_tot['cpu']['user+sys']) /
-                 self.job_tot['time']['elapsed']
-                 if self.job_tot['time']['elapsed'] > 0 else 0,
-                 lambda x: x * 100),
-                ('Max memory used by a single task: {}GB',
-                 self.stats_max['mem']['rss'],
-                 lambda x: x / 1e9),
-                ('Max network traffic in a single task: {}GB',
-                 self.stats_max['net:eth0']['tx+rx'] +
-                 self.stats_max['net:keep0']['tx+rx'],
-                 lambda x: x / 1e9),
-                ('Max network speed in a single interval: {}MB/s',
-                 self.stats_max['net:eth0']['tx+rx__rate'] +
-                 self.stats_max['net:keep0']['tx+rx__rate'],
-                 lambda x: x / 1e6),
-                ('Keep cache miss rate {}%',
-                 (float(self.job_tot['keepcache']['miss']) /
-                 float(self.job_tot['keepcalls']['get']))
-                 if self.job_tot['keepcalls']['get'] > 0 else 0,
-                 lambda x: x * 100.0),
-                ('Keep cache utilization {}%',
-                 (float(self.job_tot['blkio:0:0']['read']) /
-                 float(self.job_tot['net:keep0']['rx']))
-                 if self.job_tot['net:keep0']['rx'] > 0 else 0,
-                 lambda x: x * 100.0),
-               ('Temp disk utilization {}%',
-                 (float(self.job_tot['statfs']['used']) /
-                 float(self.job_tot['statfs']['total']))
-                 if self.job_tot['statfs']['total'] > 0 else 0,
-                 lambda x: x * 100.0),
-                ):
-            format_string, val, transform = args
+                 None,
+                 ''))
+        for args in metrics:
+            if args is None:
+                continue
+            format_string, val, transform, suffix = args
             if val == float('-Inf'):
                 continue
             if transform:
                 val = transform(val)
-            yield "# "+format_string.format(self._format(val))
+            yield aggformat((format_string, self._format(val), suffix))
 
-    def _recommend_gen(self):
+    def _recommend_gen(self, recommendformat):
         # TODO recommend fixing job granularity if elapsed time is too short
+
+        if self.stats_max['time'].get('elapsed', 0) <= 20:
+            # Not enough data
+            return []
+
         return itertools.chain(
-            self._recommend_cpu(),
-            self._recommend_ram(),
-            self._recommend_keep_cache(),
-            self._recommend_temp_disk(),
+            self._recommend_cpu(recommendformat),
+            self._recommend_ram(recommendformat),
+            self._recommend_keep_cache(recommendformat),
+            self._recommend_temp_disk(recommendformat),
             )
 
-    def _recommend_cpu(self):
+    def _recommend_cpu(self, recommendformat):
         """Recommend asking for 4 cores if max CPU usage was 333%"""
 
         constraint_key = self._map_runtime_constraint('vcpus')
         """Recommend asking for 4 cores if max CPU usage was 333%"""
 
         constraint_key = self._map_runtime_constraint('vcpus')
@@ -353,19 +421,17 @@ class Summarizer(object):
         asked_cores = self.existing_constraints.get(constraint_key)
         if asked_cores is None:
             asked_cores = 1
-        # TODO: This should be more nuanced in cases where max >> avg
-        if used_cores < asked_cores:
-            yield (
-                '#!! {} max CPU usage was {}% -- '
-                'try reducing runtime_constraints to "{}":{}'
+
+        if used_cores < (asked_cores*.5):
+            yield recommendformat(
+                '{} peak CPU usage was only {}% out of possible {}% ({} cores requested)'
             ).format(
                 self.label,
                 math.ceil(cpu_max_rate*100),
-                constraint_key,
-                int(used_cores))
+                asked_cores*100, asked_cores)
 
     # FIXME: This needs to be updated to account for current a-d-c algorithms
-    def _recommend_ram(self):
+    def _recommend_ram(self, recommendformat):
         """Recommend an economical RAM constraint for this job.
 
         Nodes that are advertised as "8 gibibytes" actually have what
         """Recommend an economical RAM constraint for this job.
 
         Nodes that are advertised as "8 gibibytes" actually have what
@@ -404,55 +470,63 @@ class Summarizer(object):
         if used_bytes == float('-Inf'):
             logger.warning('%s: no memory usage data', self.label)
             return
+        if not self.existing_constraints.get(constraint_key):
+            return
         used_mib = math.ceil(float(used_bytes) / MB)
-        asked_mib = self.existing_constraints.get(constraint_key)
+        asked_mib = self.existing_constraints.get(constraint_key) / MB
 
         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
-        if used_mib > 0 and (asked_mib is None or (
-                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
-            yield (
-                '#!! {} max RSS was {} MiB -- '
-                'try reducing runtime_constraints to "{}":{}'
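+        # Only recommend shrinking the request when the job used less
+        # than half of it and the request still exceeds the rounded-up
+        # near-GiB recommendation.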
+        ratio = 0.5
+        recommend_mib = int(math.ceil(nearlygibs(used_mib/ratio))*AVAILABLE_RAM_RATIO*1024)
+        if used_mib > 0 and (used_mib / asked_mib) < ratio and asked_mib > recommend_mib:
+            yield recommendformat(
+                '{} peak RAM usage was only {}% ({} MiB used / {} MiB requested)'
             ).format(
                 self.label,
+                int(math.ceil(100*(used_mib / asked_mib))),
                 int(used_mib),
-                constraint_key,
-                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))
+                int(asked_mib))
+
+    def _recommend_keep_cache(self, recommendformat):
+        """Recommend increasing keep cache if utilization < 50%.
+
+        This means the amount of data returned to the program is less
+        than 50% of the amount of data actually downloaded by
+        arv-mount.
+        """
 
-    def _recommend_keep_cache(self):
-        """Recommend increasing keep cache if utilization < 80%"""
-        constraint_key = self._map_runtime_constraint('keep_cache_ram')
         if self.job_tot['net:keep0']['rx'] == 0:
             return
+
+        miss_rate = (float(self.job_tot['keepcache']['miss']) /
+                     float(self.job_tot['keepcalls']['get']))
+
         utilization = (float(self.job_tot['blkio:0:0']['read']) /
                        float(self.job_tot['net:keep0']['rx']))
         # FIXME: the default on this get won't work correctly
-        asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
+        asked_cache = self.existing_constraints.get('keep_cache_ram') or self.existing_constraints.get('keep_cache_disk')
 
-        if utilization < 0.8:
-            yield (
-                '#!! {} Keep cache utilization was {:.2f}% -- '
-                'try doubling runtime_constraints to "{}":{} (or more)'
+        if utilization < 0.5 and miss_rate > .05:
+            yield recommendformat(
+                '{} Keep cache utilization was only {:.2f}% and miss rate was {:.2f}% -- '
+                'recommend increasing keep_cache'
             ).format(
                 self.label,
                 utilization * 100.0,
-                constraint_key,
-                math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
+                miss_rate * 100.0)
 
 
-    def _recommend_temp_disk(self):
-        """Recommend decreasing temp disk if utilization < 50%"""
-        total = float(self.job_tot['statfs']['total'])
-        utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0
+    def _recommend_temp_disk(self, recommendformat):
+        """This recommendation is disabled for the time being.  It was
+        using the total disk on the node and not the amount of disk
+        requested, so it would trigger a false positive basically
+        every time.  To get the amount of disk requested we need to
+        fish it out of the mounts, which is extra work I don't want to
+        do right now.  You can find the old code at commit 616d135e77.
 
-        if utilization < 50.8 and total > 0:
-            yield (
-                '#!! {} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
-                'consider reducing "tmpdirMin" and/or "outdirMin"'
-            ).format(
-                self.label,
-                utilization * 100.0,
-                total / MB)
+        """
+
+        return []
 
 
     def _format(self, val):
@@ -467,18 +541,11 @@ class Summarizer(object):
     def _runtime_constraint_mem_unit(self):
         if hasattr(self, 'runtime_constraint_mem_unit'):
             return self.runtime_constraint_mem_unit
-        elif self.detected_crunch1:
-            return JobSummarizer.runtime_constraint_mem_unit
         else:
             return ContainerRequestSummarizer.runtime_constraint_mem_unit
 
     def _map_runtime_constraint(self, key):
-        if hasattr(self, 'map_runtime_constraint'):
-            return self.map_runtime_constraint[key]
-        elif self.detected_crunch1:
-            return JobSummarizer.map_runtime_constraint[key]
-        else:
-            return key
+        return key
 
 
 class CollectionSummarizer(Summarizer):
@@ -497,7 +564,7 @@ def NewSummarizer(process_or_uuid, **kwargs):
     else:
         uuid = process_or_uuid
         process = None
-        arv = arvados.api('v1', model=OrderedJsonModel())
+        arv = kwargs.get("arv") or arvados.api('v1')
 
     if '-dz642-' in uuid:
         if process is None:
@@ -510,14 +577,6 @@ def NewSummarizer(process_or_uuid, **kwargs):
         if process is None:
             process = arv.container_requests().get(uuid=uuid).execute()
         klass = ContainerRequestTreeSummarizer
-    elif '-8i9sb-' in uuid:
-        if process is None:
-            process = arv.jobs().get(uuid=uuid).execute()
-        klass = JobTreeSummarizer
-    elif '-d1hrv-' in uuid:
-        if process is None:
-            process = arv.pipeline_instances().get(uuid=uuid).execute()
-        klass = PipelineSummarizer
     elif '-4zz18-' in uuid:
         return CollectionSummarizer(collection_id=uuid)
     else:
@@ -531,6 +590,7 @@ class ProcessSummarizer(Summarizer):
     def __init__(self, process, label=None, **kwargs):
         rdr = None
         self.process = process
+        arv = kwargs.get("arv") or arvados.api('v1')
         if label is None:
             label = self.process.get('name', self.process['uuid'])
         # Pre-Arvados v1.4 everything is in 'log'
@@ -538,7 +598,10 @@ class ProcessSummarizer(Summarizer):
         log_collection = self.process.get('log', self.process.get('log_uuid'))
         if log_collection and self.process.get('state') != 'Uncommitted': # arvados.util.CR_UNCOMMITTED:
             try:
-                rdr = crunchstat_summary.reader.CollectionReader(log_collection)
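+                # Reuse the caller's API client and any preloaded
+                # collection record rather than creating fresh ones.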
+                rdr = crunchstat_summary.reader.CollectionReader(
+                    log_collection,
+                    api_client=arv,
+                    collection_object=kwargs.get("collection_object"))
             except arvados.errors.NotFoundError as e:
                 logger.warning("Trying event logs after failing to read "
                                "log collection %s: %s", self.process['log'], e)
@@ -546,17 +609,11 @@ class ProcessSummarizer(Summarizer):
             uuid = self.process.get('container_uuid', self.process.get('uuid'))
             rdr = crunchstat_summary.reader.LiveLogReader(uuid)
             label = label + ' (partial)'
+
         super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
         self.existing_constraints = self.process.get('runtime_constraints', {})
-
-
-class JobSummarizer(ProcessSummarizer):
-    runtime_constraint_mem_unit = MB
-    map_runtime_constraint = {
-        'keep_cache_ram': 'keep_cache_mb_per_task',
-        'ram': 'min_ram_mb_per_node',
-        'vcpus': 'min_cores_per_node',
-    }
+        self.arv_config = arv.config()
+        self.cost = self.process.get('cost', 0)
 
 
 class ContainerRequestSummarizer(ProcessSummarizer):
@@ -565,26 +622,26 @@ class ContainerRequestSummarizer(ProcessSummarizer):
 
 class MultiSummarizer(object):
     def __init__(self, children={}, label=None, threads=1, **kwargs):
-        self.throttle = threading.Semaphore(threads)
         self.children = children
         self.label = label
-
-    def run_and_release(self, target, *args, **kwargs):
-        try:
-            return target(*args, **kwargs)
-        finally:
-            self.throttle.release()
+        self.threadcount = threads
 
     def run(self):
-        threads = []
-        for child in self.children.values():
-            self.throttle.acquire()
-            t = threading.Thread(target=self.run_and_release, args=(child.run, ))
-            t.daemon = True
-            t.start()
-            threads.append(t)
-        for t in threads:
-            t.join()
+        if self.threadcount > 1 and len(self.children) > 1:
+            completed = 0
+            def run_and_progress(child):
+                # 'completed' is assigned here, so without this
+                # declaration Python would treat it as a new local and
+                # raise UnboundLocalError on the increment below.
+                nonlocal completed
+                try:
+                    child.run()
+                except Exception as e:
+                    logger.exception("parse error")
+                completed += 1
+                logger.info("%s/%s summarized %s", completed, len(self.children), child.label)
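+            # ThreadPoolExecutor replaces the manual Semaphore-throttled
+            # threading.Thread management removed above.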
+            with ThreadPoolExecutor(max_workers=self.threadcount) as tpe:
+                for child in self.children.values():
+                    tpe.submit(run_and_progress, child)
+        else:
+            for child in self.children.values():
+                child.run()
 
     def text_report(self):
         txt = ''
@@ -612,57 +669,26 @@ class MultiSummarizer(object):
         return d
 
     def html_report(self):
-        return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()
+        tophtml = ""
+        bottomhtml = ""
+        label = self.label
+        if len(self._descendants()) == 1:
+            summarizer = next(iter(self._descendants().values()))
+            tophtml = """{}\n<table class='aggtable'><tbody>{}</tbody></table>\n""".format(
+                "\n".join(summarizer._recommend_gen(lambda x: "<p>{}</p>".format(x))),
+                "\n".join(summarizer._text_report_agg_gen(lambda x: "<tr><th>{}</th><td>{}{}</td></tr>".format(*x))))
 
+            bottomhtml = """<table class='metricstable'><tbody>{}</tbody></table>\n""".format(
+                "\n".join(summarizer._text_report_table_gen(lambda x: "<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>".format(*x),
+                                                            lambda x: "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(*x))))
+            label = summarizer.long_label()
 
-class JobTreeSummarizer(MultiSummarizer):
-    """Summarizes a job and all children listed in its components field."""
-    def __init__(self, job, label=None, **kwargs):
-        arv = arvados.api('v1', model=OrderedJsonModel())
-        label = label or job.get('name', job['uuid'])
-        children = collections.OrderedDict()
-        children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
-        if job.get('components', None):
-            preloaded = {}
-            for j in arv.jobs().index(
-                    limit=len(job['components']),
-                    filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
-                preloaded[j['uuid']] = j
-            for cname in sorted(job['components'].keys()):
-                child_uuid = job['components'][cname]
-                j = (preloaded.get(child_uuid) or
-                     arv.jobs().get(uuid=child_uuid).execute())
-                children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
-
-        super(JobTreeSummarizer, self).__init__(
-            children=children,
-            label=label,
-            **kwargs)
-
-
-class PipelineSummarizer(MultiSummarizer):
-    def __init__(self, instance, **kwargs):
-        children = collections.OrderedDict()
-        for cname, component in instance['components'].items():
-            if 'job' not in component:
-                logger.warning(
-                    "%s: skipping component with no job assigned", cname)
-            else:
-                logger.info(
-                    "%s: job %s", cname, component['job']['uuid'])
-                summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
-                summarizer.label = '{} {}'.format(
-                    cname, component['job']['uuid'])
-                children[cname] = summarizer
-        super(PipelineSummarizer, self).__init__(
-            children=children,
-            label=instance['uuid'],
-            **kwargs)
+        return WEBCHART_CLASS(label, iter(self._descendants().values())).html(tophtml, bottomhtml)
 
 
 class ContainerRequestTreeSummarizer(MultiSummarizer):
     def __init__(self, root, skip_child_jobs=False, **kwargs):
-        arv = arvados.api('v1', model=OrderedJsonModel())
+        arv = kwargs.get("arv") or arvados.api('v1')
 
         label = kwargs.pop('label', None) or root.get('name') or root['uuid']
         root['name'] = label
@@ -678,22 +704,15 @@ class ContainerRequestTreeSummarizer(MultiSummarizer):
             summer.sort_key = sort_key
             children[current['uuid']] = summer
 
-            page_filters = []
-            while True:
-                child_crs = arv.container_requests().index(
-                    order=['uuid asc'],
-                    filters=page_filters+[
-                        ['requesting_container_uuid', '=', current['container_uuid']]],
-                ).execute()
-                if not child_crs['items']:
-                    break
-                elif skip_child_jobs:
-                    logger.warning('%s: omitting stats from %d child containers'
-                                   ' because --skip-child-jobs flag is on',
-                                   label, child_crs['items_available'])
-                    break
-                page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
-                for cr in child_crs['items']:
+            if skip_child_jobs:
+                child_crs = arv.container_requests().list(filters=[['requesting_container_uuid', '=', current['container_uuid']]],
+                                                          limit=0).execute()
+                logger.warning('%s: omitting stats from %d child containers'
+                               ' because --skip-child-jobs flag is on',
+                               label, child_crs['items_available'])
+            else:
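+                # keyset_list_all pages through the full result set,
+                # replacing the manual pagination loop removed above.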
+                for cr in arvados.util.keyset_list_all(arv.container_requests().list,
+                                                       filters=[['requesting_container_uuid', '=', current['container_uuid']]]):
                     if cr['container_uuid']:
                         logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
                         cr['name'] = cr.get('name') or cr['uuid']