19744: Remove jobs/pipeline templates from crunchstat-summary
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
index 463c552c4f1eb5caf0868337858197a747bc8fa8..65cee6c1765e0753e634ed7c4e6aa9fe49dfc56c 100644 (file)
@@ -12,16 +12,17 @@ import itertools
 import math
 import re
 import sys
 import math
 import re
 import sys
-import threading
 import _strptime
 import _strptime
+import arvados.util
+
+from concurrent.futures import ThreadPoolExecutor
 
 
-from arvados.api import OrderedJsonModel
 from crunchstat_summary import logger
 
 # Recommend memory constraints that are this multiple of an integral
 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
 # that have amounts like 7.5 GiB according to the kernel.)
 from crunchstat_summary import logger
 
 # Recommend memory constraints that are this multiple of an integral
 # number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
 # that have amounts like 7.5 GiB according to the kernel.)
-AVAILABLE_RAM_RATIO = 0.95
+AVAILABLE_RAM_RATIO = 0.90
 MB=2**20
 
 # Workaround datetime.datetime.strptime() thread-safety bug by calling
 MB=2**20
 
 # Workaround datetime.datetime.strptime() thread-safety bug by calling
@@ -64,8 +65,11 @@ class Summarizer(object):
         # are already suitable.  If applicable, the subclass
         # constructor will overwrite this with something useful.
         self.existing_constraints = {}
         # are already suitable.  If applicable, the subclass
         # constructor will overwrite this with something useful.
         self.existing_constraints = {}
+        self.node_info = {}
+        self.cost = 0
+        self.arv_config = {}
 
 
-        logger.debug("%s: logdata %s", self.label, logdata)
+        logger.info("%s: logdata %s", self.label, logdata)
 
     def run(self):
         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
 
     def run(self):
         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
@@ -73,78 +77,23 @@ class Summarizer(object):
             self._run(logdata)
 
     def _run(self, logdata):
             self._run(logdata)
 
     def _run(self, logdata):
-        self.detected_crunch1 = False
-        for line in logdata:
-            if not self.detected_crunch1 and '-8i9sb-' in line:
-                self.detected_crunch1 = True
-
-            if self.detected_crunch1:
-                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
-                if m:
-                    seq = int(m.group('seq'))
-                    uuid = m.group('task_uuid')
-                    self.seq_to_uuid[seq] = uuid
-                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
-                    continue
-
-                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
-                if m:
-                    task_id = self.seq_to_uuid[int(m.group('seq'))]
-                    elapsed = int(m.group('elapsed'))
-                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
-                    if elapsed > self.stats_max['time']['elapsed']:
-                        self.stats_max['time']['elapsed'] = elapsed
-                    continue
-
-                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
-                if m:
-                    uuid = m.group('uuid')
-                    if self._skip_child_jobs:
-                        logger.warning('%s: omitting stats from child job %s'
-                                       ' because --skip-child-jobs flag is on',
-                                       self.label, uuid)
-                        continue
-                    logger.debug('%s: follow %s', self.label, uuid)
-                    child_summarizer = NewSummarizer(uuid)
-                    child_summarizer.stats_max = self.stats_max
-                    child_summarizer.task_stats = self.task_stats
-                    child_summarizer.tasks = self.tasks
-                    child_summarizer.starttime = self.starttime
-                    child_summarizer.run()
-                    logger.debug('%s: done %s', self.label, uuid)
-                    continue
+        if not self.node_info:
+            self.node_info = logdata.node_info()
 
 
-                # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
-                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
-                if not m:
-                    continue
-            else:
-                # crunch2
-                # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
-                m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
-                if not m:
-                    continue
+        for line in logdata:
+            # crunch2
+            # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
+            m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+            if not m:
+                continue
 
             if self.label is None:
                 try:
                     self.label = m.group('job_uuid')
                 except IndexError:
                     self.label = 'label #1'
 
             if self.label is None:
                 try:
                     self.label = m.group('job_uuid')
                 except IndexError:
                     self.label = 'label #1'
-            category = m.group('category')
-            if category.endswith(':'):
-                # "stderr crunchstat: notice: ..."
-                continue
-            elif category in ('error', 'caught'):
-                continue
-            elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
-                # "stderr crunchstat: read /proc/1234/net/dev: ..."
-                # (old logs are less careful with unprefixed error messages)
-                continue
 
 
-            if self.detected_crunch1:
-                task_id = self.seq_to_uuid[int(m.group('seq'))]
-            else:
-                task_id = 'container'
+            task_id = 'container'
             task = self.tasks[task_id]
 
             # Use the first and last crunchstat timestamps as
             task = self.tasks[task_id]
 
             # Use the first and last crunchstat timestamps as
