1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import print_function
12 from crunchstat_summary import logger
15 class CollectionReader(object):
16 def __init__(self, collection_id):
17 self._collection_id = collection_id
18 self._label = collection_id
25 logger.debug('load collection %s', self._collection_id)
26 collection = arvados.collection.CollectionReader(self._collection_id)
27 filenames = [filename for filename in collection]
28 # Crunch2 has multiple stats files
29 if len(filenames) > 1:
30 filenames = ['crunchstat.txt', 'arv-mount.txt']
31 for filename in filenames:
33 self._readers.append(collection.open(filename))
35 logger.warn('Unable to open %s', filename)
36 self._label = "{}/{}".format(self._collection_id, filenames[0])
37 return itertools.chain(*[iter(reader) for reader in self._readers])
42 def __exit__(self, exc_type, exc_val, exc_tb):
44 for reader in self._readers:
49 class LiveLogReader(object):
52 def __init__(self, job_uuid):
53 self.job_uuid = job_uuid
54 self.event_types = (['stderr'] if '-8i9sb-' in job_uuid else ['crunchstat', 'arv-mount'])
55 logger.debug('load %s events for job %s', self.event_types, self.job_uuid)
60 def _get_all_pages(self):
64 ['object_uuid', '=', self.job_uuid],
65 ['event_type', 'in', self.event_types]]
68 page = arvados.api().logs().index(
71 filters=filters + [['id','>',str(last_id)]],
72 select=['id', 'properties'],
73 ).execute(num_retries=2)
74 got += len(page['items'])
76 '%s: received %d of %d log events',
78 got + page['items_available'] - len(page['items']))
79 for i in page['items']:
80 for line in i['properties']['text'].split('\n'):
81 self._queue.put(line+'\n')
83 if (len(page['items']) == 0 or
84 len(page['items']) >= page['items_available']):
87 self._queue.put(self.EOF)
90 self._queue = Queue.Queue()
91 self._thread = threading.Thread(target=self._get_all_pages)
92 self._thread.daemon = True
97 line = self._queue.get()
106 def __exit__(self, exc_type, exc_val, exc_tb):