19744: Add --enable/disable-usage-report
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index a67b72f89cdc84dc4736c77cff49ec3a0f142353..a7c2b0a3831c7f667adb1435ce8ce0cfccfb91c5 100644
@@ -66,6 +66,8 @@ class Summarizer(object):
         # constructor will overwrite this with something useful.
         self.existing_constraints = {}
         self.node_info = {}
+        self.cost = 0
+        self.arv_config = {}
 
         logger.info("%s: logdata %s", self.label, logdata)
 
@@ -75,82 +77,23 @@ class Summarizer(object):
             self._run(logdata)
 
     def _run(self, logdata):
-        self.detected_crunch1 = False
-
         if not self.node_info:
             self.node_info = logdata.node_info()
 
         for line in logdata:
-            if not self.detected_crunch1 and '-8i9sb-' in line:
-                self.detected_crunch1 = True
-
-            if self.detected_crunch1:
-                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
-                if m:
-                    seq = int(m.group('seq'))
-                    uuid = m.group('task_uuid')
-                    self.seq_to_uuid[seq] = uuid
-                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
-                    continue
-
-                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
-                if m:
-                    task_id = self.seq_to_uuid[int(m.group('seq'))]
-                    elapsed = int(m.group('elapsed'))
-                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
-                    if elapsed > self.stats_max['time']['elapsed']:
-                        self.stats_max['time']['elapsed'] = elapsed
-                    continue
-
-                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
-                if m:
-                    uuid = m.group('uuid')
-                    if self._skip_child_jobs:
-                        logger.warning('%s: omitting stats from child job %s'
-                                       ' because --skip-child-jobs flag is on',
-                                       self.label, uuid)
-                        continue
-                    logger.debug('%s: follow %s', self.label, uuid)
-                    child_summarizer = NewSummarizer(uuid)
-                    child_summarizer.stats_max = self.stats_max
-                    child_summarizer.task_stats = self.task_stats
-                    child_summarizer.tasks = self.tasks
-                    child_summarizer.starttime = self.starttime
-                    child_summarizer.run()
-                    logger.debug('%s: done %s', self.label, uuid)
-                    continue
-
-                # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
-                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
-                if not m:
-                    continue
-            else:
-                # crunch2
-                # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
-                m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
-                if not m:
-                    continue
+            # crunch2
+            # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
+            m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+            if not m:
+                continue
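
A quick standalone sketch of how the crunch2 regex above decomposes the
sample line from the comment (the trailing newline matters, since the
pattern ends with \n$). The constant name here is ours, not the module's:

import re

CRUNCH2_RE = re.compile(
    r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+)'
    r' (?P<current>.*?)( -- interval (?P<interval>.*))?\n$')

line = ('2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get'
        ' -- interval 10.0000 seconds 0 put 3 get\n')
m = CRUNCH2_RE.search(line)
assert m.group('timestamp') == '2017-12-01T16:56:24.723509200Z'
assert m.group('category') == 'keepcalls'
assert m.group('current') == '0 put 3 get'   # non-greedy stops at ' -- interval'
assert m.group('interval') == '10.0000 seconds 0 put 3 get'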
 
             if self.label is None:
                 try:
                     self.label = m.group('job_uuid')
                 except IndexError:
                     self.label = 'label #1'
-            category = m.group('category')
-            if category.endswith(':'):
-                # "stderr crunchstat: notice: ..."
-                continue
-            elif category in ('error', 'caught'):
-                continue
-            elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
-                # "stderr crunchstat: read /proc/1234/net/dev: ..."
-                # (old logs are less careful with unprefixed error messages)
-                continue
 
-            if self.detected_crunch1:
-                task_id = self.seq_to_uuid[int(m.group('seq'))]
-            else:
-                task_id = 'container'
+            task_id = 'container'
             task = self.tasks[task_id]
 
             # Use the first and last crunchstat timestamps as
@@ -179,12 +122,23 @@ class Summarizer(object):
             if self.finishtime is None or timestamp > self.finishtime:
                 self.finishtime = timestamp
 
-            if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
+            if task.starttime is not None and task.finishtime is not None:
                 elapsed = (task.finishtime - task.starttime).seconds
                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                 if elapsed > self.stats_max['time']['elapsed']:
                     self.stats_max['time']['elapsed'] = elapsed
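
Aside: the elapsed computation above relies on datetime.timedelta.seconds,
which is the seconds component of the delta rather than the total; a short
illustration with invented timestamps:

import datetime

start = datetime.datetime(2017, 12, 1, 16, 56, 24)
finish = datetime.datetime(2017, 12, 1, 17, 1, 24)
print((finish - start).seconds)             # 300
print((finish - start).total_seconds())     # 300.0

# For deltas of a day or more the two diverge:
day_later = finish + datetime.timedelta(days=1)
print((day_later - start).seconds)          # still 300
print((day_later - start).total_seconds())  # 86700.0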
 
+            category = m.group('category')
+            if category.endswith(':'):
+                # "stderr crunchstat: notice: ..."
+                continue
+            elif category in ('error', 'caught'):
+                continue
+            elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
+                # "stderr crunchstat: read /proc/1234/net/dev: ..."
+                # (old logs are less careful with unprefixed error messages)
+                continue
+
             this_interval_s = None
             for group in ['current', 'interval']:
                 if not m.group(group):
@@ -251,26 +205,29 @@ class Summarizer(object):
                     self.job_tot[category][stat] += val
         logger.debug('%s: done totals', self.label)
 
-        missing_category = {
-            'cpu': 'CPU',
-            'mem': 'memory',
-            'net:': 'network I/O',
-            'statfs': 'storage space',
-        }
-        for task_stat in self.task_stats.values():
-            for category in task_stat.keys():
-                for checkcat in missing_category:
-                    if checkcat.endswith(':'):
-                        if category.startswith(checkcat):
-                            missing_category.pop(checkcat)
-                            break
-                    else:
-                        if category == checkcat:
-                            missing_category.pop(checkcat)
-                            break
-        for catlabel in missing_category.values():
-            logger.warning('%s: %s stats are missing -- possible cluster configuration issue',
-                        self.label, catlabel)
+        if self.stats_max['time'].get('elapsed', 0) > 20:
+            # The job needs to have executed for at least 20 seconds,
+            # otherwise we may not have collected any metrics and these
+            # warnings would be spurious.
+            missing_category = {
+                'cpu': 'CPU',
+                'mem': 'memory',
+                'net:': 'network I/O',
+                'statfs': 'storage space',
+            }
+            for task_stat in self.task_stats.values():
+                for category in task_stat.keys():
+                    for checkcat in missing_category:
+                        if checkcat.endswith(':'):
+                            if category.startswith(checkcat):
+                                missing_category.pop(checkcat)
+                                break
+                        else:
+                            if category == checkcat:
+                                missing_category.pop(checkcat)
+                                break
+            for catlabel in missing_category.values():
+                logger.warning('%s: %s stats are missing -- possible cluster configuration issue',
+                               self.label, catlabel)
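
The scan above is easier to follow on toy data. Note the break right after
each pop: it exits the iteration over missing_category before the mutated
dict is advanced again, so no RuntimeError is raised. A self-contained run
(task_stats contents invented):

task_stats = {'container': {'cpu': {}, 'net:eth0': {}}}
missing_category = {
    'cpu': 'CPU',
    'mem': 'memory',
    'net:': 'network I/O',
    'statfs': 'storage space',
}
for task_stat in task_stats.values():
    for category in task_stat.keys():
        for checkcat in missing_category:
            if checkcat.endswith(':'):
                # prefix keys: 'net:eth0' satisfies 'net:'
                if category.startswith(checkcat):
                    missing_category.pop(checkcat)
                    break
            else:
                if category == checkcat:
                    missing_category.pop(checkcat)
                    break
print(sorted(missing_category.values()))  # ['memory', 'storage space']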
 
     def long_label(self):
         label = self.label
@@ -302,7 +259,16 @@ class Summarizer(object):
             self._recommend_gen(lambda x: "#!! "+x))) + "\n"
 
     def html_report(self):
-        return WEBCHART_CLASS(self.label, [self]).html()
+        tophtml = """{}\n<table class='aggtable'><tbody>{}</tbody></table>\n""".format(
+            "\n".join(self._recommend_gen(lambda x: "<p>{}</p>".format(x))),
+            "\n".join(self._text_report_agg_gen(lambda x: "<tr><th>{}</th><td>{}{}</td></tr>".format(*x))))
+
+        bottomhtml = """<table class='metricstable'><tbody>{}</tbody></table>\n""".format(
+            "\n".join(self._text_report_table_gen(lambda x: "<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>".format(*x),
+                                                        lambda x: "<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>".format(*x))))
+        label = self.long_label()
+
+        return WEBCHART_CLASS(label, [self]).html(tophtml, bottomhtml)
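
The format callables handed to the report generators here are plain
tuple-to-markup formatters; a minimal illustration with an invented row
(real rows come from _text_report_table_gen):

headerformat = lambda x: '<tr><th>{}</th><th>{}</th><th>{}</th><th>{}</th><th>{}</th></tr>'.format(*x)
rowformat = lambda x: '<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>'.format(*x)

print(headerformat(['category', 'metric', 'task_max', 'task_max_rate', 'job_total']))
print(rowformat(['cpu', 'user+sys', '2.9', '1.00', '2.9']))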
 
     def _text_report_table_gen(self, headerformat, rowformat):
         yield headerformat(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
@@ -356,7 +322,7 @@ class Summarizer(object):
             ('Requested CPU cores',
              self.existing_constraints.get(self._map_runtime_constraint('vcpus')),
              None,
-             ''),
+             '') if self.existing_constraints.get(self._map_runtime_constraint('vcpus')) else None,
 
             ('Instance VCPUs',
              self.node_info.get('VCPUs'),
@@ -371,12 +337,12 @@ class Summarizer(object):
             ('Requested RAM',
              self.existing_constraints.get(self._map_runtime_constraint('ram')),
              lambda x: x / 2**20,
-             'MB'),
+             'MB') if self.existing_constraints.get(self._map_runtime_constraint('ram')) else None,
 
             ('Maximum RAM request for this instance type',
-             (self.node_info.get('RAM') - self.arv_config.get('Containers', {}).get('ReserveExtraRAM', {}))*.95,
+             (self.node_info.get('RAM') - self.arv_config.get('Containers', {}).get('ReserveExtraRAM', 0))*.95,
              lambda x: x / 2**20,
-             'MB'),
+             'MB') if self.node_info.get('RAM') else None,
 
             ('Max network traffic{}'.format(by_single_task),
              self.stats_max['net:eth0']['tx+rx'] +
@@ -429,6 +395,11 @@ class Summarizer(object):
 
     def _recommend_gen(self, recommendformat):
         # TODO recommend fixing job granularity if elapsed time is too short
+
+        if self.stats_max['time'].get('elapsed', 0) <= 20:
+            # Not enough data
+            return []
+
         return itertools.chain(
             self._recommend_cpu(recommendformat),
             self._recommend_ram(recommendformat),
@@ -451,7 +422,7 @@ class Summarizer(object):
         if asked_cores is None:
             asked_cores = 1
         # TODO: This should be more nuanced in cases where max >> avg
-        if used_cores < asked_cores:
+        if used_cores < (asked_cores*.5):
             yield recommendformat(
                 '{} max CPU usage was {}% -- '
                 'try reducing runtime_constraints to "{}":{}'
@@ -501,6 +472,8 @@ class Summarizer(object):
         if used_bytes == float('-Inf'):
             logger.warning('%s: no memory usage data', self.label)
             return
+        if not self.existing_constraints.get(constraint_key):
+            return
         used_mib = math.ceil(float(used_bytes) / MB)
         asked_mib = self.existing_constraints.get(constraint_key) / MB
 
@@ -519,39 +492,42 @@ class Summarizer(object):
                 recommend_mib)
 
     def _recommend_keep_cache(self, recommendformat):
-        """Recommend increasing keep cache if utilization < 80%"""
+        """Recommend increasing keep cache if utilization < 50%.
+
+        This means the amount of data returned to the program is less
+        than 50% of the amount of data actually downloaded by
+        arv-mount.
+        """
+
         constraint_key = self._map_runtime_constraint('keep_cache_ram')
         if self.job_tot['net:keep0']['rx'] == 0:
             return
         utilization = (float(self.job_tot['blkio:0:0']['read']) /
                        float(self.job_tot['net:keep0']['rx']))
         # FIXME: the default on this get won't work correctly
-        asked_cache = self.existing_constraints.get(constraint_key, 256) * self._runtime_constraint_mem_unit()
+        asked_cache = self.existing_constraints.get('keep_cache_ram') or self.existing_constraints.get('keep_cache_disk')
+        if not asked_cache:
+            # Neither cache constraint is present; return rather than
+            # compute a recommendation from None below.
+            return
 
-        if utilization < 0.8:
+        if utilization < 0.5:
             yield recommendformat(
                 '{} Keep cache utilization was {:.2f}% -- '
-                'try doubling runtime_constraints to "{}":{} (or more)'
+                'try increasing keep_cache to {} MB'
             ).format(
                 self.label,
                 utilization * 100.0,
-                constraint_key,
-                math.ceil(asked_cache * 2 / self._runtime_constraint_mem_unit()))
+                math.ceil((asked_cache * 2) / (1024*1024)))
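
Worked numbers for the threshold and suggestion above (all values
invented): a program that reads back 2 GiB of the 8 GiB arv-mount fetched
from Keep has 25% utilization, so a 256 MiB cache yields a 512 MB
suggestion:

import math

read_bytes = 2 * 2**30     # job_tot['blkio:0:0']['read']
keep_rx_bytes = 8 * 2**30  # job_tot['net:keep0']['rx']
asked_cache = 256 * 2**20  # keep_cache_ram, in bytes

utilization = float(read_bytes) / float(keep_rx_bytes)  # 0.25
if utilization < 0.5:
    print('utilization {:.2f}% -- try increasing keep_cache to {} MB'.format(
        utilization * 100.0, math.ceil((asked_cache * 2) / (1024*1024))))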
 
 
     def _recommend_temp_disk(self, recommendformat):
-        """Recommend decreasing temp disk if utilization < 50%"""
-        total = float(self.job_tot['statfs']['total'])
-        utilization = (float(self.job_tot['statfs']['used']) / total) if total > 0 else 0.0
+        """This recommendation is disabled for the time being.  It was
+        using the total disk on the node and not the amount of disk
+        requested, so it would trigger a false positive basically
+        every time.  To get the amount of disk requested we need to
+        fish it out of the mounts, which is extra work I don't want to
+        do right now.  You can find the old code at commit 616d135e77.
 
-        if utilization < 50.8 and total > 0:
-            yield recommendformat(
-                '{} max temp disk utilization was {:.0f}% of {:.0f} MiB -- '
-                'consider reducing "tmpdirMin" and/or "outdirMin"'
-            ).format(
-                self.label,
-                utilization * 100.0,
-                total / MB)
+        """
+
+        return []
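
Returning [] from the disabled recommender works because _recommend_gen
combines the recommenders with itertools.chain, which accepts any
iterable, generator or not. A minimal sketch (function names invented):

import itertools

def disabled_recommender(fmt):
    return []          # empty iterable instead of a generator

def active_recommender(fmt):
    yield fmt('max CPU usage was 40%')

for rec in itertools.chain(active_recommender(str), disabled_recommender(str)):
    print(rec)         # prints only the active recommendation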
 
 
     def _format(self, val):
@@ -566,18 +542,11 @@ class Summarizer(object):
     def _runtime_constraint_mem_unit(self):
         if hasattr(self, 'runtime_constraint_mem_unit'):
             return self.runtime_constraint_mem_unit
-        elif self.detected_crunch1:
-            return JobSummarizer.runtime_constraint_mem_unit
         else:
             return ContainerRequestSummarizer.runtime_constraint_mem_unit
 
     def _map_runtime_constraint(self, key):
-        if hasattr(self, 'map_runtime_constraint'):
-            return self.map_runtime_constraint[key]
-        elif self.detected_crunch1:
-            return JobSummarizer.map_runtime_constraint[key]
-        else:
-            return key
+        return key
 
 
 class CollectionSummarizer(Summarizer):
@@ -609,14 +578,6 @@ def NewSummarizer(process_or_uuid, **kwargs):
         if process is None:
             process = arv.container_requests().get(uuid=uuid).execute()
         klass = ContainerRequestTreeSummarizer
-    elif '-8i9sb-' in uuid:
-        if process is None:
-            process = arv.jobs().get(uuid=uuid).execute()
-        klass = JobTreeSummarizer
-    elif '-d1hrv-' in uuid:
-        if process is None:
-            process = arv.pipeline_instances().get(uuid=uuid).execute()
-        klass = PipelineSummarizer
     elif '-4zz18-' in uuid:
         return CollectionSummarizer(collection_id=uuid)
     else:
@@ -638,7 +599,10 @@ class ProcessSummarizer(Summarizer):
         log_collection = self.process.get('log', self.process.get('log_uuid'))
         if log_collection and self.process.get('state') != 'Uncommitted': # arvados.util.CR_UNCOMMITTED:
             try:
-                rdr = crunchstat_summary.reader.CollectionReader(log_collection, api_client=arv)
+                rdr = crunchstat_summary.reader.CollectionReader(
+                    log_collection,
+                    api_client=arv,
+                    collection_object=kwargs.get("collection_object"))
             except arvados.errors.NotFoundError as e:
                 logger.warning("Trying event logs after failing to read "
                                "log collection %s: %s", self.process['log'], e)
@@ -653,16 +617,6 @@ class ProcessSummarizer(Summarizer):
         self.cost = self.process.get('cost', 0)
 
 
-
-class JobSummarizer(ProcessSummarizer):
-    runtime_constraint_mem_unit = MB
-    map_runtime_constraint = {
-        'keep_cache_ram': 'keep_cache_mb_per_task',
-        'ram': 'min_ram_mb_per_node',
-        'vcpus': 'min_cores_per_node',
-    }
-
-
 class ContainerRequestSummarizer(ProcessSummarizer):
     runtime_constraint_mem_unit = 1
 
@@ -733,51 +687,6 @@ class MultiSummarizer(object):
         return WEBCHART_CLASS(label, iter(self._descendants().values())).html(tophtml, bottomhtml)
 
 
-class JobTreeSummarizer(MultiSummarizer):
-    """Summarizes a job and all children listed in its components field."""
-    def __init__(self, job, label=None, **kwargs):
-        arv = kwargs.get("arv") or arvados.api('v1')
-        label = label or job.get('name', job['uuid'])
-        children = collections.OrderedDict()
-        children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
-        if job.get('components', None):
-            preloaded = {}
-            for j in arv.jobs().index(
-                    limit=len(job['components']),
-                    filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
-                preloaded[j['uuid']] = j
-            for cname in sorted(job['components'].keys()):
-                child_uuid = job['components'][cname]
-                j = (preloaded.get(child_uuid) or
-                     arv.jobs().get(uuid=child_uuid).execute())
-                children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
-
-        super(JobTreeSummarizer, self).__init__(
-            children=children,
-            label=label,
-            **kwargs)
-
-
-class PipelineSummarizer(MultiSummarizer):
-    def __init__(self, instance, **kwargs):
-        children = collections.OrderedDict()
-        for cname, component in instance['components'].items():
-            if 'job' not in component:
-                logger.warning(
-                    "%s: skipping component with no job assigned", cname)
-            else:
-                logger.info(
-                    "%s: job %s", cname, component['job']['uuid'])
-                summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
-                summarizer.label = '{} {}'.format(
-                    cname, component['job']['uuid'])
-                children[cname] = summarizer
-        super(PipelineSummarizer, self).__init__(
-            children=children,
-            label=instance['uuid'],
-            **kwargs)
-
-
 class ContainerRequestTreeSummarizer(MultiSummarizer):
     def __init__(self, root, skip_child_jobs=False, **kwargs):
         arv = kwargs.get("arv") or arvados.api('v1')