Do not pipe into `grep -q`, because that stops reading as soon as a
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / command.py
index a9dfc83bf1835aeed46fd825930b5d8397dc3ad7..ec7acb8083928f6f35f2af7835ee92f8a4a895dc 100644 (file)
@@ -1,5 +1,10 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 import argparse
 import gzip
+from io import open
 import logging
 import sys
 
@@ -12,25 +17,62 @@ class ArgumentParser(argparse.ArgumentParser):
             description='Summarize resource usage of an Arvados Crunch job')
         src = self.add_mutually_exclusive_group()
         src.add_argument(
-            '--job', type=str, metavar='UUID',
-            help='Look up the specified job and read its log data from Keep')
+            '--job', '--container-request',
+            type=str, metavar='UUID',
+            help='Look up the specified job or container request '
+            'and read its log data from Keep (or from the Arvados event log, '
+            'if the job is still running)')
+        src.add_argument(
+            '--container',
+            type=str, metavar='UUID',
+            help='[Deprecated] Look up the specified container find its container request '
+            'and read its log data from Keep (or from the Arvados event log, '
+            'if the job is still running)')
         src.add_argument(
             '--pipeline-instance', type=str, metavar='UUID',
-            help='Summarize each component of the given pipeline instance')
+            help='[Deprecated] Summarize each component of the given pipeline instance (historical pre-1.4)')
         src.add_argument(
             '--log-file', type=str,
             help='Read log data from a regular file')
         self.add_argument(
             '--skip-child-jobs', action='store_true',
-            help='Do not include stats from child jobs')
+            help='Do not include stats from child jobs/containers')
         self.add_argument(
             '--format', type=str, choices=('html', 'text'), default='text',
             help='Report format')
+        self.add_argument(
+            '--threads', type=int, default=8,
+            help='Maximum worker threads to run')
         self.add_argument(
             '--verbose', '-v', action='count', default=0,
             help='Log more information (once for progress, twice for debug)')
 
 
+class UTF8Decode(object):
+    '''Wrap an iterator of UTF-8 encoded bytes so each item is yielded as a string.
+    Usable as a context manager; close() is a no-op and leaves fh open.'''
+    def __init__(self, fh):
+        self.fh = fh
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        return next(self.fh).decode('utf-8')
+
+    next = __next__
+
+    def close(self):
+        # mimic Gzip behavior and don't close underlying object
+        pass
+
+
 class Command(object):
     def __init__(self, args):
         self.args = args
@@ -39,16 +81,19 @@ class Command(object):
     def run(self):
         kwargs = {
             'skip_child_jobs': self.args.skip_child_jobs,
+            'threads': self.args.threads,
         }
         if self.args.pipeline_instance:
-            self.summer = summarizer.PipelineSummarizer(self.args.pipeline_instance, **kwargs)
+            self.summer = summarizer.NewSummarizer(self.args.pipeline_instance, **kwargs)
         elif self.args.job:
-            self.summer = summarizer.JobSummarizer(self.args.job, **kwargs)
+            self.summer = summarizer.NewSummarizer(self.args.job, **kwargs)
+        elif self.args.container:
+            self.summer = summarizer.NewSummarizer(self.args.container, **kwargs)
         elif self.args.log_file:
             if self.args.log_file.endswith('.gz'):
-                fh = gzip.open(self.args.log_file)
+                fh = UTF8Decode(gzip.open(self.args.log_file))
             else:
-                fh = open(self.args.log_file)
+                fh = open(self.args.log_file, mode = 'r', encoding = 'utf-8')
             self.summer = summarizer.Summarizer(fh, **kwargs)
         else:
             self.summer = summarizer.Summarizer(sys.stdin, **kwargs)