1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
10 from crunchstat_summary import logger
# CollectionReader: yields the lines of the stats log file(s) stored in an
# Arvados collection, by opening the files and chaining their iterators.
# NOTE(review): this view of the class is incomplete -- the embedded line
# numbering jumps (16 -> 23, 29 -> 31 -> 33, 35 -> 40 -> 42), so some
# statements and method headers are not visible. The comments below describe
# only what is shown; anything inferred is marked as an assumption.
13 class CollectionReader(object):
# collection_id: identifier later passed to arvados.collection.CollectionReader.
14 def __init__(self, collection_id):
15 self._collection_id = collection_id
# The label starts as the bare collection id; it is rewritten to
# "<collection id>/<first filename>" once the file list is known (see below).
16 self._label = collection_id
# (The header of the method containing the following statements is not
# visible in this excerpt.)
23 logger.debug('load collection %s', self._collection_id)
24 collection = arvados.collection.CollectionReader(self._collection_id)
# Iterating the collection object supplies the names used as filenames here.
25 filenames = [filename for filename in collection]
26 # Crunch2 has multiple stats files
# When more than one entry is present, the listing is discarded and exactly
# these two fixed names are read instead.
27 if len(filenames) > 1:
28 filenames = ['crunchstat.txt', 'arv-mount.txt']
29 for filename in filenames:
# Each opened file object is kept in self._readers.
31 self._readers.append(collection.open(filename))
# The condition guarding this warning is not visible here -- presumably an
# exception handler around collection.open(); confirm against the full file.
# NOTE(review): if `logger` is a standard logging.Logger, warn() is a
# deprecated alias of warning().
33 logger.warn('Unable to open %s', filename)
# NOTE(review): filenames[0] raises IndexError when the collection listing is
# empty (the list is only replaced when it has more than one entry).
34 self._label = "{}/{}".format(self._collection_id, filenames[0])
# Returns one iterator that walks every opened reader in order.
35 return itertools.chain(*[iter(reader) for reader in self._readers])
# Context-manager exit: walks the opened readers. The loop body is not visible
# in this excerpt -- presumably it closes each reader; confirm.
40 def __exit__(self, exc_type, exc_val, exc_tb):
42 for reader in self._readers:
47 class LiveLogReader(object):
# job_uuid: UUID of the object whose log events are to be read; it is stored
# as-is and also decides which event types are requested.
50 def __init__(self, job_uuid):
51 self.job_uuid = job_uuid
# UUIDs containing '-8i9sb-' select only 'stderr' events; every other UUID
# selects 'crunchstat' and 'arv-mount' events.
# NOTE(review): presumably '-8i9sb-' is the infix of one specific job type's
# UUIDs -- confirm against the Arvados UUID scheme.
52 self.event_types = (['stderr'] if '-8i9sb-' in job_uuid else ['crunchstat', 'arv-mount'])
53 logger.debug('load %s events for job %s', self.event_types, self.job_uuid)
# Fetches log events for self.job_uuid from the Arvados logs API, splits each
# event's text into lines and puts them on self._queue, then enqueues self.EOF.
# NOTE(review): this view of the method is incomplete -- the embedded line
# numbering jumps, so the initialisation of `got`, `last_id` and `filters`,
# the loop header and the loop exit are not visible. Comments below cover only
# the statements shown.
58 def _get_all_pages(self):
# Base filters: events attached to this UUID, restricted to the event types
# chosen in the constructor.
62 ['object_uuid', '=', self.job_uuid],
63 ['event_type', 'in', self.event_types]]
66 page = arvados.api().logs().index(
# Each request adds an "id > last_id" filter on top of the base filters;
# presumably last_id is advanced elsewhere in the loop -- not visible here.
69 filters=filters + [['id','>',str(last_id)]],
# Only the 'id' and 'properties' fields are requested.
70 select=['id', 'properties'],
71 ).execute(num_retries=2)
# Running count of events received so far, including this page.
72 got += len(page['items'])
# Progress message. The total is computed as the events received before this
# page plus the 'items_available' value reported for the current query.
74 '%s: received %d of %d log events',
76 got + page['items_available'] - len(page['items']))
77 for i in page['items']:
# The 'text' property may hold several lines; each piece is queued with its
# newline restored.
# NOTE(review): text ending in '\n' yields a trailing empty piece from
# split('\n'), which is queued as a bare '\n' line.
78 for line in i['properties']['text'].split('\n'):
79 self._queue.put(line+'\n')
# Condition is true on an empty page, or when this page holds at least as many
# items as the API reports available. What it triggers is not visible here --
# presumably leaving the paging loop; confirm.
81 if (len(page['items']) == 0 or
82 len(page['items']) >= page['items_available']):
# self.EOF is the last thing queued by the visible code; presumably the
# consumer treats it as end-of-stream -- confirm.
85 self._queue.put(self.EOF)
88 self._queue = queue.Queue()
89 self._thread = threading.Thread(target=self._get_all_pages)
90 self._thread.daemon = True
95 line = self._queue.get()
101 next = __next__ # for Python 2
106 def __exit__(self, exc_type, exc_val, exc_tb):