Merge branch '14645-fuse-operations-reporting'
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import argparse
6 import gzip
7 from io import open
8 import logging
9 import sys
10
11 from crunchstat_summary import logger, summarizer
12
13
14 class ArgumentParser(argparse.ArgumentParser):
15     def __init__(self):
16         super(ArgumentParser, self).__init__(
17             description='Summarize resource usage of an Arvados Crunch job')
18         src = self.add_mutually_exclusive_group()
19         src.add_argument(
20             '--job', '--container', '--container-request',
21             type=str, metavar='UUID',
22             help='Look up the specified job, container, or container request '
23             'and read its log data from Keep (or from the Arvados event log, '
24             'if the job is still running)')
25         src.add_argument(
26             '--pipeline-instance', type=str, metavar='UUID',
27             help='Summarize each component of the given pipeline instance')
28         src.add_argument(
29             '--log-file', type=str,
30             help='Read log data from a regular file')
31         self.add_argument(
32             '--skip-child-jobs', action='store_true',
33             help='Do not include stats from child jobs/containers')
34         self.add_argument(
35             '--format', type=str, choices=('html', 'text'), default='text',
36             help='Report format')
37         self.add_argument(
38             '--threads', type=int, default=8,
39             help='Maximum worker threads to run')
40         self.add_argument(
41             '--verbose', '-v', action='count', default=0,
42             help='Log more information (once for progress, twice for debug)')
43
44
45 class UTF8Decode(object):
46     '''Wrap a file-like iterable to decode UTF-8 bytes into a strings
47     '''
48     def __init__(self, fh):
49         self.fh = fh
50
51     def __enter__(self):
52         return self
53
54     def __exit__(self, exc_type, exc_val, exc_tb):
55         self.close()
56
57     def __iter__(self):
58         return self
59
60     def __next__(self):
61         return next(self.fh).decode('utf-8')
62
63     next = __next__
64
65     def close(self):
66         # mimic Gzip behavior and don't close underlying object
67         pass
68
69
70 class Command(object):
71     def __init__(self, args):
72         self.args = args
73         logger.setLevel(logging.WARNING - 10 * args.verbose)
74
75     def run(self):
76         kwargs = {
77             'skip_child_jobs': self.args.skip_child_jobs,
78             'threads': self.args.threads,
79         }
80         if self.args.pipeline_instance:
81             self.summer = summarizer.NewSummarizer(self.args.pipeline_instance, **kwargs)
82         elif self.args.job:
83             self.summer = summarizer.NewSummarizer(self.args.job, **kwargs)
84         elif self.args.log_file:
85             if self.args.log_file.endswith('.gz'):
86                 fh = UTF8Decode(gzip.open(self.args.log_file))
87             else:
88                 fh = open(self.args.log_file, mode = 'r', encoding = 'utf-8')
89             self.summer = summarizer.Summarizer(fh, **kwargs)
90         else:
91             self.summer = summarizer.Summarizer(sys.stdin, **kwargs)
92         return self.summer.run()
93
94     def report(self):
95         if self.args.format == 'html':
96             return self.summer.html_report()
97         elif self.args.format == 'text':
98             return self.summer.text_report()