21700: Install Bundler system-wide in Rails postinst
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / reader.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import arvados
6 import itertools
7 import json
8 import queue
9 import threading
10
11 from crunchstat_summary import logger
12
13
class CollectionReader(object):
    """Iterate over the stats log lines stored in an Arvados collection.

    Iterating opens the log file(s); use the reader as a context manager so
    those files are closed afterwards::

        with CollectionReader(collection_id) as reader:
            for line in reader:
                ...
    """

    def __init__(self, collection_id, api_client=None, collection_object=None):
        """
        :param collection_id: identifier of the log collection; also the
            initial label returned by ``str()``.
        :param api_client: API client passed to
            ``arvados.collection.CollectionReader`` when the collection has
            to be loaded here.
        :param collection_object: an already-loaded collection; when given
            (and truthy) it is used instead of loading ``collection_id``.
        """
        self._collection_id = collection_id
        self._label = collection_id
        self._readers = []
        self._api_client = api_client
        self._collection = collection_object or arvados.collection.CollectionReader(self._collection_id, api_client=self._api_client)

    def __str__(self):
        return self._label

    def __iter__(self):
        """Open the log file(s) and return one iterator over all their lines.

        A collection holding a single file is read as-is. With more than one
        file (Crunch2), only ``crunchstat.txt`` then ``arv-mount.txt`` are
        read; a file that cannot be opened is skipped with a warning.
        After the first call, ``str(self)`` names the first selected file.
        """
        logger.debug('load collection %s', self._collection_id)

        # Do not chain readers left over (possibly half-consumed) from an
        # earlier iteration; close them and start afresh.
        self._close_readers()

        filenames = list(self._collection)
        # Crunch2 has multiple stats files
        if len(filenames) > 1:
            filenames = ['crunchstat.txt', 'arv-mount.txt']
        for filename in filenames:
            try:
                self._readers.append(self._collection.open(filename, "rt"))
            except IOError:
                logger.warning('Unable to open %s', filename)
        # An empty collection has no file to name: keep the bare collection
        # id as the label rather than failing with IndexError.
        if filenames:
            self._label = "{}/{}".format(self._collection_id, filenames[0])
        return itertools.chain(*self._readers)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close_readers()

    def _close_readers(self):
        """Close every file opened by ``__iter__`` and forget them."""
        for reader in self._readers:
            reader.close()
        self._readers = []

    def node_info(self):
        """Return the parsed ``node.json`` from the collection.

        Returns ``{}`` (after logging a warning) when the file cannot be
        opened.
        """
        try:
            with self._collection.open("node.json", "rt") as f:
                return json.load(f)
        except IOError:
            logger.warning('Unable to open node.json')
        return {}
56
57
class LiveLogReader(object):
    """Stream a job's stats log lines from the API server's logs table.

    Log pages are fetched on a background daemon thread and handed to the
    iterating thread through a queue. Errors raised while fetching are
    re-raised from ``next()`` once the lines fetched so far have been
    consumed.
    """

    # Sentinel the fetch thread queues when it has finished.
    EOF = None

    def __init__(self, job_uuid):
        self.job_uuid = job_uuid
        # UUIDs containing '-8i9sb-' (presumably legacy jobs) are read from
        # 'stderr' events; anything else from 'crunchstat' and 'arv-mount'.
        self.event_types = (['stderr'] if '-8i9sb-' in job_uuid else ['crunchstat', 'arv-mount'])
        logger.debug('load %s events for job %s', self.event_types, self.job_uuid)

    def __str__(self):
        return self.job_uuid

    def _get_all_pages(self):
        """Fetch every matching log event and queue its text line by line.

        Runs on the thread started by ``__iter__``. Events are requested
        1000 at a time in ascending id order, resuming after the last id
        seen. ``EOF`` is always queued last. If fetching fails, the exception
        is stored in ``self._error`` so that ``__next__`` re-raises it rather
        than ending the stream as though it were complete.
        """
        got = 0
        last_id = 0
        filters = [
            ['object_uuid', '=', self.job_uuid],
            ['event_type', 'in', self.event_types]]
        try:
            # One client for the whole fetch instead of a new one per page.
            api = arvados.api()
            while True:
                page = api.logs().list(
                    limit=1000,
                    order=['id asc'],
                    filters=filters + [['id', '>', str(last_id)]],
                    select=['id', 'properties'],
                ).execute(num_retries=2)
                items = page['items']
                got += len(items)
                logger.debug(
                    '%s: received %d of %d log events',
                    self.job_uuid, got,
                    got + page['items_available'] - len(items))
                for i in items:
                    # split() yields a trailing '' when text ends in '\n', so
                    # such text also produces a bare '\n' line.
                    for line in i['properties']['text'].split('\n'):
                        self._queue.put(line + '\n')
                    last_id = i['id']
                # items_available counts rows matching this request's filters
                # (including id > last_id), so a page holding all of them is
                # the final one.
                if not items or len(items) >= page['items_available']:
                    break
        except Exception as error:
            # Hand the failure to the consuming thread instead of letting it
            # die silently in this one.
            self._error = error
        finally:
            self._queue.put(self.EOF)

    def __iter__(self):
        """Start the background fetch and return ``self`` as the iterator."""
        self._queue = queue.Queue()
        self._error = None
        self._thread = threading.Thread(target=self._get_all_pages)
        self._thread.daemon = True
        self._thread.start()
        return self

    def __next__(self):
        """Return the next log line, blocking until one is available.

        :raises StopIteration: once the fetch thread has finished.
        :raises Exception: whatever the fetch thread failed with, if anything.
        """
        line = self._queue.get()
        if line is self.EOF:
            # Put the sentinel back so later next() calls keep raising
            # StopIteration instead of blocking forever on an empty queue.
            self._queue.put(self.EOF)
            self._thread.join()
            error, self._error = self._error, None
            if error is not None:
                raise error
            raise StopIteration
        return line

    next = __next__ # for Python 2

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def node_info(self):
        """Live logs carry no node information; always returns ``{}``."""
        return {}
122
class StubReader(object):
    """Adapt an already-open iterable of log lines to the reader interface.

    The label is empty, there is no node information, and the context
    manager does nothing: the wrapped handle is owned (and closed) by the
    caller.
    """

    def __init__(self, fh):
        # Anything iterable that yields lines will do.
        self.fh = fh

    def __iter__(self):
        # Delegate straight to the wrapped object's own iterator.
        return iter(self.fh)

    def __str__(self):
        return ''

    def node_info(self):
        return dict()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Leave self.fh open; returning None lets exceptions propagate.
        return None