@@ -173,12 +122,23 @@ class Summarizer(object):
             if self.finishtime is None or timestamp > self.finishtime:
                 self.finishtime = timestamp
 
             if self.finishtime is None or timestamp > self.finishtime:
                 self.finishtime = timestamp
 
-            if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
+            if task.starttime is not None and task.finishtime is not None:
                 elapsed = (task.finishtime - task.starttime).seconds
                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                 if elapsed > self.stats_max['time']['elapsed']:
                     self.stats_max['time']['elapsed'] = elapsed
 
                 elapsed = (task.finishtime - task.starttime).seconds
                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                 if elapsed > self.stats_max['time']['elapsed']:
                     self.stats_max['time']['elapsed'] = elapsed
 
+            category = m.group('category')
+            if category.endswith(':'):
+                # "stderr crunchstat: notice: ..."
+                continue
+            elif category in ('error', 'caught'):
+                continue
+            elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
+                # "stderr crunchstat: read /proc/1234/net/dev: ..."
+                # (old logs are less careful with unprefixed error messages)
+                continue
+
             this_interval_s = None
             for group in ['current', 'interval']:
                 if not m.group(group):
             this_interval_s = None
             for group in ['current', 'interval']:
                 if not m.group(group):
@@ -245,34 +205,64 @@ class Summarizer(object):
                     self.job_tot[category][stat] += val
         logger.debug('%s: done totals', self.label)
 
                     self.job_tot[category][stat] += val
         logger.debug('%s: done totals', self.label)
 
+        if self.stats_max['time'].get('elapsed', 0) > 20:
+            # needs to have executed for at least 20 seconds or we may
+            # not have collected any metrics and these warnings are duds.
+            #
+            # Keys ending in ':' are prefixes (e.g. 'net:' matches any
+            # category starting with 'net:'); all other keys must match
+            # a category name exactly.
+            expected_categories = {
+                'cpu': 'CPU',
+                'mem': 'memory',
+                'net:': 'network I/O',
+                'statfs': 'storage space',
+            }
+            # Gather every category reported by any task up front, so the
+            # check below never has to mutate a dict while iterating it.
+            seen_categories = set()
+            for task_stat in self.task_stats.values():
+                seen_categories.update(task_stat)
+            for checkcat, catlabel in expected_categories.items():
+                if checkcat.endswith(':'):
+                    found = any(cat.startswith(checkcat) for cat in seen_categories)
+                else:
+                    found = checkcat in seen_categories
+                if not found:
+                    logger.warning('%s: %s stats are missing -- possible cluster configuration issue',
+                                   self.label, catlabel)
+
     def long_label(self):
         label = self.label
         if hasattr(self, 'process') and self.process['uuid'] not in label:
             label = '{} ({})'.format(label, self.process['uuid'])
     def long_label(self):
         label = self.label
         if hasattr(self, 'process') and self.process['uuid'] not in label:
             label = '{} ({})'.format(label, self.process['uuid'])
-        if self.finishtime:
-            label += ' -- elapsed time '
-            s = (self.finishtime - self.starttime).total_seconds()
-            if s > 86400:
-                label += '{}d'.format(int(s/86400))
-            if s > 3600:
-                label += '{}h'.format(int(s/3600) % 24)
-            if s > 60:
-                label += '{}m'.format(int(s/60) % 60)
-            label += '{}s'.format(int(s) % 60)
+        return label
+
+    def elapsed_time(self):
+        """Return the time between self.starttime and self.finishtime as text.
+
+        The result looks like "1d 2h 3m 4s".  Units larger than the
+        total duration are omitted (e.g. "5m 12s"), and seconds are
+        always shown.  Returns "" when no finish time has been recorded.
+        """
+        if not self.finishtime:
+            return ""
+        label = ""
+        s = (self.finishtime - self.starttime).total_seconds()
+        # Use >= so that exact boundaries keep their unit: 60 seconds
+        # must render as "1m 0s", not "0s".
+        if s >= 86400:
+            label += '{}d '.format(int(s/86400))
+        if s >= 3600:
+            label += '{}h '.format(int(s/3600) % 24)
+        if s >= 60:
+            label += '{}m '.format(int(s/60) % 60)
+        label += '{}s'.format(int(s) % 60)
         return label
 
     def text_report(self):
         if not self.tasks:
             return "(no report generated)\n"
         return "\n".join(itertools.chain(
         return label
 
     def text_report(self):
         if not self.tasks:
             return "(no report generated)\n"
         return "\n".join(itertools.chain(
-            self._text_report_gen(),
-            self._recommend_gen())) + "\n"
+            self._text_report_table_gen(lambda x: "\t".join(x),
+                                  lambda x: "\t".join(x)),
+            self._text_report_agg_gen(lambda x: "# {}: {}{}".format(x[0], x[1], x[2])),
+            self._recommend_gen(lambda x: "#!! "+x))) + "\n"
 
     def html_report(self):
         return WEBCHART_CLASS(self.label, [self]).html()
 
 
     def html_report(self):
         return WEBCHART_CLASS(self.label, [self]).html()
 
