8123: Add option to include stats from child jobs.
author Tom Clegg <tom@curoverse.com>
Thu, 24 Dec 2015 18:51:35 +0000 (13:51 -0500)
committer Tom Clegg <tom@curoverse.com>
Tue, 12 Jan 2016 15:47:21 +0000 (10:47 -0500)
tools/crunchstat-summary/crunchstat_summary/command.py
tools/crunchstat-summary/crunchstat_summary/summarizer.py
tools/crunchstat-summary/tests/logfile_20151204190335.txt.gz.report
tools/crunchstat-summary/tests/logfile_20151210063411.txt.gz.report
tools/crunchstat-summary/tests/logfile_20151210063439.txt.gz.report
tools/crunchstat-summary/tests/test_examples.py

diff --git a/tools/crunchstat-summary/crunchstat_summary/command.py b/tools/crunchstat-summary/crunchstat_summary/command.py
index 2c7574c88b9b09176bba85d934760749237a8267..a5339ddf3665677e36cbe99bef207e44527917e3 100644
@@ -20,6 +20,9 @@ class ArgumentParser(argparse.ArgumentParser):
         src.add_argument(
             '--log-file', type=str,
             help='Read log data from a regular file')
+        self.add_argument(
+            '--include-child-jobs', action='store_true',
+            help='Include stats from child jobs')
         self.add_argument(
             '--format', type=str, choices=('html', 'text'), default='text',
             help='Report format')
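
With this flag in place, child-job stats can be folded into a report with
something like "crunchstat-summary --job <uuid> --include-child-jobs"
(assuming the usual crunchstat-summary entry point); without the flag,
queued child jobs are skipped with a warning.
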
@@ -40,18 +43,21 @@ class Command(object):
             logger.setLevel(logging.INFO)
 
     def run(self):
+        kwargs = {
+            'include_child_jobs': self.args.include_child_jobs,
+        }
         if self.args.pipeline_instance:
-            self.summer = summarizer.PipelineSummarizer(self.args.pipeline_instance)
+            self.summer = summarizer.PipelineSummarizer(self.args.pipeline_instance, **kwargs)
         elif self.args.job:
-            self.summer = summarizer.JobSummarizer(self.args.job)
+            self.summer = summarizer.JobSummarizer(self.args.job, **kwargs)
         elif self.args.log_file:
             if self.args.log_file.endswith('.gz'):
                 fh = gzip.open(self.args.log_file)
             else:
                 fh = open(self.args.log_file)
-            self.summer = summarizer.Summarizer(fh)
+            self.summer = summarizer.Summarizer(fh, **kwargs)
         else:
-            self.summer = summarizer.Summarizer(sys.stdin)
+            self.summer = summarizer.Summarizer(sys.stdin, **kwargs)
         return self.summer.run()
 
     def report(self):
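
Command.run() collects the CLI option into a kwargs dict and forwards it to
whichever summarizer class applies. A minimal standalone sketch of the same
pattern, using only the standard library; FakeSummarizer is a hypothetical
stand-in, not the real class:

import argparse

# Hypothetical stand-in for crunchstat_summary.summarizer.Summarizer,
# showing only how the CLI flag becomes a constructor keyword argument.
class FakeSummarizer(object):
    def __init__(self, logdata, include_child_jobs=True):
        self.logdata = logdata
        self.include_child_jobs = include_child_jobs

parser = argparse.ArgumentParser()
parser.add_argument('--include-child-jobs', action='store_true',
                    help='Include stats from child jobs')
args = parser.parse_args(['--include-child-jobs'])

# Same forwarding pattern as Command.run() above.
kwargs = {'include_child_jobs': args.include_child_jobs}
summer = FakeSummarizer([], **kwargs)
assert summer.include_child_jobs is True
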
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index ba1919fed855f446b9babcb97dd8689b0112b468..ccee1e24d20e045c2f6cfd9b171167a182e57c96 100644
@@ -28,15 +28,14 @@ class Task(object):
 class Summarizer(object):
     existing_constraints = {}
 
-    def __init__(self, logdata, label=None):
+    def __init__(self, logdata, label=None, include_child_jobs=True):
         self._logdata = logdata
+
         self.label = label
         self.starttime = None
         self.finishtime = None
-        logger.debug("%s: logdata %s", self.label, repr(logdata))
+        self._include_child_jobs = include_child_jobs
 
-    def run(self):
-        logger.debug("%s: parsing log data", self.label)
         # stats_max: {category: {stat: val}}
         self.stats_max = collections.defaultdict(
             functools.partial(collections.defaultdict,
@@ -44,19 +43,52 @@ class Summarizer(object):
         # task_stats: {task_id: {category: {stat: val}}}
         self.task_stats = collections.defaultdict(
             functools.partial(collections.defaultdict, dict))
+
+        self.seq_to_uuid = {}
         self.tasks = collections.defaultdict(Task)
+
+        logger.debug("%s: logdata %s", self.label, repr(logdata))
+
+    def run(self):
+        logger.debug("%s: parsing log data", self.label)
         for line in self._logdata:
+            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
+            if m:
+                seq = int(m.group('seq'))
+                uuid = m.group('task_uuid')
+                self.seq_to_uuid[seq] = uuid
+                logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
+                continue
+
             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
             if m:
-                task_id = m.group('seq')
+                task_id = self.seq_to_uuid[int(m.group('seq'))]
                 elapsed = int(m.group('elapsed'))
                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                 if elapsed > self.stats_max['time']['elapsed']:
                     self.stats_max['time']['elapsed'] = elapsed
                 continue
+
+            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
+            if m:
+                uuid = m.group('uuid')
+                if not self._include_child_jobs:
+                    logger.warning('%s: omitting %s (try --include-child-jobs)',
+                                   self.label, uuid)
+                    continue
+                logger.debug('%s: follow %s', self.label, uuid)
+                child_summarizer = JobSummarizer(uuid)
+                child_summarizer.stats_max = self.stats_max
+                child_summarizer.task_stats = self.task_stats
+                child_summarizer.tasks = self.tasks
+                child_summarizer.run()
+                logger.debug('%s: done %s', self.label, uuid)
+                continue
+
             m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
             if not m:
                 continue
+
             if self.label is None:
                 self.label = m.group('job_uuid')
                 logger.debug('%s: using job uuid as label', self.label)
@@ -65,7 +97,7 @@ class Summarizer(object):
                 continue
             elif m.group('category') == 'error':
                 continue
-            task_id = m.group('seq')
+            task_id = self.seq_to_uuid[int(m.group('seq'))]
             task = self.tasks[task_id]
 
             # Use the first and last crunchstat timestamps as
@@ -126,6 +158,8 @@ class Summarizer(object):
                         self.task_stats[task_id][category][stat] = val
                     if val > self.stats_max[category][stat]:
                         self.stats_max[category][stat] = val
+        logger.debug('%s: done parsing', self.label)
+
         self.job_tot = collections.defaultdict(
             functools.partial(collections.defaultdict, int))
         for task_id, task_stat in self.task_stats.iteritems():
@@ -135,6 +169,7 @@ class Summarizer(object):
                         # meaningless stats like 16 cpu cores x 5 tasks = 80
                         continue
                     self.job_tot[category][stat] += val
+        logger.debug('%s: done totals', self.label)
 
     def long_label(self):
         label = self.label
@@ -169,6 +204,9 @@ class Summarizer(object):
                 tot = self._format(self.job_tot[category].get(stat, '-'))
                 yield "\t".join([category, stat, str(val), max_rate, tot])
         for args in (
+                ('Number of tasks: {}',
+                 len(self.tasks),
+                 None),
                 ('Max CPU time spent by a single task: {}s',
                  self.stats_max['cpu']['user+sys'],
                  None),
@@ -249,7 +287,8 @@ class Summarizer(object):
 
 
 class CollectionSummarizer(Summarizer):
-    def __init__(self, collection_id):
+    def __init__(self, collection_id, **kwargs):
+        logger.debug('load collection %s', collection_id)
         collection = arvados.collection.CollectionReader(collection_id)
         filenames = [filename for filename in collection]
         if len(filenames) != 1:
@@ -257,12 +296,12 @@ class CollectionSummarizer(Summarizer):
                 "collection {} has {} files; need exactly one".format(
                     collection_id, len(filenames)))
         super(CollectionSummarizer, self).__init__(
-            collection.open(filenames[0]))
+            collection.open(filenames[0]), **kwargs)
         self.label = collection_id
 
 
 class JobSummarizer(CollectionSummarizer):
-    def __init__(self, job):
+    def __init__(self, job, **kwargs):
         arv = arvados.api('v1')
         if isinstance(job, str):
             self.job = arv.jobs().get(uuid=job).execute()
@@ -274,12 +313,12 @@ class JobSummarizer(CollectionSummarizer):
             raise ValueError(
                 "job {} has no log; live summary not implemented".format(
                     self.job['uuid']))
-        super(JobSummarizer, self).__init__(self.job['log'])
+        super(JobSummarizer, self).__init__(self.job['log'], **kwargs)
         self.label = self.job['uuid']
 
 
 class PipelineSummarizer():
-    def __init__(self, pipeline_instance_uuid):
+    def __init__(self, pipeline_instance_uuid, **kwargs):
         arv = arvados.api('v1', model=OrderedJsonModel())
         instance = arv.pipeline_instances().get(
             uuid=pipeline_instance_uuid).execute()
@@ -295,7 +334,7 @@ class PipelineSummarizer():
             else:
                 logger.info(
                     "%s: logdata %s", cname, component['job']['log'])
-                summarizer = JobSummarizer(component['job'])
+                summarizer = JobSummarizer(component['job'], **kwargs)
                 summarizer.label = cname
                 self.summarizers[cname] = summarizer
         self.label = pipeline_instance_uuid
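
The core of this change is the new seq_to_uuid map: crunchstat log lines
identify tasks by sequence number, and the "job_task" lines pair each
sequence number with a task uuid, so stats can be keyed by uuid, which stays
unambiguous when stats from child jobs are merged into the same task_stats
dict. A self-contained sketch using the same regexes as above; the sample
log lines are made up but follow the expected shape (timestamp, job uuid,
pid, sequence number, message):

import re

# Made-up log lines in the shape the parser expects.
lines = [
    '2015-12-04_19:03:35 4xphq-8i9sb-jq0ekny1xou3zoh 27 0 job_task 4xphq-ot0gb-aaaaaaaaaaaaaaa\n',
    '2015-12-04_19:04:55 4xphq-8i9sb-jq0ekny1xou3zoh 27 0 success in 80 seconds\n',
]

seq_to_uuid = {}
for line in lines:
    m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
    if m:
        # Remember which task uuid this sequence number refers to.
        seq_to_uuid[int(m.group('seq'))] = m.group('task_uuid')
        continue
    m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
    if m:
        # Stats are keyed by task uuid rather than by raw sequence number.
        task_id = seq_to_uuid[int(m.group('seq'))]
        print('%s finished in %ss' % (task_id, m.group('elapsed')))
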
diff --git a/tools/crunchstat-summary/tests/logfile_20151204190335.txt.gz.report b/tools/crunchstat-summary/tests/logfile_20151204190335.txt.gz.report
index b12e93117f3e1488d6536d2b0181c27986003d8a..0ba0181c1215620c7f0a4334c2eb77dc3d64d7a6 100644
@@ -22,11 +22,12 @@ net:keep0   rx      0       0.00    0
 net:keep0      tx      0       0.00    0
 net:keep0      tx+rx   0       0.00    0
 time   elapsed 80      -       80
+# Number of tasks: 1
 # Max CPU time spent by a single task: 5.75s
 # Max CPU usage in a single interval: 13.00%
 # Overall CPU usage: 7.19%
 # Max memory used by a single task: 0.35GB
 # Max network traffic in a single task: 1.79GB
 # Max network speed in a single interval: 42.58MB/s
-#!! job max CPU usage was 13% -- try runtime_constraints "min_cores_per_node":1
-#!! job max RSS was 334 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-jq0ekny1xou3zoh max CPU usage was 13% -- try runtime_constraints "min_cores_per_node":1
+#!! 4xphq-8i9sb-jq0ekny1xou3zoh max RSS was 334 MiB -- try runtime_constraints "min_ram_mb_per_node":972
diff --git a/tools/crunchstat-summary/tests/logfile_20151210063411.txt.gz.report b/tools/crunchstat-summary/tests/logfile_20151210063411.txt.gz.report
index 8e1a2d893777a98e3583ba449af680d2bd1e3a57..0641bbac9f6c4f88c008adb279440e88be4f628a 100644
@@ -11,8 +11,9 @@ net:eth0      rx      90      -       90
 net:eth0       tx      90      -       90
 net:eth0       tx+rx   180     -       180
 time   elapsed 2       -       4
+# Number of tasks: 2
 # Max CPU time spent by a single task: 0.00s
 # Overall CPU usage: 0.00%
 # Max memory used by a single task: 0.00GB
 # Max network traffic in a single task: 0.00GB
-#!! job max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-zvb2ocfycpomrup max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
diff --git a/tools/crunchstat-summary/tests/logfile_20151210063439.txt.gz.report b/tools/crunchstat-summary/tests/logfile_20151210063439.txt.gz.report
index dbe9321a040183150733a5742e70eb978d662c5e..19fe0ed764d4fc2c229c348161c0079d9d67b1dd 100644
@@ -11,8 +11,9 @@ net:eth0      rx      90      -       90
 net:eth0       tx      90      -       90
 net:eth0       tx+rx   180     -       180
 time   elapsed 2       -       3
+# Number of tasks: 2
 # Max CPU time spent by a single task: 0.00s
 # Overall CPU usage: 0.00%
 # Max memory used by a single task: 0.00GB
 # Max network traffic in a single task: 0.00GB
-#!! job max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
+#!! 4xphq-8i9sb-v831jm2uq0g2g9x max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
diff --git a/tools/crunchstat-summary/tests/test_examples.py b/tools/crunchstat-summary/tests/test_examples.py
index 09e21a1a3a319e182ae7a157620c04d67998d81a..d35e81e8cd7dbe7af67613954afd45eb904f8a56 100644
@@ -35,7 +35,7 @@ class SummarizeFile(ReportDiff):
 
 
 class SummarizeJob(ReportDiff):
-    fake_job_uuid = 'zzzzz-8i9sb-jjjjjjjjjjjjjjj'
+    fake_job_uuid = '4xphq-8i9sb-jq0ekny1xou3zoh'
     fake_log_id = 'fake-log-collection-id'
     fake_job = {
         'uuid': fake_job_uuid,