10472: Merge branch 'master' into 10472-csummary-cwl-pipeline
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / reader.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import print_function
6
7 import arvados
8 import Queue
9 import threading
10 import _strptime
11
12 from crunchstat_summary import logger
13
14
class CollectionReader(object):
    """Read one file out of an Arvados collection.

    Iterating over the object opens a file from the collection and
    returns an iterator over it.  Used as a context manager, the object
    closes whatever file it opened when the ``with`` block ends.
    """

    def __init__(self, collection_id):
        """Remember which collection to read; nothing is loaded yet.

        collection_id -- identifier handed to
                         arvados.collection.CollectionReader on iteration.
        """
        self._reader = None
        self._collection_id = collection_id
        # Until __iter__ picks a file, the label is just the collection ID.
        self._label = collection_id

    def __str__(self):
        """Return the current label (collection ID, or ID/filename)."""
        return self._label

    def __iter__(self):
        """Open the log file in the collection and return an iterator over it.

        Side effects: updates the label to "<collection_id>/<filename>" and
        keeps the opened file in self._reader so __exit__ can close it.
        """
        logger.debug('load collection %s', self._collection_id)
        source = arvados.collection.CollectionReader(self._collection_id)
        names = [name for name in source]
        # A collection holding exactly one entry is read through that entry,
        # whatever it is called; otherwise the conventional name is used.
        chosen = names[0] if len(names) == 1 else 'crunchstat.txt'
        self._label = "{}/{}".format(self._collection_id, chosen)
        self._reader = source.open(chosen)
        return iter(self._reader)

    def __enter__(self):
        """Return self so the reader can be bound by a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the file opened by __iter__, if any, and forget it."""
        if not self._reader:
            return
        self._reader.close()
        self._reader = None
43
44
class LiveLogReader(object):
    """Iterate over the stderr log lines recorded for a job.

    Calling iter() on the object starts a daemon thread that pages
    through the job's log events and pushes every line onto a queue.
    The iterator then pops lines off that queue until it receives the
    EOF sentinel.  If the background fetch fails, the exception is
    re-raised to the consumer instead of looking like a normal end of
    stream.
    """

    # Sentinel the worker puts on the queue after the last line.
    EOF = None

    def __init__(self, job_uuid):
        """Remember the UUID of the job whose log events will be read."""
        logger.debug('load stderr events for job %s', job_uuid)
        self.job_uuid = job_uuid

    def __str__(self):
        """Return the job UUID."""
        return self.job_uuid

    def _get_all_pages(self):
        """Fetch every stderr log event for the job and enqueue its lines.

        Events are requested in pages of up to 1000, ordered by id, each
        request asking only for ids greater than the last one seen.  The
        EOF sentinel is always enqueued on the way out, even when an
        exception escapes, so a consumer blocked on the queue wakes up.
        """
        got = 0
        last_id = 0
        filters = [
            ['object_uuid', '=', self.job_uuid],
            ['event_type', '=', 'stderr']]
        try:
            # Build the API client once rather than once per page.  This
            # stays inside the try block so that a failure here still
            # delivers EOF to the consumer.
            api_client = arvados.api()
            while True:
                page = api_client.logs().index(
                    limit=1000,
                    order=['id asc'],
                    filters=filters + [['id', '>', str(last_id)]],
                    select=['id', 'properties'],
                ).execute(num_retries=2)
                items = page['items']
                got += len(items)
                logger.debug(
                    '%s: received %d of %d log events',
                    self.job_uuid, got,
                    got + page['items_available'] - len(items))
                for event in items:
                    # NOTE(review): text that already ends with a newline
                    # produces one extra blank line here, because split()
                    # returns a trailing empty string -- confirm whether
                    # consumers rely on that.
                    for line in event['properties']['text'].split('\n'):
                        self._queue.put(line+'\n')
                    last_id = event['id']
                # Stop on an empty page, or when this page holds at least
                # as many items as the server reported available.
                if (len(items) == 0 or
                        len(items) >= page['items_available']):
                    break
        finally:
            self._queue.put(self.EOF)

    def _run_worker(self):
        """Thread body: run the fetch and record any failure for next()."""
        try:
            self._get_all_pages()
        except Exception as error:
            # Hand the failure to the consuming thread; next() re-raises
            # it once the EOF sentinel has been received.
            self._error = error

    def __iter__(self):
        """Start fetching in a background thread and return self."""
        self._queue = Queue.Queue()
        self._error = None
        self._thread = threading.Thread(target=self._run_worker)
        self._thread.daemon = True
        self._thread.start()
        return self

    def next(self):
        """Return the next log line, blocking until one is available.

        Raises StopIteration once the worker has delivered every line.
        Raises the worker's own exception instead if the fetch failed, so
        a truncated stream is never mistaken for a complete one.
        """
        line = self._queue.get()
        if line is self.EOF:
            # Joining guarantees the worker has finished recording any
            # error before it is inspected.
            self._thread.join()
            error, self._error = self._error, None
            if error is not None:
                raise error
            raise StopIteration
        return line

    # Python 3 spells the iterator method __next__; keep both names.
    __next__ = next

    def __enter__(self):
        """Return self so the reader can be bound by a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Nothing to release: the worker is a daemon thread."""
        pass