1 from __future__ import print_function
8 from crunchstat_summary import logger
11 class CollectionReader(object):
12 def __init__(self, collection_id):
13 logger.debug('load collection %s', collection_id)
14 collection = arvados.collection.CollectionReader(collection_id)
15 filenames = [filename for filename in collection]
16 if len(filenames) != 1:
18 "collection {} has {} files; need exactly one".format(
19 collection_id, len(filenames)))
20 self._reader = collection.open(filenames[0])
21 self._label = "{}/{}".format(collection_id, filenames[0])
27 return iter(self._reader)
30 class LiveLogReader(object):
33 def __init__(self, job_uuid):
34 logger.debug('load stderr events for job %s', job_uuid)
35 self.job_uuid = job_uuid
40 def _get_all_pages(self):
44 ['object_uuid', '=', self.job_uuid],
45 ['event_type', '=', 'stderr']]
48 page = arvados.api().logs().index(
51 filters=filters + [['id','>',str(last_id)]],
52 select=['id', 'properties'],
53 ).execute(num_retries=2)
54 got += len(page['items'])
56 '%s: received %d of %d log events',
58 got + page['items_available'] - len(page['items']))
59 for i in page['items']:
60 for line in i['properties']['text'].split('\n'):
61 self._queue.put(line+'\n')
63 if (len(page['items']) == 0 or
64 len(page['items']) >= page['items_available']):
67 self._queue.put(self.EOF)
70 self._queue = Queue.Queue()
71 self._thread = threading.Thread(target=self._get_all_pages)
72 self._thread.daemon = True
77 line = self._queue.get()