1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import print_function
12 from crunchstat_summary import logger
class CollectionReader(object):
    """Read a crunchstat log file stored in an Arvados collection.

    When the collection contains exactly one file, that file is opened;
    otherwise the file named 'crunchstat.txt' is opened.  Iterating over
    the reader iterates over the opened file object.

    NOTE(review): this class was rebuilt from a listing that had lost its
    indentation and some lines.  The ``else`` branch in the filename
    selection and the ``__iter__`` header were restored from context --
    confirm against the upstream file.
    """

    def __init__(self, collection_id):
        """Open the log file held by the collection ``collection_id``.

        ``collection_id`` is handed unchanged to
        ``arvados.collection.CollectionReader``.
        """
        logger.debug('load collection %s', collection_id)
        collection = arvados.collection.CollectionReader(collection_id)
        filename = self._pick_filename(list(collection))
        self._reader = collection.open(filename)
        # "<collection_id>/<filename>" tag for this reader.  Nothing in the
        # visible code reads it; presumably a __str__ elsewhere in the
        # class does -- verify.
        self._label = "{}/{}".format(collection_id, filename)

    @staticmethod
    def _pick_filename(filenames):
        """Return the only entry of ``filenames``, else 'crunchstat.txt'.

        The fallback name is used both for an empty list and for a list
        with more than one entry.
        """
        if len(filenames) == 1:
            return filenames[0]
        return 'crunchstat.txt'

    def __iter__(self):
        """Return an iterator over the opened file object."""
        # NOTE(review): the ``def`` line was absent from the listing; the
        # name ``__iter__`` is inferred from the ``return iter(...)`` body.
        return iter(self._reader)
34 class LiveLogReader(object):
# Constructor: records which job's stderr log events this reader is for.
# Apart from the debug log message, nothing is fetched here; job_uuid is
# stored as-is and later used by _get_all_pages as the value of the
# 'object_uuid' filter.
37 def __init__(self, job_uuid):
38 logger.debug('load stderr events for job %s', job_uuid)
39 self.job_uuid = job_uuid
# Fetch log events for self.job_uuid from the Arvados logs index and put
# their text, one newline-terminated line at a time, on self._queue;
# self.EOF is put on the queue as the last visible step.  This method is
# the target of the threading.Thread created where self._queue is set up.
#
# NOTE(review): several lines of this method are absent from this listing
# (the embedded numbering skips, e.g. 44->48, 52->55, 58->60, 68->71).  The
# initialisation of `got`, `last_id` and `filters`, the loop construct,
# any further query arguments, the statement guarded by the final `if`
# and whatever wraps the closing put are therefore not visible here --
# confirm against the full file before changing anything.
44 def _get_all_pages(self):
# Presumably the tail of the `filters` list: restrict to events whose
# object_uuid is this job and whose event_type is 'stderr'.
48 ['object_uuid', '=', self.job_uuid],
49 ['event_type', '=', 'stderr']]
# Request one page of log records.
52 page = arvados.api().logs().index(
# On top of the base filters, only ask for records with id > last_id.
55 filters=filters + [['id','>',str(last_id)]],
# Only the 'id' and 'properties' fields are requested.
56 select=['id', 'properties'],
# The request is executed with num_retries=2.
57 ).execute(num_retries=2)
# Running count of the records received so far.
58 got += len(page['items'])
# Arguments of a logging call whose opening line is not visible.
60 '%s: received %d of %d log events',
# Value logged as the total: the count so far plus the reported
# 'items_available' minus the size of this page.
62 got + page['items_available'] - len(page['items']))
63 for i in page['items']:
# Split the record text on '\n' and re-append the newline so every queued
# item is newline-terminated.
# NOTE(review): when the text itself ends with '\n', split() yields a
# trailing empty string, so a bare '\n' is queued as well.
64 for line in i['properties']['text'].split('\n'):
65 self._queue.put(line+'\n')
# True when the page is empty or holds at least 'items_available' records;
# presumably this ends the paging loop (the guarded statement is not
# visible in this listing).
67 if (len(page['items']) == 0 or
68 len(page['items']) >= page['items_available']):
# Put the EOF marker on the queue.
71 self._queue.put(self.EOF)
74 self._queue = Queue.Queue()
75 self._thread = threading.Thread(target=self._get_all_pages)
76 self._thread.daemon = True
81 line = self._queue.get()