import argparse
import gzip
+from io import open
import logging
import sys
+import arvados
-from crunchstat_summary import logger, summarizer
+from crunchstat_summary import logger, summarizer, reader
+from crunchstat_summary._version import __version__
class ArgumentParser(argparse.ArgumentParser):
description='Summarize resource usage of an Arvados Crunch job')
src = self.add_mutually_exclusive_group()
src.add_argument(
- '--job', type=str, metavar='UUID',
- help='Look up the specified job and read its log data from Keep'
- ' (or from the Arvados event log, if the job is still running)')
+ '--job', '--container-request',
+ type=str, metavar='UUID',
+ help='Look up the specified job or container request '
+ 'and read its log data from Keep (or from the Arvados event log, '
+ 'if it is still running)')
+ # Deprecated alias kept for compatibility; Command.run resolves it the same way as --job.
src.add_argument(
- '--pipeline-instance', type=str, metavar='UUID',
- help='Summarize each component of the given pipeline instance')
+ '--container',
+ type=str, metavar='UUID',
+ help='[Deprecated] Look up the specified container, find its container request, '
+ 'and read its log data from Keep (or from the Arvados event log, '
+ 'if the container is still running)')
src.add_argument(
'--log-file', type=str,
help='Read log data from a regular file')
self.add_argument(
'--skip-child-jobs', action='store_true',
- help='Do not include stats from child jobs')
+ help='Do not include stats from child jobs/containers')
self.add_argument(
'--format', type=str, choices=('html', 'text'), default='text',
help='Report format')
+ self.add_argument(
+ '--threads', type=int, default=8,
+ help='Maximum worker threads to run')
self.add_argument(
'--verbose', '-v', action='count', default=0,
help='Log more information (once for progress, twice for debug)')
+ self.add_argument('--version', action='version',
+ version="%s %s" % (sys.argv[0], __version__),
+ help='Print version and exit.')
+
+
+class UTF8Decode(object):
+ '''Iterate over a file-like object that yields bytes, decoding each item as UTF-8.
+
+ In this module it wraps the object returned by gzip.open() so that
+ compressed log files yield str items, like a file opened in text mode.
+ '''
+ def __init__(self, fh):
+ self.fh = fh
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ raw = next(self.fh)
+ return raw.decode('utf-8')
+
+ # Python 2 looks up the iterator step under the name ``next``.
+ next = __next__
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+ def close(self):
+ # Deliberately a no-op: like GzipFile given a caller-supplied fileobj,
+ # the wrapped object is not closed here.
+ return None
class Command(object):
def run(self):
+ '''Build a summarizer for the selected input source, run it, and return its result.'''
kwargs = {
'skip_child_jobs': self.args.skip_child_jobs,
+ 'threads': self.args.threads,
+ 'arv': arvados.api('v1'),
}
- if self.args.pipeline_instance:
- self.summer = summarizer.PipelineSummarizer(self.args.pipeline_instance, **kwargs)
- elif self.args.job:
- self.summer = summarizer.JobSummarizer(self.args.job, **kwargs)
+ # --container is a deprecated alias of --job; both live in the same
+ # mutually exclusive group, so at most one of them is set.
+ target_uuid = self.args.job or self.args.container
+ if target_uuid:
+ self.summer = summarizer.NewSummarizer(target_uuid, **kwargs)
elif self.args.log_file:
if self.args.log_file.endswith('.gz'):
- fh = gzip.open(self.args.log_file)
+ fh = UTF8Decode(gzip.open(self.args.log_file))
else:
- fh = open(self.args.log_file)
- self.summer = summarizer.Summarizer(fh, **kwargs)
+ fh = open(self.args.log_file, mode='r', encoding='utf-8')
+ self.summer = summarizer.Summarizer(reader.StubReader(fh), **kwargs)
else:
- self.summer = summarizer.Summarizer(sys.stdin, **kwargs)
+ self.summer = summarizer.Summarizer(reader.StubReader(sys.stdin), **kwargs)
return self.summer.run()
def report(self):