import sys
args = crunchstat_summary.command.ArgumentParser().parse_args(sys.argv[1:])
-s = crunchstat_summary.summarizer.Summarizer(args)
+s = crunchstat_summary.command.Command(args).summarizer()
s.run()
print(s.report(), end='')
import argparse
+import gzip
+import sys
+
+from crunchstat_summary import summarizer
class ArgumentParser(argparse.ArgumentParser):
        src.add_argument(
            '--job', type=str, metavar='UUID',
            help='Look up the specified job and read its log data from Keep')
+        src.add_argument(
+            '--pipeline-instance', type=str, metavar='UUID',
+            help='Summarize each component of the given pipeline instance')
        src.add_argument(
            '--log-file', type=str,
            help='Read log data from a regular file')
+
+
+class Command(object):
+    def __init__(self, args):
+        self.args = args
+
+    def summarizer(self):
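+        # Pick a summarizer for whichever of the mutually exclusive
+        # source arguments was given; with no source argument, read raw
+        # crunchstat logs from stdin.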
+        if self.args.pipeline_instance:
+            return summarizer.PipelineSummarizer(self.args.pipeline_instance)
+        elif self.args.job:
+            return summarizer.JobSummarizer(self.args.job)
+        elif self.args.log_file:
+            if self.args.log_file.endswith('.gz'):
+                fh = gzip.open(self.args.log_file)
+            else:
+                fh = open(self.args.log_file)
+            return summarizer.Summarizer(fh)
+        else:
+            return summarizer.Summarizer(sys.stdin)
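+
+# Example usage (a sketch; 'zzzzz-8i9sb-xxxxxxxxxxxxxxx' stands in for a
+# real job UUID):
+#
+#     args = ArgumentParser().parse_args(['--job', 'zzzzz-8i9sb-xxxxxxxxxxxxxxx'])
+#     s = Command(args).summarizer()
+#     s.run()
+#     print(s.report(), end='')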
+from __future__ import print_function
+
import arvados
import collections
import functools
-import gzip
import re
import sys
class Summarizer(object):
-    def __init__(self, args):
-        self.args = args
+    def __init__(self, logdata):
+        self._logdata = logdata
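+        # logdata may be any iterable of log lines: an open (possibly
+        # gzipped) file, sys.stdin, or a file streamed from a Keep
+        # collection.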
    def run(self):
        # stats_max: {category: {stat: val}}
        # task_stats: {task_id: {category: {stat: val}}}
        self.task_stats = collections.defaultdict(
            functools.partial(collections.defaultdict, dict))
-        for line in self._logdata():
+        for line in self._logdata:
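+            # Final task status lines look like
+            # "... <seq> success in <elapsed> seconds"; capture the task
+            # sequence number and its elapsed time.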
            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
            if m:
                task_id = m.group('seq')
            if m.group('category').endswith(':'):
                # "notice:" etc.
                continue
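+            # crunchstat "error" messages carry no stats to parse, so
+            # skip them as well.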
+            elif m.group('category') == 'error':
+                continue
            task_id = m.group('seq')
            this_interval_s = None
            for group in ['current', 'interval']:
                words = m.group(group).split(' ')
                stats = {}
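+                # Stats arrive as alternating "<value> <name>" words;
+                # zip pairs each even-indexed value with the stat name
+                # that follows it.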
                for val, stat in zip(words[::2], words[1::2]):
-                    if '.' in val:
-                        stats[stat] = float(val)
-                    else:
-                        stats[stat] = int(val)
+                    try:
+                        if '.' in val:
+                            stats[stat] = float(val)
+                        else:
+                            stats[stat] = int(val)
+                    except ValueError as e:
+                        raise ValueError(
+                            'Error parsing {} stat in "{}": {!r}'.format(
+                                stat, line, e))
                if 'user' in stats or 'sys' in stats:
                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                if 'tx' in stats or 'rx' in stats:
        else:
            return '{}'.format(val)
-    def _logdata(self):
-        if self.args.log_file:
-            if self.args.log_file.endswith('.gz'):
-                return gzip.open(self.args.log_file)
-            else:
-                return open(self.args.log_file)
-        elif self.args.job:
-            arv = arvados.api('v1')
-            job = arv.jobs().get(uuid=self.args.job).execute()
-            if not job['log']:
-                raise ValueError(
-                    "job {} has no log; live summary not implemented".format(
-                        self.args.job))
-            collection = arvados.collection.CollectionReader(job['log'])
-            filenames = [filename for filename in collection]
-            if len(filenames) != 1:
-                raise ValueError(
-                    "collection {} has {} files; need exactly one".format(
-                        job.log, len(filenames)))
-            return collection.open(filenames[0])
+class CollectionSummarizer(Summarizer):
+    def __init__(self, collection_id):
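+        # Read the single log file from the given collection.
+        # (CollectionReader accepts a portable data hash or a
+        # collection UUID.)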
+        collection = arvados.collection.CollectionReader(collection_id)
+        filenames = [filename for filename in collection]
+        if len(filenames) != 1:
+            raise ValueError(
+                "collection {} has {} files; need exactly one".format(
+                    collection_id, len(filenames)))
+        super(CollectionSummarizer, self).__init__(collection.open(filenames[0]))
+
+class JobSummarizer(CollectionSummarizer):
+    def __init__(self, job):
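+        # Accept either a job UUID or an already-fetched job record.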
+        arv = arvados.api('v1')
+        if isinstance(job, str):
+            self.job = arv.jobs().get(uuid=job).execute()
        else:
-            return sys.stdin
+            self.job = job
+        if not self.job['log']:
+            raise ValueError(
+                "job {} has no log; live summary not implemented".format(
+                    self.job['uuid']))
+        super(JobSummarizer, self).__init__(self.job['log'])
+
+class PipelineSummarizer(object):
+    def __init__(self, pipeline_instance_uuid):
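+        # Build one summarizer per pipeline component that has a job
+        # with a log; skip the rest, with a note on stderr.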
+        arv = arvados.api('v1')
+        instance = arv.pipeline_instances().get(
+            uuid=pipeline_instance_uuid).execute()
+        self.summarizers = collections.OrderedDict()
+        for cname, component in instance['components'].iteritems():
+            if 'job' not in component:
+                print("{}: skipping component with no job assigned".format(
+                    cname), file=sys.stderr)
+            elif component['job'].get('log') is None:
+                print("{}: skipping component with no log available".format(
+                    cname), file=sys.stderr)
+            else:
+                print("{}: reading log from {}".format(
+                    cname, component['job']['log']), file=sys.stderr)
+                summarizer = CollectionSummarizer(component['job']['log'])
+                summarizer.job_uuid = component['job']['uuid']
+                self.summarizers[cname] = summarizer
+
+    def run(self):
+        for summarizer in self.summarizers.itervalues():
+            summarizer.run()
+
+    def report(self):
+        txt = ''
+        for cname, summarizer in self.summarizers.iteritems():
+            txt += '### Summary for {} ({})\n'.format(
+                cname, summarizer.job_uuid)
+            txt += summarizer.report()
+            txt += '\n'
+        return txt
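+
+# Example usage (a sketch; assumes the pipeline instance has finished and
+# its component jobs have logs in Keep):
+#
+#     ps = PipelineSummarizer('zzzzz-d1hrv-i3e77t9z5y8j9cc')
+#     ps.run()
+#     print(ps.report(), end='')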
+import arvados
+import collections
import crunchstat_summary.command
import crunchstat_summary.summarizer
import difflib
import glob
+import gzip
+import mock
import os
import unittest
-class ExampleLogsTestCase(unittest.TestCase):
+TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
+
+class ReportDiff(unittest.TestCase):
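+    # Shared helpers: compare a generated report with the expected
+    # "<logfile>.report" fixture, or with an explicit list of lines.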
+    def diff_known_report(self, logfile, summarizer):
+        expectfile = logfile+'.report'
+        expect = open(expectfile).readlines()
+        self.diff_report(summarizer, expect, expectfile=expectfile)
+
+    def diff_report(self, summarizer, expect, expectfile=None):
+        got = [x+"\n" for x in summarizer.report().strip("\n").split("\n")]
+        self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
+            expect, got, fromfile=expectfile, tofile="(generated)")))
+
+
+class ExampleLogsTestCase(ReportDiff):
    def test_example_files(self):
-        dirname = os.path.dirname(os.path.abspath(__file__))
-        for fnm in glob.glob(os.path.join(dirname, '*.txt.gz')):
-            logfile = os.path.join(dirname, fnm)
+        for logfile in glob.glob(os.path.join(TESTS_DIR, '*.txt.gz')):
            args = crunchstat_summary.command.ArgumentParser().parse_args(
                ['--log-file', logfile])
-            summarizer = crunchstat_summary.summarizer.Summarizer(args)
+            summarizer = crunchstat_summary.command.Command(args).summarizer()
            summarizer.run()
-            got = [x+"\n" for x in summarizer.report().strip("\n").split("\n")]
-            expectfile = logfile+'.report'
-            expect = open(expectfile).readlines()
-            self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
-                expect, got, fromfile=expectfile, tofile="(generated)")))
+            self.diff_known_report(logfile, summarizer)
+
+
+class LookupJobUUID(ReportDiff):
+    fake_uuid = 'zzzzz-8i9sb-jq0ekny1xou3zoh'
+
+    @mock.patch('arvados.collection.CollectionReader')
+    @mock.patch('arvados.api')
+    def test_job_uuid(self, mock_api, mock_cr):
+        logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
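+        # Stub the Arvados API and Keep client so the --job lookup path
+        # runs against a local example log.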
+        mock_api().jobs().get().execute.return_value = {'log': 'fake-uuid'}
+        mock_cr().__iter__.return_value = ['fake-logfile.txt']
+        mock_cr().open.return_value = gzip.open(logfile)
+        args = crunchstat_summary.command.ArgumentParser().parse_args(
+            ['--job', self.fake_uuid])
+        summarizer = crunchstat_summary.command.Command(args).summarizer()
+        summarizer.run()
+        self.diff_known_report(logfile, summarizer)
+        mock_api().jobs().get.assert_called_with(uuid=self.fake_uuid)
+        mock_cr().open.assert_called_with('fake-logfile.txt')
+
+
+class SummarizePipeline(ReportDiff):
+    fake_instance = {
+        'uuid': 'zzzzz-d1hrv-i3e77t9z5y8j9cc',
+        'owner_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
+        'components': collections.OrderedDict([
+            ['foo', {
+                'job': {
+                    'uuid': 'zzzzz-8i9sb-000000000000000',
+                    'log': 'fake-log-pdh-0',
+                },
+            }],
+            ['bar', {
+                'job': {
+                    'uuid': 'zzzzz-8i9sb-000000000000001',
+                    'log': 'fake-log-pdh-1',
+                },
+            }],
+            ['no-job-assigned', {}],
+            ['unfinished-job', {
+                'job': {
+                    'uuid': 'zzzzz-8i9sb-xxxxxxxxxxxxxxx',
+                },
+            }],
+            ['baz', {
+                'job': {
+                    'uuid': 'zzzzz-8i9sb-000000000000002',
+                    'log': 'fake-log-pdh-2',
+                },
+            }]]),
+    }
+
+    @mock.patch('arvados.collection.CollectionReader')
+    @mock.patch('arvados.api')
+    def test_pipeline(self, mock_api, mock_cr):
+        logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
+        mock_api().pipeline_instances().get().execute.return_value = \
+            self.fake_instance
+        mock_cr().__iter__.return_value = ['fake-logfile.txt']
+        mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
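+        # Three components have logs, and each mocked open() serves the
+        # same example log, so its report should appear three times.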
+        args = crunchstat_summary.command.ArgumentParser().parse_args(
+            ['--pipeline-instance', self.fake_instance['uuid']])
+        summarizer = crunchstat_summary.command.Command(args).summarizer()
+        summarizer.run()
+
+        expect = (
+            ['### Summary for foo (zzzzz-8i9sb-000000000000000)\n'] +
+            open(logfile+'.report').readlines() + ['\n'] +
+            ['### Summary for bar (zzzzz-8i9sb-000000000000001)\n'] +
+            open(logfile+'.report').readlines() + ['\n'] +
+            ['### Summary for baz (zzzzz-8i9sb-000000000000002)\n'] +
+            open(logfile+'.report').readlines())
+        self.diff_report(summarizer, expect)
+        mock_cr.assert_has_calls(
+            [
+                mock.call('fake-log-pdh-0'),
+                mock.call('fake-log-pdh-1'),
+                mock.call('fake-log-pdh-2'),
+            ], any_order=True)
+        mock_cr().open.assert_called_with('fake-logfile.txt')