-    def _text_report_gen(self):
-        yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
+    def _text_report_table_gen(self, headerformat, rowformat):
+        yield headerformat(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
         for category, stat_max in sorted(self.stats_max.items()):
             for stat, val in sorted(stat_max.items()):
                 if stat.endswith('__rate'):
         for category, stat_max in sorted(self.stats_max.items()):
             for stat, val in sorted(stat_max.items()):
                 if stat.endswith('__rate'):
@@ -280,66 +270,135 @@ class Summarizer(object):
                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                 val = self._format(val)
                 tot = self._format(self.job_tot[category].get(stat, '-'))
                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                 val = self._format(val)
                 tot = self._format(self.job_tot[category].get(stat, '-'))
-                yield "\t".join([category, stat, str(val), max_rate, tot])
-        for args in (
-                ('Number of tasks: {}',
+                yield rowformat([category, stat, str(val), max_rate, tot])
+
+    def _text_report_agg_gen(self, aggformat):
+        by_single_task = ""
+        if len(self.tasks) > 1:
+            by_single_task = " by a single task"
+
+        metrics = [
+            ('Elapsed time',
+             self.elapsed_time(),
+             None,
+             ''),
+
+            ('Estimated cost',
+             '${:.3f}'.format(self.cost),
+             None,
+             '') if self.cost > 0 else None,
+
+            ('Assigned instance type',
+             self.node_info.get('ProviderType'),
+             None,
+             '') if self.node_info.get('ProviderType') else None,
+
+            ('Instance hourly price',
+             '${:.3f}'.format(self.node_info.get('Price')),
+             None,
+             '') if self.node_info.get('Price') else None,
+
+            ('Max CPU usage in a single interval',
+             self.stats_max['cpu']['user+sys__rate'],
+             lambda x: x * 100,
+             '%'),
+
+            ('Overall CPU usage',
+             float(self.job_tot['cpu']['user+sys']) /
+             self.job_tot['time']['elapsed']
+             if self.job_tot['time']['elapsed'] > 0 else 0,
+             lambda x: x * 100,
+             '%'),
+
+            ('Requested CPU cores',
+             self.existing_constraints.get(self._map_runtime_constraint('vcpus')),
+             None,
+             '') if self.existing_constraints.get(self._map_runtime_constraint('vcpus')) else None,
+
+            ('Instance VCPUs',
+             self.node_info.get('VCPUs'),
+             None,
+             '') if self.node_info.get('VCPUs') else None,
+
+            ('Max memory used{}'.format(by_single_task),
+             self.stats_max['mem']['rss'],
+             lambda x: x / 2**20,
+             'MB'),
+
+            ('Requested RAM',
+             self.existing_constraints.get(self._map_runtime_constraint('ram')),
+             lambda x: x / 2**20,
+             'MB') if self.existing_constraints.get(self._map_runtime_constraint('ram')) else None,
+
+            ('Maximum RAM request for this instance type',
+             (self.node_info.get('RAM') - self.arv_config.get('Containers', {}).get('ReserveExtraRAM', 0))*.95,
+             lambda x: x / 2**20,
+             'MB') if self.node_info.get('RAM') else None,
+
+            ('Max network traffic{}'.format(by_single_task),
+             self.stats_max['net:eth0']['tx+rx'] +
+             self.stats_max['net:keep0']['tx+rx'],
+             lambda x: x / 1e9,
+             'GB'),
+
+            ('Max network speed in a single interval',
+             self.stats_max['net:eth0']['tx+rx__rate'] +
+             self.stats_max['net:keep0']['tx+rx__rate'],
+             lambda x: x / 1e6,
+             'MB/s'),
+
+            ('Keep cache miss rate',
+             (float(self.job_tot['keepcache']['miss']) /
+              float(self.job_tot['keepcalls']['get']))
+             if self.job_tot['keepcalls']['get'] > 0 else 0,
+             lambda x: x * 100.0,
+             '%'),
+
+            ('Keep cache utilization',
+             (float(self.job_tot['blkio:0:0']['read']) /
+              float(self.job_tot['net:keep0']['rx']))
+             if self.job_tot['net:keep0']['rx'] > 0 else 0,
+             lambda x: x * 100.0,
+             '%'),
+
+            ('Temp disk utilization',
+             (float(self.job_tot['statfs']['used']) /
+              float(self.job_tot['statfs']['total']))
+             if self.job_tot['statfs']['total'] > 0 else 0,
+             lambda x: x * 100.0,
+             '%'),
+        ]
+
+        if len(self.tasks) > 1:
+            metrics.insert(0, ('Number of tasks',
                  len(self.tasks),
                  len(self.tasks),
-                 None),
-                ('Max CPU time spent by a single task: {}s',
-                 self.stats_max['cpu']['user+sys'],
-                 None),
-                ('Max CPU usage in a single interval: {}%',
-                 self.stats_max['cpu']['user+sys__rate'],
-                 lambda x: x * 100),
-                ('Overall CPU usage: {}%',
-                 float(self.job_tot['cpu']['user+sys']) /
-                 self.job_tot['time']['elapsed']
-                 if self.job_tot['time']['elapsed'] > 0 else 0,
-                 lambda x: x * 100),
-                ('Max memory used by a single task: {}GB',
-                 self.stats_max['mem']['rss'],
-                 lambda x: x / 1e9),
-                ('Max network traffic in a single task: {}GB',
-                 self.stats_max['net:eth0']['tx+rx'] +
-                 self.stats_max['net:keep0']['tx+rx'],
-                 lambda x: x / 1e9),
-                ('Max network speed in a single interval: {}MB/s',
-                 self.stats_max['net:eth0']['tx+rx__rate'] +
-                 self.stats_max['net:keep0']['tx+rx__rate'],
-                 lambda x: x / 1e6),
-                ('Keep cache miss rate {}%',
-                 (float(self.job_tot['keepcache']['miss']) /
-                 float(self.job_tot['keepcalls']['get']))
-                 if self.job_tot['keepcalls']['get'] > 0 else 0,
-                 lambda x: x * 100.0),
-                ('Keep cache utilization {}%',
-                 (float(self.job_tot['blkio:0:0']['read']) /
-                 float(self.job_tot['net:keep0']['rx']))
-                 if self.job_tot['net:keep0']['rx'] > 0 else 0,
-                 lambda x: x * 100.0),
-               ('Temp disk utilization {}%',
-                 (float(self.job_tot['statfs']['used']) /
-                 float(self.job_tot['statfs']['total']))
-                 if self.job_tot['statfs']['total'] > 0 else 0,
-                 lambda x: x * 100.0),
-                ):
-            format_string, val, transform = args
+                 None,
+                 ''))
+        for args in metrics:
+            if args is None:
+                continue
+            format_string, val, transform, suffix = args
             if val == float('-Inf'):
                 continue
             if transform:
                 val = transform(val)
             if val == float('-Inf'):
                 continue
             if transform:
                 val = transform(val)
-            yield "# "+format_string.format(self._format(val))
+            yield aggformat((format_string, self._format(val), suffix))
 
 
-    def _recommend_gen(self):
+    def _recommend_gen(self, recommendformat):
         # TODO recommend fixing job granularity if elapsed time is too short
         # TODO recommend fixing job granularity if elapsed time is too short
+
+        if self.stats_max['time'].get('elapsed', 0) <= 20:
+            # Not enough data
+            return []
+
         return itertools.chain(
         return itertools.chain(
-            self._recommend_cpu(),
-            self._recommend_ram(),
-            self._recommend_keep_cache(),
-            self._recommend_temp_disk(),
+            self._recommend_cpu(recommendformat),
+            self._recommend_ram(recommendformat),
+            self._recommend_keep_cache(recommendformat),
+            self._recommend_temp_disk(recommendformat),
             )
 
             )
 
-    def _recommend_cpu(self):
+    def _recommend_cpu(self, recommendformat):
         """Recommend asking for 4 cores if max CPU usage was 333%"""
 
         constraint_key = self._map_runtime_constraint('vcpus')
         """Recommend asking for 4 cores if max CPU usage was 333%"""
 
         constraint_key = self._map_runtime_constraint('vcpus')
@@ -355,8 +414,8 @@ class Summarizer(object):
             asked_cores = 1
         # TODO: This should be more nuanced in cases where max >> avg
         if used_cores < asked_cores:
             asked_cores = 1
         # TODO: This should be more nuanced in cases where max >> avg
         if used_cores < asked_cores:
-            yield (
-                '#!! {} max CPU usage was {}% -- '
+            yield recommendformat(
+                '{} max CPU usage was {}% -- '
                 'try reducing runtime_constraints to "{}":{}'
             ).format(
                 self.label,
                 'try reducing runtime_constraints to "{}":{}'
             ).format(
                 self.label,
@@ -365,7 +424,7 @@ class Summarizer(object):
                 int(used_cores))
 
     # FIXME: This needs to be updated to account for current a-d-c algorithms
                 int(used_cores))
 
     # FIXME: This needs to be updated to account for current a-d-c algorithms
-    def _recommend_ram(self):
+    def _recommend_ram(self, recommendformat):
         """Recommend an economical RAM constraint for this job.
 
         Nodes that are advertised as "8 gibibytes" actually have what
         """Recommend an economical RAM constraint for this job.
 
         Nodes that are advertised as "8 gibibytes" actually have what
@@ -404,22 +463,26 @@ class Summarizer(object):
         if used_bytes == float('-Inf'):
             logger.warning('%s: no memory usage data', self.label)
             return
         if used_bytes == float('-Inf'):
             logger.warning('%s: no memory usage data', self.label)
             return
+        if not self.existing_constraints.get(constraint_key):
+            return
         used_mib = math.ceil(float(used_bytes) / MB)
         used_mib = math.ceil(float(used_bytes) / MB)
-        asked_mib = self.existing_constraints.get(constraint_key)
+        asked_mib = self.existing_constraints.get(constraint_key) / MB
 
         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
 
         nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
-        if used_mib > 0 and (asked_mib is None or (
-                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib))):
-            yield (
-                '#!! {} max RSS was {} MiB -- '
-                'try reducing runtime_constraints to "{}":{}'
+        ratio = 0.5
+        recommend_mib = int(math.ceil(nearlygibs(used_mib/ratio))*AVAILABLE_RAM_RATIO*1024)
+        if used_mib > 0 and (used_mib / asked_mib) < ratio and asked_mib > recommend_mib:
+            yield recommendformat(
+                '{} requested {} MiB of RAM but actual RAM usage was below {}% at {} MiB -- '
+                'suggest reducing RAM request to {} MiB'
             ).format(
                 self.label,
             ).format(
                 self.label,
+                int(asked_mib),
+                int(100*ratio),
                 int(used_mib),
                 int(used_mib),
-                constraint_key,
-                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024*(MB)/self._runtime_constraint_mem_unit()))
+                recommend_mib)
 
 
-    def _recommend_keep_cache(self):
+    def _recommend_keep_cache(self, recommendformat):
         """Recommend increasing keep cache if utilization < 80%"""
         constraint_key = self._map_runtime_constraint('keep_cache_ram')
         if self.job_tot['net:keep0']['rx'] == 0:
         """Recommend increasing keep cache if utilization < 80%"""
         constraint_key = self._map_runtime_constraint('keep_cache_ram')
         if self.job_tot['net:keep0']['rx'] == 0:
