19744: Remove jobs/pipeline templates from crunchstat-summary
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
index 9b6e5f16905054749d34004243792b5a9e24474c..65cee6c1765e0753e634ed7c4e6aa9fe49dfc56c 100644 (file)
@@ -66,6 +66,8 @@ class Summarizer(object):
         # constructor will overwrite this with something useful.
         self.existing_constraints = {}
         self.node_info = {}
+        self.cost = 0
+        self.arv_config = {}
 
         logger.info("%s: logdata %s", self.label, logdata)
 
@@ -75,82 +77,23 @@ class Summarizer(object):
             self._run(logdata)
 
     def _run(self, logdata):
-        self.detected_crunch1 = False
-
         if not self.node_info:
             self.node_info = logdata.node_info()
 
         for line in logdata:
-            if not self.detected_crunch1 and '-8i9sb-' in line:
-                self.detected_crunch1 = True
-
-            if self.detected_crunch1:
-                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
-                if m:
-                    seq = int(m.group('seq'))
-                    uuid = m.group('task_uuid')
-                    self.seq_to_uuid[seq] = uuid
-                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
-                    continue
-
-                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
-                if m:
-                    task_id = self.seq_to_uuid[int(m.group('seq'))]
-                    elapsed = int(m.group('elapsed'))
-                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
-                    if elapsed > self.stats_max['time']['elapsed']:
-                        self.stats_max['time']['elapsed'] = elapsed
-                    continue
-
-                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
-                if m:
-                    uuid = m.group('uuid')
-                    if self._skip_child_jobs:
-                        logger.warning('%s: omitting stats from child job %s'
-                                       ' because --skip-child-jobs flag is on',
-                                       self.label, uuid)
-                        continue
-                    logger.debug('%s: follow %s', self.label, uuid)
-                    child_summarizer = NewSummarizer(uuid)
-                    child_summarizer.stats_max = self.stats_max
-                    child_summarizer.task_stats = self.task_stats
-                    child_summarizer.tasks = self.tasks
-                    child_summarizer.starttime = self.starttime
-                    child_summarizer.run()
-                    logger.debug('%s: done %s', self.label, uuid)
-                    continue
-
-                # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
-                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
-                if not m:
-                    continue
-            else:
-                # crunch2
-                # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
-                m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
-                if not m:
-                    continue
+            # crunch2
+            # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
+            m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+            if not m:
+                continue
 
             if self.label is None:
                 try:
                     self.label = m.group('job_uuid')
                 except IndexError:
                     self.label = 'label #1'
-            category = m.group('category')
-            if category.endswith(':'):
-                # "stderr crunchstat: notice: ..."
-                continue
-            elif category in ('error', 'caught'):
-                continue
-            elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
-                # "stderr crunchstat: read /proc/1234/net/dev: ..."
-                # (old logs are less careful with unprefixed error messages)
-                continue
 
-            if self.detected_crunch1:
-                task_id = self.seq_to_uuid[int(m.group('seq'))]
-            else:
-                task_id = 'container'
+            task_id = 'container'
             task = self.tasks[task_id]
 
             # Use the first and last crunchstat timestamps as
@@ -179,12 +122,23 @@ class Summarizer(object):
             if self.finishtime is None or timestamp > self.finishtime:
                 self.finishtime = timestamp
 
-            if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
+            if task.starttime is not None and task.finishtime is not None:
                 elapsed = (task.finishtime - task.starttime).seconds
                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                 if elapsed > self.stats_max['time']['elapsed']:
                     self.stats_max['time']['elapsed'] = elapsed
 
+            category = m.group('category')
+            if category.endswith(':'):
+                # "stderr crunchstat: notice: ..."
+                continue
+            elif category in ('error', 'caught'):
+                continue
+            elif category in ('read', 'open', 'cgroup', 'CID', 'Running'):
+                # "stderr crunchstat: read /proc/1234/net/dev: ..."
+                # (old logs are less careful with unprefixed error messages)
+                continue
+
             this_interval_s = None
             for group in ['current', 'interval']:
                 if not m.group(group):
