1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import print_function
13 from crunchstat_summary import logger
16 class CollectionReader(object):
17 def __init__(self, collection_id):
18 self._collection_id = collection_id
19 self._label = collection_id
26 logger.debug('load collection %s', self._collection_id)
27 collection = arvados.collection.CollectionReader(self._collection_id)
28 filenames = [filename for filename in collection]
29 # Crunch2 has multiple stats files
30 if len(filenames) > 1:
31 filenames = ['crunchstat.txt', 'arv-mount.txt']
32 for filename in filenames:
33 self._readers.append(collection.open(filename))
34 self._label = "{}/{}".format(self._collection_id, filenames[0])
35 return itertools.chain(*[iter(reader) for reader in self._readers])
40 def __exit__(self, exc_type, exc_val, exc_tb):
42 for reader in self._readers:
47 class LiveLogReader(object):
def __init__(self, job_uuid):
    """Remember the job UUID and pick the log event types to load.

    A UUID containing '-8i9sb-' selects 'stderr' events; any other UUID
    selects 'crunchstat' and 'arv-mount' events.
    """
    self.job_uuid = job_uuid
    if '-8i9sb-' in job_uuid:
        wanted_events = ['stderr']
    else:
        wanted_events = ['crunchstat', 'arv-mount']
    self.event_types = wanted_events
    logger.debug('load %s events for job %s', self.event_types, self.job_uuid)
58 def _get_all_pages(self):
62 ['object_uuid', '=', self.job_uuid],
63 ['event_type', 'in', self.event_types]]
66 page = arvados.api().logs().index(
69 filters=filters + [['id','>',str(last_id)]],
70 select=['id', 'properties'],
71 ).execute(num_retries=2)
72 got += len(page['items'])
74 '%s: received %d of %d log events',
76 got + page['items_available'] - len(page['items']))
77 for i in page['items']:
78 for line in i['properties']['text'].split('\n'):
79 self._queue.put(line+'\n')
81 if (len(page['items']) == 0 or
82 len(page['items']) >= page['items_available']):
85 self._queue.put(self.EOF)
88 self._queue = Queue.Queue()
89 self._thread = threading.Thread(target=self._get_all_pages)
90 self._thread.daemon = True
95 line = self._queue.get()
104 def __exit__(self, exc_type, exc_val, exc_tb):