8123: Add --pipeline-instance mode: generate a report for each finished component.
author Tom Clegg <tom@curoverse.com>
Mon, 21 Dec 2015 19:42:44 +0000 (14:42 -0500)
committer Tom Clegg <tom@curoverse.com>
Tue, 12 Jan 2016 15:47:20 +0000 (10:47 -0500)
tools/crunchstat-summary/bin/crunchstat-summary
tools/crunchstat-summary/crunchstat_summary/command.py
tools/crunchstat-summary/crunchstat_summary/summarizer.py
tools/crunchstat-summary/tests/test_examples.py

index 662d7835cc0a1878cb3fff7fa2ef468ccc1f8087..f42d30b3c72035471b60f65334ba7a72ce23c00f 100755 (executable)
@@ -7,6 +7,6 @@ import crunchstat_summary.summarizer
 import sys
 
 args = crunchstat_summary.command.ArgumentParser().parse_args(sys.argv[1:])
-s = crunchstat_summary.summarizer.Summarizer(args)
+s = crunchstat_summary.command.Command(args).summarizer()
 s.run()
 print(s.report(), end='')
index 8186e5d7579e91c208e714abaeff3d7192c3ab16..fc37190d306c166139b1bc11dfc4bbfe5c0f9385 100644 (file)
@@ -1,4 +1,8 @@
 import argparse
+import gzip
+import sys
+
+from crunchstat_summary import summarizer
 
 
 class ArgumentParser(argparse.ArgumentParser):
@@ -9,6 +13,28 @@ class ArgumentParser(argparse.ArgumentParser):
         src.add_argument(
             '--job', type=str, metavar='UUID',
             help='Look up the specified job and read its log data from Keep')
+        src.add_argument(
+            '--pipeline-instance', type=str, metavar='UUID',
+            help='Summarize each component of the given pipeline instance')
         src.add_argument(
             '--log-file', type=str,
             help='Read log data from a regular file')
+
+
+class Command(object):
+    def __init__(self, args):
+        self.args = args
+
+    def summarizer(self):
+        if self.args.pipeline_instance:
+            return summarizer.PipelineSummarizer(self.args.pipeline_instance)
+        elif self.args.job:
+            return summarizer.JobSummarizer(self.args.job)
+        elif self.args.log_file:
+            if self.args.log_file.endswith('.gz'):
+                fh = gzip.open(self.args.log_file)
+            else:
+                fh = open(self.args.log_file)
+            return summarizer.Summarizer(fh)
+        else:
+            return summarizer.Summarizer(sys.stdin)
index ac0964b30e990724ecb5c7b16fb1cbd940bce25d..1b7f95075e51ba3614b1dc235d87c34a1973177c 100644 (file)
@@ -3,14 +3,13 @@ from __future__ import print_function
 import arvados
 import collections
 import functools
-import gzip
 import re
 import sys
 
 
 class Summarizer(object):
-    def __init__(self, args):
-        self.args = args
+    def __init__(self, logdata):
+        self._logdata = logdata
 
     def run(self):
         # stats_max: {category: {stat: val}}
@@ -20,7 +19,7 @@ class Summarizer(object):
         # task_stats: {task_id: {category: {stat: val}}}
         self.task_stats = collections.defaultdict(
             functools.partial(collections.defaultdict, dict))
-        for line in self._logdata():
+        for line in self._logdata:
             m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
             if m:
                 task_id = m.group('seq')
@@ -35,6 +34,8 @@ class Summarizer(object):
             if m.group('category').endswith(':'):
                 # "notice:" etc.
                 continue
+            elif m.group('category') == 'error':
+                continue
             task_id = m.group('seq')
             this_interval_s = None
             for group in ['current', 'interval']:
@@ -44,10 +45,15 @@ class Summarizer(object):
                 words = m.group(group).split(' ')
                 stats = {}
                 for val, stat in zip(words[::2], words[1::2]):
-                    if '.' in val:
-                        stats[stat] = float(val)
-                    else:
-                        stats[stat] = int(val)
+                    try:
+                        if '.' in val:
+                            stats[stat] = float(val)
+                        else:
+                            stats[stat] = int(val)
+                    except ValueError as e:
+                        raise ValueError(
+                            'Error parsing {} stat in "{}": {!r}'.format(
+                                stat, line, e))
                 if 'user' in stats or 'sys' in stats:
                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                 if 'tx' in stats or 'rx' in stats:
