19744: Fix up threading
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import argparse
6 import gzip
7 from io import open
8 import logging
9 import sys
10 import arvados
11
12 from crunchstat_summary import logger, summarizer
13 from crunchstat_summary._version import __version__
14
15
16 class ArgumentParser(argparse.ArgumentParser):
17     def __init__(self):
18         super(ArgumentParser, self).__init__(
19             description='Summarize resource usage of an Arvados Crunch job')
20         src = self.add_mutually_exclusive_group()
21         src.add_argument(
22             '--job', '--container-request',
23             type=str, metavar='UUID',
24             help='Look up the specified job or container request '
25             'and read its log data from Keep (or from the Arvados event log, '
26             'if the job is still running)')
27         src.add_argument(
28             '--container',
29             type=str, metavar='UUID',
30             help='[Deprecated] Look up the specified container find its container request '
31             'and read its log data from Keep (or from the Arvados event log, '
32             'if the job is still running)')
33         src.add_argument(
34             '--pipeline-instance', type=str, metavar='UUID',
35             help='[Deprecated] Summarize each component of the given pipeline instance (historical pre-1.4)')
36         src.add_argument(
37             '--log-file', type=str,
38             help='Read log data from a regular file')
39         self.add_argument(
40             '--skip-child-jobs', action='store_true',
41             help='Do not include stats from child jobs/containers')
42         self.add_argument(
43             '--format', type=str, choices=('html', 'text'), default='text',
44             help='Report format')
45         self.add_argument(
46             '--threads', type=int, default=8,
47             help='Maximum worker threads to run')
48         self.add_argument(
49             '--verbose', '-v', action='count', default=0,
50             help='Log more information (once for progress, twice for debug)')
51         self.add_argument('--version', action='version',
52                          version="%s %s" % (sys.argv[0], __version__),
53                          help='Print version and exit.')
54
55
56 class UTF8Decode(object):
57     '''Wrap a file-like iterable to decode UTF-8 bytes into a strings
58     '''
59     def __init__(self, fh):
60         self.fh = fh
61
62     def __enter__(self):
63         return self
64
65     def __exit__(self, exc_type, exc_val, exc_tb):
66         self.close()
67
68     def __iter__(self):
69         return self
70
71     def __next__(self):
72         return next(self.fh).decode('utf-8')
73
74     next = __next__
75
76     def close(self):
77         # mimic Gzip behavior and don't close underlying object
78         pass
79
80
81 class Command(object):
82     def __init__(self, args):
83         self.args = args
84         logger.setLevel(logging.WARNING - 10 * args.verbose)
85
86     def run(self):
87         kwargs = {
88             'skip_child_jobs': self.args.skip_child_jobs,
89             'threads': self.args.threads,
90             'arv': arvados.api('v1')
91         }
92         if self.args.pipeline_instance:
93             self.summer = summarizer.NewSummarizer(self.args.pipeline_instance, **kwargs)
94         elif self.args.job:
95             self.summer = summarizer.NewSummarizer(self.args.job, **kwargs)
96         elif self.args.container:
97             self.summer = summarizer.NewSummarizer(self.args.container, **kwargs)
98         elif self.args.log_file:
99             if self.args.log_file.endswith('.gz'):
100                 fh = UTF8Decode(gzip.open(self.args.log_file))
101             else:
102                 fh = open(self.args.log_file, mode = 'r', encoding = 'utf-8')
103             self.summer = summarizer.Summarizer(fh, **kwargs)
104         else:
105             self.summer = summarizer.Summarizer(sys.stdin, **kwargs)
106         return self.summer.run()
107
108     def report(self):
109         if self.args.format == 'html':
110             return self.summer.html_report()
111         elif self.args.format == 'text':
112             return self.summer.text_report()