1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import print_function
12 from crunchstat_summary import logger
15 class CollectionReader(object):
16 def __init__(self, collection_id):
17 self._collection_id = collection_id
18 self._label = collection_id
25 logger.debug('load collection %s', self._collection_id)
26 collection = arvados.collection.CollectionReader(self._collection_id)
27 filenames = [filename for filename in collection]
28 if len(filenames) == 1:
29 filename = filenames[0]
31 filename = 'crunchstat.txt'
32 self._label = "{}/{}".format(self._collection_id, filename)
33 self._reader = collection.open(filename)
34 return iter(self._reader)
39 def __exit__(self, exc_type, exc_val, exc_tb):
45 class LiveLogReader(object):
def __init__(self, job_uuid):
    """Remember which job's stderr log events this reader is for.

    job_uuid -- identifier of the job; kept on the instance as
    ``job_uuid``.  A debug message naming the job is logged.
    """
    self.job_uuid = job_uuid
    logger.debug('load stderr events for job %s', self.job_uuid)
55 def _get_all_pages(self):
59 ['object_uuid', '=', self.job_uuid],
60 ['event_type', '=', 'stderr']]
63 page = arvados.api().logs().index(
66 filters=filters + [['id','>',str(last_id)]],
67 select=['id', 'properties'],
68 ).execute(num_retries=2)
69 got += len(page['items'])
71 '%s: received %d of %d log events',
73 got + page['items_available'] - len(page['items']))
74 for i in page['items']:
75 for line in i['properties']['text'].split('\n'):
76 self._queue.put(line+'\n')
78 if (len(page['items']) == 0 or
79 len(page['items']) >= page['items_available']):
82 self._queue.put(self.EOF)
85 self._queue = Queue.Queue()
86 self._thread = threading.Thread(target=self._get_all_pages)
87 self._thread.daemon = True
92 line = self._queue.get()
101 def __exit__(self, exc_type, exc_val, exc_tb):