@@ -127,25 +133,58 @@ class Summarizer(object):
         else:
             return '{}'.format(val)
 
-    def _logdata(self):
-        if self.args.log_file:
-            if self.args.log_file.endswith('.gz'):
-                return gzip.open(self.args.log_file)
-            else:
-                return open(self.args.log_file)
-        elif self.args.job:
-            arv = arvados.api('v1')
-            job = arv.jobs().get(uuid=self.args.job).execute()
-            if not job['log']:
-                raise ValueError(
-                    "job {} has no log; live summary not implemented".format(
-                        self.args.job))
-            collection = arvados.collection.CollectionReader(job['log'])
-            filenames = [filename for filename in collection]
-            if len(filenames) != 1:
-                raise ValueError(
-                    "collection {} has {} files; need exactly one".format(
-                        job.log, len(filenames)))
-            return collection.open(filenames[0])
+class CollectionSummarizer(Summarizer):
+    def __init__(self, collection_id):
+        collection = arvados.collection.CollectionReader(collection_id)
+        filenames = [filename for filename in collection]
+        if len(filenames) != 1:
+            raise ValueError(
+                "collection {} has {} files; need exactly one".format(
+                    collection_id, len(filenames)))
+        super(CollectionSummarizer, self).__init__(collection.open(filenames[0]))
+
+class JobSummarizer(CollectionSummarizer):
+    def __init__(self, job):
+        arv = arvados.api('v1')
+        if isinstance(job, basestring):
+            self.job = arv.jobs().get(uuid=job).execute()
         else:
-            return sys.stdin
+            self.job = job
+        if not self.job['log']:
+            raise ValueError(
+                "job {} has no log; live summary not implemented".format(
+                    self.job['uuid']))
+        super(JobSummarizer, self).__init__(self.job['log'])
+
+class PipelineSummarizer(object):
+    def __init__(self, pipeline_instance_uuid):
+        arv = arvados.api('v1')
+        instance = arv.pipeline_instances().get(
+            uuid=pipeline_instance_uuid).execute()
+        self.summarizers = collections.OrderedDict()
+        for cname, component in instance['components'].iteritems():
+            if 'job' not in component:
+                print("{}: skipping component with no job assigned".format(
+                    cname), file=sys.stderr)
+            elif component['job'].get('log') is None:
+                print("{}: skipping component with no log available".format(
+                    cname), file=sys.stderr)
+            else:
+                print("{}: reading log from {}".format(
+                    cname, component['job']['log']), file=sys.stderr)
+                summarizer = CollectionSummarizer(component['job']['log'])
+                summarizer.job_uuid = component['job']['uuid']
+                self.summarizers[cname] = summarizer
+
+    def run(self):
+        for summarizer in self.summarizers.itervalues():
+            summarizer.run()
+
+    def report(self):
+        txt = ''
+        for cname, summarizer in self.summarizers.iteritems():
+            txt += '### Summary for {} ({})\n'.format(
+                cname, summarizer.job_uuid)
+            txt += summarizer.report()
+            txt += '\n'
+        return txt
index dbc3843c698c616e9ef2f98b95b250e632b9f3c7..1bce69301fa975c143138c6f2dbee6b0d06a3a59 100644 (file)
+import arvados
+import collections
 import crunchstat_summary.command
 import crunchstat_summary.summarizer
 import difflib
 import glob
+import gzip
+import mock
 import os
 import unittest
 
 
-class ExampleLogsTestCase(unittest.TestCase):
+TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
+
+class ReportDiff(unittest.TestCase):
+    def diff_known_report(self, logfile, summarizer):
+        expectfile = logfile+'.report'
+        expect = open(expectfile).readlines()
+        self.diff_report(summarizer, expect, expectfile=expectfile)
+
+    def diff_report(self, summarizer, expect, expectfile=None):
+        got = [x+"\n" for x in summarizer.report().strip("\n").split("\n")]
+        self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
+            expect, got, fromfile=expectfile, tofile="(generated)")))
+
+
+class ExampleLogsTestCase(ReportDiff):
     def test_example_files(self):
