10472: add latent support for rolled up stats
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / summarizer.py
index 6d026d3763a9b797ee3cf3bc249fbf279d22365b..f1c97c282c90612789204672e793f7be935939ae 100644 (file)
@@ -60,6 +60,21 @@ class Summarizer(object):
 
         logger.debug("%s: logdata %s", self.label, logdata)
 
+    def run_child(self, uuid):
+        """Summarize child job *uuid* into this summarizer's stats."""
+        if self._skip_child_jobs:
+            logger.warning(
+                '%s: omitting stats from child job %s because --skip-child-jobs flag is on',
+                self.label, uuid)
+            return
+        logger.debug('%s: follow %s', self.label, uuid)
+        child = JobSummarizer(uuid)
+        # Hand the child our containers (same objects, not copies) and start time.
+        child.stats_max, child.task_stats = self.stats_max, self.task_stats
+        child.tasks, child.starttime = self.tasks, self.starttime
+        child.run()
+        logger.debug('%s: done %s', self.label, uuid)
+
     def run(self):
         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
         for line in self._logdata:
@@ -80,22 +95,11 @@ class Summarizer(object):
                     self.stats_max['time']['elapsed'] = elapsed
                 continue
 
+            # Old style job logs only - newer style uses job['components']
+            uuid = None
             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
             if m:
-                uuid = m.group('uuid')
-                if self._skip_child_jobs:
-                    logger.warning('%s: omitting stats from child job %s'
-                                   ' because --skip-child-jobs flag is on',
-                                   self.label, uuid)
-                    continue
-                logger.debug('%s: follow %s', self.label, uuid)
-                child_summarizer = JobSummarizer(uuid)
-                child_summarizer.stats_max = self.stats_max
-                child_summarizer.task_stats = self.task_stats
-                child_summarizer.tasks = self.tasks
-                child_summarizer.starttime = self.starttime
-                child_summarizer.run()
-                logger.debug('%s: done %s', self.label, uuid)
+                self.run_child(m.group('uuid'))
                 continue
 
             m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
@@ -178,7 +182,12 @@ class Summarizer(object):
                             self.stats_max[category][stat] = val
             except Exception as e:
                 logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
-        logger.debug('%s: done parsing', self.label)
+        logger.debug('%s: done parsing log', self.label)
+
+        # Enabling this will roll up stats for all subjobs into the parent job
+        if False and 'components' in self.job:
+            for cname, component in self.job['components'].iteritems():
+                self.run_child(component)
 
         self.job_tot = collections.defaultdict(
             functools.partial(collections.defaultdict, int))
@@ -405,8 +414,8 @@ class JobSummarizer(Summarizer):
 
 class PipelineSummarizer(object):
     def __init__(self, pipeline_instance_uuid, **kwargs):
-        arv = arvados.api('v1', model=OrderedJsonModel())
-        instance = arv.pipeline_instances().get(
+        self.arv = arvados.api('v1', model=OrderedJsonModel())
+        instance = self.arv.pipeline_instances().get(
             uuid=pipeline_instance_uuid).execute()
         self.summarizers = collections.OrderedDict()
         for cname, component in instance['components'].iteritems():
@@ -414,14 +423,20 @@ class PipelineSummarizer(object):
                 logger.warning(
                     "%s: skipping component with no job assigned", cname)
             else:
-                logger.info(
-                    "%s: job %s", cname, component['job']['uuid'])
-                summarizer = JobSummarizer(component['job'], **kwargs)
-                summarizer.label = '{} {}'.format(
-                    cname, component['job']['uuid'])
-                self.summarizers[cname] = summarizer
+                self.summarize_job(cname, component['job'], **kwargs)
         self.label = pipeline_instance_uuid
 
+    def summarize_job(self, cname, job, **kwargs):
+        """Register *job* as *cname*; subjobs as 'cname.sub' so nested names don't clobber."""
+        uuid = job['uuid']
+        logger.info("%s: job %s", cname, uuid)
+        summarizer = JobSummarizer(job, **kwargs)
+        summarizer.label = '{} {}'.format(cname, uuid)
+        self.summarizers[cname] = summarizer
+        for sub_cname, sub_uuid in (job.get('components') or {}).iteritems():
+            subjob = self.arv.jobs().get(uuid=sub_uuid).execute()
+            self.summarize_job('{}.{}'.format(cname, sub_cname), subjob, **kwargs)
+
     def run(self):
         threads = []
         for summarizer in self.summarizers.itervalues():