19744: Run summarizer at end of container
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / command.py
index a9dfc83bf1835aeed46fd825930b5d8397dc3ad7..71eae519969bf1f34ec10b491c3885c1425596df 100644 (file)
@@ -1,9 +1,15 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 import argparse
 import gzip
+from io import open
 import logging
 import sys
 
 from crunchstat_summary import logger, summarizer
+from crunchstat_summary._version import __version__
 
 
 class ArgumentParser(argparse.ArgumentParser):
@@ -12,23 +18,63 @@ class ArgumentParser(argparse.ArgumentParser):
             description='Summarize resource usage of an Arvados Crunch job')
         src = self.add_mutually_exclusive_group()
         src.add_argument(
-            '--job', type=str, metavar='UUID',
-            help='Look up the specified job and read its log data from Keep')
+            '--job', '--container-request',
+            type=str, metavar='UUID',
+            help='Look up the specified job or container request '
+            'and read its log data from Keep (or from the Arvados event log, '
+            'if the job is still running)')
+        src.add_argument(
+            '--container',
+            type=str, metavar='UUID',
+            help='[Deprecated] Look up the specified container find its container request '
+            'and read its log data from Keep (or from the Arvados event log, '
+            'if the job is still running)')
         src.add_argument(
             '--pipeline-instance', type=str, metavar='UUID',
-            help='Summarize each component of the given pipeline instance')
+            help='[Deprecated] Summarize each component of the given pipeline instance (historical pre-1.4)')
         src.add_argument(
             '--log-file', type=str,
             help='Read log data from a regular file')
         self.add_argument(
             '--skip-child-jobs', action='store_true',
-            help='Do not include stats from child jobs')
+            help='Do not include stats from child jobs/containers')
         self.add_argument(
             '--format', type=str, choices=('html', 'text'), default='text',
             help='Report format')
+        self.add_argument(
+            '--threads', type=int, default=8,
+            help='Maximum worker threads to run')
         self.add_argument(
             '--verbose', '-v', action='count', default=0,
             help='Log more information (once for progress, twice for debug)')
+        self.add_argument('--version', action='version',
+                         version="%s %s" % (sys.argv[0], __version__),
+                         help='Print version and exit.')
+
+
+class UTF8Decode(object):
+    '''Wrap a file-like iterable to decode UTF-8 bytes into strings
+    '''
+    def __init__(self, fh):
+        self.fh = fh
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        return next(self.fh).decode('utf-8')
+
+    next = __next__
+
+    def close(self):
+        # mimic Gzip behavior and don't close underlying object
+        pass
 
 
 class Command(object):
@@ -39,16 +85,19 @@ class Command(object):
     def run(self):
         kwargs = {
             'skip_child_jobs': self.args.skip_child_jobs,
+            'threads': self.args.threads,
         }
         if self.args.pipeline_instance:
-            self.summer = summarizer.PipelineSummarizer(self.args.pipeline_instance, **kwargs)
+            self.summer = summarizer.NewSummarizer(self.args.pipeline_instance, **kwargs)
         elif self.args.job:
-            self.summer = summarizer.JobSummarizer(self.args.job, **kwargs)
+            self.summer = summarizer.NewSummarizer(self.args.job, **kwargs)
+        elif self.args.container:
+            self.summer = summarizer.NewSummarizer(self.args.container, **kwargs)
         elif self.args.log_file:
             if self.args.log_file.endswith('.gz'):
-                fh = gzip.open(self.args.log_file)
+                fh = UTF8Decode(gzip.open(self.args.log_file))
             else:
-                fh = open(self.args.log_file)
+                fh = open(self.args.log_file, mode = 'r', encoding = 'utf-8')
             self.summer = summarizer.Summarizer(fh, **kwargs)
         else:
             self.summer = summarizer.Summarizer(sys.stdin, **kwargs)