-        dirname = os.path.dirname(os.path.abspath(__file__))
-        for fnm in glob.glob(os.path.join(dirname, '*.txt.gz')):
-            logfile = os.path.join(dirname, fnm)
+        for fnm in glob.glob(os.path.join(TESTS_DIR, '*.txt.gz')):
+            logfile = os.path.join(TESTS_DIR, fnm)
             args = crunchstat_summary.command.ArgumentParser().parse_args(
                 ['--log-file', logfile])
-            summarizer = crunchstat_summary.summarizer.Summarizer(args)
+            summarizer = crunchstat_summary.command.Command(args).summarizer()
             summarizer.run()
-            got = [x+"\n" for x in summarizer.report().strip("\n").split("\n")]
-            expectfile = logfile+'.report'
-            expect = open(expectfile).readlines()
-            self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
-                expect, got, fromfile=expectfile, tofile="(generated)")))
+            self.diff_known_report(logfile, summarizer)
+
+
+class LookupJobUUID(ReportDiff):
+    fake_uuid = 'zzzzz-8i9sb-jq0ekny1xou3zoh'
+
+    @mock.patch('arvados.collection.CollectionReader')
+    @mock.patch('arvados.api')
+    def test_job_uuid(self, mock_api, mock_cr):
+        logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
+        mock_api().jobs().get().execute.return_value = {'log': 'fake-uuid'}
+        mock_cr().__iter__.return_value = ['fake-logfile.txt']
+        mock_cr().open.return_value = gzip.open(logfile)
+        args = crunchstat_summary.command.ArgumentParser().parse_args(
+            ['--job', self.fake_uuid])
+        summarizer = crunchstat_summary.command.Command(args).summarizer()
+        summarizer.run()
+        self.diff_known_report(logfile, summarizer)
+        mock_api().jobs().get.assert_called_with(uuid=self.fake_uuid)
+        mock_cr().open.assert_called_with('fake-logfile.txt')
+
+
+class SummarizePipeline(ReportDiff):
+    fake_instance = {
+        'uuid': 'zzzzz-d1hrv-i3e77t9z5y8j9cc',
+        'owner_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
+        'components': collections.OrderedDict([
+            ['foo', {
+                'job': {
+                    'uuid': 'zzzzz-8i9sb-000000000000000',
+                    'log': 'fake-log-pdh-0',
+                },
+            }],
+            ['bar', {
+                'job': {
+                    'uuid': 'zzzzz-8i9sb-000000000000001',
+                    'log': 'fake-log-pdh-1',
+                },
+            }],
+            ['no-job-assigned', {}],
+            ['unfinished-job', {
+                'job': {
+                    'uuid': 'zzzzz-8i9sb-xxxxxxxxxxxxxxx',
+                },
+            }],
+            ['baz', {
+                'job': {
+                    'uuid': 'zzzzz-8i9sb-000000000000002',
+                    'log': 'fake-log-pdh-2',
+                },
+            }]]),
+    }
+
+    @mock.patch('arvados.collection.CollectionReader')
+    @mock.patch('arvados.api')
+    def test_pipeline(self, mock_api, mock_cr):
+        logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
+        mock_api().pipeline_instances().get().execute. \
+            return_value = self.fake_instance
+        mock_cr().__iter__.return_value = ['fake-logfile.txt']
+        mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+        args = crunchstat_summary.command.ArgumentParser().parse_args(
+            ['--pipeline-instance', self.fake_instance['uuid']])
+        summarizer = crunchstat_summary.command.Command(args).summarizer()
+        summarizer.run()
+
+        expect = (
+            ['### Summary for foo (zzzzz-8i9sb-000000000000000)\n'] +
+            open(logfile+'.report').readlines() + ['\n'] +
+            ['### Summary for bar (zzzzz-8i9sb-000000000000001)\n'] +
+            open(logfile+'.report').readlines() + ['\n'] +
+            ['### Summary for baz (zzzzz-8i9sb-000000000000002)\n'] +
+            open(logfile+'.report').readlines())
+        self.diff_report(summarizer, expect)
+        mock_cr.assert_has_calls(
+            [
+                mock.call('fake-log-pdh-0'),
+                mock.call('fake-log-pdh-1'),
+                mock.call('fake-log-pdh-2'),
+            ], any_order=True)
+        mock_cr().open.assert_called_with('fake-logfile.txt')