@@ -359,7 +313,7 @@ class Summarizer(object):
             ('Requested CPU cores',
              self.existing_constraints.get(self._map_runtime_constraint('vcpus')),
              None,
-             ''),
+             '') if self.existing_constraints.get(self._map_runtime_constraint('vcpus')) else None,
 
             ('Instance VCPUs',
              self.node_info.get('VCPUs'),
@@ -374,12 +328,12 @@ class Summarizer(object):
             ('Requested RAM',
              self.existing_constraints.get(self._map_runtime_constraint('ram')),
              lambda x: x / 2**20,
-             'MB'),
+             'MB') if self.existing_constraints.get(self._map_runtime_constraint('ram')) else None,
 
             ('Maximum RAM request for this instance type',
-             (self.node_info.get('RAM') - self.arv_config.get('Containers', {}).get('ReserveExtraRAM', {}))*.95,
+             (self.node_info.get('RAM') - self.arv_config.get('Containers', {}).get('ReserveExtraRAM', 0))*.95,
              lambda x: x / 2**20,
-             'MB'),
+             'MB') if self.node_info.get('RAM') else None,
 
             ('Max network traffic{}'.format(by_single_task),
              self.stats_max['net:eth0']['tx+rx'] +
@@ -509,6 +463,8 @@ class Summarizer(object):
         if used_bytes == float('-Inf'):
             logger.warning('%s: no memory usage data', self.label)
             return
+        if not self.existing_constraints.get(constraint_key):
+            return
         used_mib = math.ceil(float(used_bytes) / MB)
         asked_mib = self.existing_constraints.get(constraint_key) / MB
 
@@ -574,18 +530,11 @@ class Summarizer(object):
     def _runtime_constraint_mem_unit(self):
         if hasattr(self, 'runtime_constraint_mem_unit'):
             return self.runtime_constraint_mem_unit
-        elif self.detected_crunch1:
-            return JobSummarizer.runtime_constraint_mem_unit
         else:
             return ContainerRequestSummarizer.runtime_constraint_mem_unit
 
     def _map_runtime_constraint(self, key):
-        if hasattr(self, 'map_runtime_constraint'):
-            return self.map_runtime_constraint[key]
-        elif self.detected_crunch1:
-            return JobSummarizer.map_runtime_constraint[key]
-        else:
-            return key
+        return key
 
 
 class CollectionSummarizer(Summarizer):
@@ -617,14 +566,6 @@ def NewSummarizer(process_or_uuid, **kwargs):
         if process is None:
             process = arv.container_requests().get(uuid=uuid).execute()
         klass = ContainerRequestTreeSummarizer
-    elif '-8i9sb-' in uuid:
-        if process is None:
-            process = arv.jobs().get(uuid=uuid).execute()
-        klass = JobTreeSummarizer
-    elif '-d1hrv-' in uuid:
-        if process is None:
-            process = arv.pipeline_instances().get(uuid=uuid).execute()
-        klass = PipelineSummarizer
     elif '-4zz18-' in uuid:
         return CollectionSummarizer(collection_id=uuid)
     else:
@@ -646,7 +587,10 @@ class ProcessSummarizer(Summarizer):
         log_collection = self.process.get('log', self.process.get('log_uuid'))
         if log_collection and self.process.get('state') != 'Uncommitted': # arvados.util.CR_UNCOMMITTED:
             try:
-                rdr = crunchstat_summary.reader.CollectionReader(log_collection, api_client=arv)
+                rdr = crunchstat_summary.reader.CollectionReader(
+                    log_collection,
+                    api_client=arv,
+                    collection_object=kwargs.get("collection_object"))
             except arvados.errors.NotFoundError as e:
                 logger.warning("Trying event logs after failing to read "
                                "log collection %s: %s", self.process['log'], e)
@@ -661,16 +605,6 @@ class ProcessSummarizer(Summarizer):
         self.cost = self.process.get('cost', 0)
 
 
-
-class JobSummarizer(ProcessSummarizer):
-    runtime_constraint_mem_unit = MB
-    map_runtime_constraint = {
-        'keep_cache_ram': 'keep_cache_mb_per_task',
-        'ram': 'min_ram_mb_per_node',
-        'vcpus': 'min_cores_per_node',
-    }
-
-
 class ContainerRequestSummarizer(ProcessSummarizer):
     runtime_constraint_mem_unit = 1
 
@@ -741,51 +675,6 @@ class MultiSummarizer(object):
         return WEBCHART_CLASS(label, iter(self._descendants().values())).html(tophtml, bottomhtml)
 
 
-class JobTreeSummarizer(MultiSummarizer):
-    """Summarizes a job and all children listed in its components field."""
-    def __init__(self, job, label=None, **kwargs):
-        arv = kwargs.get("arv") or arvados.api('v1')
-        label = label or job.get('name', job['uuid'])
-        children = collections.OrderedDict()
-        children[job['uuid']] = JobSummarizer(job, label=label, **kwargs)
-        if job.get('components', None):
-            preloaded = {}
-            for j in arv.jobs().index(
-                    limit=len(job['components']),
-                    filters=[['uuid','in',list(job['components'].values())]]).execute()['items']:
-                preloaded[j['uuid']] = j
-            for cname in sorted(job['components'].keys()):
-                child_uuid = job['components'][cname]
-                j = (preloaded.get(child_uuid) or
-                     arv.jobs().get(uuid=child_uuid).execute())
-                children[child_uuid] = JobTreeSummarizer(job=j, label=cname, **kwargs)
-
-        super(JobTreeSummarizer, self).__init__(
-            children=children,
-            label=label,
-            **kwargs)
-
-
-class PipelineSummarizer(MultiSummarizer):
-    def __init__(self, instance, **kwargs):
-        children = collections.OrderedDict()
-        for cname, component in instance['components'].items():
-            if 'job' not in component:
-                logger.warning(
-                    "%s: skipping component with no job assigned", cname)
-            else:
-                logger.info(
-                    "%s: job %s", cname, component['job']['uuid'])
-                summarizer = JobTreeSummarizer(component['job'], label=cname, **kwargs)
-                summarizer.label = '{} {}'.format(
-                    cname, component['job']['uuid'])
-                children[cname] = summarizer
-        super(PipelineSummarizer, self).__init__(
-            children=children,
-            label=instance['uuid'],
-            **kwargs)
-
-
 class ContainerRequestTreeSummarizer(MultiSummarizer):
     def __init__(self, root, skip_child_jobs=False, **kwargs):
         arv = kwargs.get("arv") or arvados.api('v1')