@@ -430,8 +493,8 @@ class Summarizer(object):
         asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
 
         if utilization < 0.8:
         asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
 
         if utilization < 0.8:
-            yield (
-                '#!! {} Keep cache utilization was {:.2f}% -- '
+            yield recommendformat(
+                '{} Keep cache utilization was {:.2f}% -- '
                 'try doubling runtime_constraints to "{}":{} (or more)'
             ).format(
                 self.label,
                 'try doubling runtime_constraints to "{}":{} (or more)'
             ).format(
                 self.label,
@@ -440,14 +503,14 @@ class Summarizer(object):
                 math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
 
 
                 math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
 
 
-    def _recommend_temp_disk(self):
+    def _recommend_temp_disk(self, recommendformat):
         """Recommend decreasing temp disk if utilization < 50%"""
         total = float(self.job_tot['statfs']['total'])
         utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0
 
         if utilization < 50.8 and total > 0:
         """Recommend decreasing temp disk if utilization < 50%"""
         total = float(self.job_tot['statfs']['total'])
         utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0
 
         if utilization < 50.8 and total > 0:
-            yield (
-                '#!! {} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
+            yield recommendformat(
+                '{} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
                 'consider reducing "tmpdirMin" and/or "outdirMin"'
             ).format(
                 self.label,
                 'consider reducing "tmpdirMin" and/or "outdirMin"'
             ).format(
                 self.label,
@@ -467,18 +530,11 @@ class Summarizer(object):
     def _runtime_constraint_mem_unit(self):
         if hasattr(self, 'runtime_constraint_mem_unit'):
             return self.runtime_constraint_mem_unit
     def _runtime_constraint_mem_unit(self):
         if hasattr(self, 'runtime_constraint_mem_unit'):
             return self.runtime_constraint_mem_unit
-        elif self.detected_crunch1:
-            return JobSummarizer.runtime_constraint_mem_unit
         else:
             return ContainerRequestSummarizer.runtime_constraint_mem_unit
 
     def _map_runtime_constraint(self, key):
         else:
             return ContainerRequestSummarizer.runtime_constraint_mem_unit
 
     def _map_runtime_constraint(self, key):
-        if hasattr(self, 'map_runtime_constraint'):
-            return self.map_runtime_constraint[key]
-        elif self.detected_crunch1:
-            return JobSummarizer.map_runtime_constraint[key]
-        else:
-            return key
+        return key
 
 
 class CollectionSummarizer(Summarizer):
 
 
 class CollectionSummarizer(Summarizer):
@@ -497,7 +553,7 @@ def NewSummarizer(process_or_uuid, **kwargs):
     else:
         uuid = process_or_uuid
         process = None
     else:
         uuid = process_or_uuid
         process = None
-        arv = arvados.api('v1', model=OrderedJsonModel())
+        arv = kwargs.get("arv") or arvados.api('v1')
 
     if '-dz642-' in uuid:
         if process is None:
 
     if '-dz642-' in uuid:
         if process is None:
@@ -510,14 +566,6 @@ def NewSummarizer(process_or_uuid, **kwargs):
         if process is None:
             process = arv.container_requests().get(uuid=uuid).execute()
         klass = ContainerRequestTreeSummarizer
         if process is None:
             process = arv.container_requests().get(uuid=uuid).execute()
         klass = ContainerRequestTreeSummarizer
-    elif '-8i9sb-' in uuid:
-        if process is None:
-            process = arv.jobs().get(uuid=uuid).execute()
-        klass = JobTreeSummarizer
-    elif '-d1hrv-' in uuid:
-        if process is None:
-            process = arv.pipeline_instances().get(uuid=uuid).execute()
-        klass = PipelineSummarizer
     elif '-4zz18-' in uuid:
         return CollectionSummarizer(collection_id=uuid)
     else:
     elif '-4zz18-' in uuid:
         return CollectionSummarizer(collection_id=uuid)
     else:
@@ -531,6 +579,7 @@ class ProcessSummarizer(Summarizer):
     def __init__(self, process, label=None, **kwargs):
         rdr = None
         self.process = process
     def __init__(self, process, label=None, **kwargs):
         rdr = None
         self.process = process
+        arv = kwargs.get("arv") or arvados.api('v1')
         if label is None:
             label = self.process.get('name', self.process['uuid'])
         # Pre-Arvados v1.4 everything is in 'log'
         if label is None:
             label = self.process.get('name', self.process['uuid'])
         # Pre-Arvados v1.4 everything is in 'log'
@@ -538,7 +587,10 @@ class ProcessSummarizer(Summarizer):
         log_collection = self.process.get('log', self.process.get('log_uuid'))
         if log_collection and self.process.get('state') != 'Uncommitted': # arvados.util.CR_UNCOMMITTED:
             try:
         log_collection = self.process.get('log', self.process.get('log_uuid'))
         if log_collection and self.process.get('state') != 'Uncommitted': # arvados.util.CR_UNCOMMITTED:
             try:
-                rdr = crunchstat_summary.reader.CollectionReader(log_collection)
+                rdr = crunchstat_summary.reader.CollectionReader(
+                    log_collection,
+                    api_client=arv,
+                    collection_object=kwargs.get("collection_object"))
             except arvados.errors.NotFoundError as e:
                 logger.warning("Trying event logs after failing to read "
                                "log collection %s: %s", self.process['log'], e)
             except arvados.errors.NotFoundError as e:
                 logger.warning("Trying event logs after failing to read "
                                "log collection %s: %s", self.process['log'], e)
@@ -546,17 +598,11 @@ class ProcessSummarizer(Summarizer):
             uuid = self.process.get('container_uuid', self.process.get('uuid'))
             rdr = crunchstat_summary.reader.LiveLogReader(uuid)
             label = label + ' (partial)'
             uuid = self.process.get('container_uuid', self.process.get('uuid'))
             rdr = crunchstat_summary.reader.LiveLogReader(uuid)
             label = label + ' (partial)'
+
         super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
         self.existing_constraints = self.process.get('runtime_constraints', {})
         super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
         self.existing_constraints = self.process.get('runtime_constraints', {})
-
-
-class JobSummarizer(ProcessSummarizer):
-    runtime_constraint_mem_unit = MB
-    map_runtime_constraint = {
-        'keep_cache_ram': 'keep_cache_mb_per_task',
-        'ram': 'min_ram_mb_per_node',
-        'vcpus': 'min_cores_per_node',
-    }
+        self.arv_config = arv.config()
+        self.cost = self.process.get('cost', 0)
 
 
 class ContainerRequestSummarizer(ProcessSummarizer):
 
 
 class ContainerRequestSummarizer(ProcessSummarizer):
@@ -565,26 +611,26 @@ class ContainerRequestSummarizer(ProcessSummarizer):
 
 class MultiSummarizer(object):
     def __init__(self, children={}, label=None, threads=1, **kwargs):
 
 class MultiSummarizer(object):
     def __init__(self, children={}, label=None, threads=1, **kwargs):
-        self.throttle = threading.Semaphore(threads)
         self.children = children
         self.label = label
         self.children = children
         self.label = label
-
-    def run_and_release(self, target, *args, **kwargs):
-        try:
-            return target(*args, **kwargs)
-        finally:
-            self.throttle.release()
+        self.threadcount = threads
 
     def run(self):
 
     def run(self):
-        threads = []
-        for child in self.children.values():
-            self.throttle.acquire()
-            t = threading.Thread(target=self.run_and_release, args=(child.run, ))
-            t.daemon = True
-            t.start()
-            threads.append(t)
-        for t in threads:
-            t.join()
+        if self.threadcount > 1 and len(self.children) > 1:
+            # Shared progress counter. A plain integer cannot be used here:
+            # "completed += 1" inside the nested function would make the
+            # name local to it and raise UnboundLocalError in every worker
+            # (silently stored in the discarded future), so progress would
+            # never be logged. next() on itertools.count() needs no
+            # rebinding of the enclosing variable.
+            completed = itertools.count(1)
+            total = len(self.children)
+            def run_and_progress(child):
+                """Run one child summarizer, then log overall progress."""
+                try:
+                    child.run()
+                except Exception:
+                    # Best effort: a failing child is logged and skipped so
+                    # the remaining children are still summarized.
+                    logger.exception("parse error")
+                logger.info("%s/%s summarized %s", next(completed), total, child.label)
+            # Leaving the with-block waits for all submitted tasks to finish.
+            with ThreadPoolExecutor(max_workers=self.threadcount) as tpe:
+                for child in self.children.values():
+                    tpe.submit(run_and_progress, child)
+        else:
+            for child in self.children.values():
+                child.run()
 
     def text_report(self):
         txt = ''
 
     def text_report(self):
         txt = ''
@@ -612,57 +658,26 @@ class MultiSummarizer(object):
         return d
 
     def html_report(self):
         return d
 
     def html_report(self):
-        return WEBCHART_CLASS(self.label, iter(self._descendants().values())).html()
-
-
-class JobTreeSummarizer(MultiSummarizer):
-    """Summarizes a job and all children listed in its components field."""
-    def __init__(self, job, label=None, **kwargs):
-        arv = arvados.api('v1', model=OrderedJsonModel())
-        label = label or job.get('name', job['uuid'])
-        children = collections.OrderedDict()
-        children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
-        if job.get('components', None):
-            preloaded = {}
-            for j in arv.jobs().index(
-                    limit=len(job['components']),
-                    filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
-                preloaded[j['uuid']] = j
-            for cname in sorted(job['components'].keys()):
-                child_uuid = job['components'][cname]
-                j = (preloaded.get(child_uuid) or
-                     arv.jobs().get(uuid=child_uuid).execute())
-                children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
-
-        super(JobTreeSummarizer, self).__init__(
-            children=children,
-            label=label,
-            **kwargs)
+        tophtml = ""
+        bottomhtml = ""
+        label = self.label
+        if len(self._descendants()) == 1:
+            summarizer = next(iter(self._descendants().values()))
+            tophtml = """{}\n<table class='aggtable'><tbody>{}</tbody></table>\n""".format(
+                "\n".join(summarizer._recommend_gen(lambda x: "<p>{}</p>".format(x))),
+                "\n".join(summarizer._text_report_agg_gen(lambda x: "<tr><th>{}</th><td>{}{}</td></tr>".format(*x))))
 
 
+            bottomhtml = """<table class='metricstable'><tbody>{}</tbody></table>\n""".format(
+                "\n".join(summarizer._text_report_table_gen(lambda x: "<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>".format(*x),
+                                                            lambda x: "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(*x))))
+            label = summarizer.long_label()
 
 
-class PipelineSummarizer(MultiSummarizer):
-    def __init__(self, instance, **kwargs):
-        children = collections.OrderedDict()
-        for cname, component in instance['components'].items():
-            if 'job' not in component:
-                logger.warning(
-                    "%s: skipping component with no job assigned", cname)
-            else:
-                logger.info(
-                    "%s: job %s", cname, component['job']['uuid'])
-                summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
-                summarizer.label = '{} {}'.format(
-                    cname, component['job']['uuid'])
-                children[cname] = summarizer
-        super(PipelineSummarizer, self).__init__(
-            children=children,
-            label=instance['uuid'],
-            **kwargs)
+        return WEBCHART_CLASS(label, iter(self._descendants().values())).html(tophtml, bottomhtml)
 
 
 class ContainerRequestTreeSummarizer(MultiSummarizer):
     def __init__(self, root, skip_child_jobs=False, **kwargs):
 
 
 class ContainerRequestTreeSummarizer(MultiSummarizer):
     def __init__(self, root, skip_child_jobs=False, **kwargs):
-        arv = arvados.api('v1', model=OrderedJsonModel())
+        arv = kwargs.get("arv") or arvados.api('v1')
 
         label = kwargs.pop('label', None) or root.get('name') or root['uuid']
         root['name'] = label
 
         label = kwargs.pop('label', None) or root.get('name') or root['uuid']
         root['name'] = label
@@ -678,22 +693,15 @@ class ContainerRequestTreeSummarizer(MultiSummarizer):
             summer.sort_key = sort_key
             children[current['uuid']] = summer
 
             summer.sort_key = sort_key
             children[current['uuid']] = summer
 
-            page_filters = []
-            while True:
-                child_crs = arv.container_requests().index(
-                    order=['uuid asc'],
-                    filters=page_filters+[
-                        ['requesting_container_uuid', '=', current['container_uuid']]],
-                ).execute()
-                if not child_crs['items']:
-                    break
-                elif skip_child_jobs:
-                    logger.warning('%s: omitting stats from %d child containers'
-                                   ' because --skip-child-jobs flag is on',
-                                   label, child_crs['items_available'])
-                    break
-                page_filters = [['uuid', '>', child_crs['items'][-1]['uuid']]]
-                for cr in child_crs['items']:
+            if skip_child_jobs:
+                # limit=0 asks for no items; only the items_available count
+                # from the response is used, for the warning below.
+                child_crs = arv.container_requests().list(filters=[['requesting_container_uuid', '=', current['container_uuid']]],
+                                                          limit=0).execute()
+                # The format string needs one placeholder per argument;
+                # without %d the logging module reports a formatting error
+                # instead of emitting this warning.
+                logger.warning('%s: omitting stats from %d child containers'
+                               ' because --skip-child-jobs flag is on',
+                               label, child_crs['items_available'])
+            else:
+                for cr in arvados.util.keyset_list_all(arv.container_requests().list,
+                                                       filters=[['requesting_container_uuid', '=', current['container_uuid']]]):
                     if cr['container_uuid']:
                         logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
                         cr['name'] = cr.get('name') or cr['uuid']
                     if cr['container_uuid']:
                         logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
                         cr['name'] = cr.get('name') or cr['uuid']