Merge branch '13111-no-anonymous-sitefs'
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / reader.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import print_function
6
7 import arvados
8 import Queue
9 import threading
10 import _strptime
11
12 from crunchstat_summary import logger
13
14
class CollectionReader(object):
    """Read crunchstat log lines out of an Arvados collection.

    Iterating the object opens the log file inside the collection and
    yields its lines; use as a context manager so the underlying file
    reader is closed on exit.
    """

    def __init__(self, collection_id):
        # Label starts out as the bare collection id; __iter__ refines it
        # to "<collection>/<filename>" once the file has been chosen.
        self._collection_id = collection_id
        self._label = collection_id
        self._reader = None

    def __str__(self):
        return self._label

    def __iter__(self):
        logger.debug('load collection %s', self._collection_id)
        collection = arvados.collection.CollectionReader(self._collection_id)
        filenames = list(collection)
        # A single-file collection is read whole; otherwise fall back to
        # the conventional crunchstat log name.
        filename = filenames[0] if len(filenames) == 1 else 'crunchstat.txt'
        self._label = "{}/{}".format(self._collection_id, filename)
        self._reader = collection.open(filename)
        return iter(self._reader)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close the open file reader (if any) and drop the reference so a
        # second __exit__ is a no-op.
        reader = self._reader
        if reader:
            reader.close()
            self._reader = None
43
44
class LiveLogReader(object):
    """Stream log lines for a job by paging the Arvados logs API.

    __iter__ starts a daemon thread that fetches log pages and pushes
    individual lines onto an internal queue; the consumer pulls lines
    via next() until the EOF sentinel arrives.
    """

    # Sentinel put on the queue after the last line has been delivered.
    EOF = None

    def __init__(self, job_uuid):
        self.job_uuid = job_uuid
        # Container-style uuids (infix -8i9sb-) log everything to stderr;
        # otherwise read the dedicated crunchstat/arv-mount event streams.
        if '-8i9sb-' in job_uuid:
            self.event_types = ['stderr']
        else:
            self.event_types = ['crunchstat', 'arv-mount']
        logger.debug('load %s events for job %s', self.event_types, self.job_uuid)

    def __str__(self):
        return self.job_uuid

    def _get_all_pages(self):
        """Producer: page through the logs API, feeding lines to the queue."""
        got = 0
        last_id = 0
        filters = [
            ['object_uuid', '=', self.job_uuid],
            ['event_type', 'in', self.event_types]]
        try:
            while True:
                # Keyset pagination: ask for ids strictly greater than the
                # last one seen, in ascending order.
                page = arvados.api().logs().index(
                    limit=1000,
                    order=['id asc'],
                    filters=filters + [['id','>',str(last_id)]],
                    select=['id', 'properties'],
                ).execute(num_retries=2)
                items = page['items']
                got += len(items)
                logger.debug(
                    '%s: received %d of %d log events',
                    self.job_uuid, got,
                    got + page['items_available'] - len(items))
                for item in items:
                    for line in item['properties']['text'].split('\n'):
                        self._queue.put(line+'\n')
                    last_id = item['id']
                # Stop on an empty page or once everything available has
                # been fetched.
                if not items or len(items) >= page['items_available']:
                    break
        finally:
            # Always unblock the consumer, even if fetching failed.
            self._queue.put(self.EOF)

    def __iter__(self):
        self._queue = Queue.Queue()
        self._thread = threading.Thread(target=self._get_all_pages)
        # Daemonize so an abandoned iterator cannot keep the process alive.
        self._thread.daemon = True
        self._thread.start()
        return self

    def next(self):
        """Consumer: block for the next line; EOF ends the iteration."""
        line = self._queue.get()
        if line is self.EOF:
            self._thread.join()
            raise StopIteration
        return line