1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 from __future__ import print_function
12 from crunchstat_summary import logger
15 class CollectionReader(object):
16 def __init__(self, collection_id):
17 self._collection_id = collection_id
18 self._label = collection_id
25 logger.debug('load collection %s', self._collection_id)
26 collection = arvados.collection.CollectionReader(self._collection_id)
27 filenames = [filename for filename in collection]
28 if len(filenames) == 1:
29 filename = filenames[0]
31 filename = 'crunchstat.txt'
32 self._label = "{}/{}".format(self._collection_id, filename)
33 self._reader = collection.open(filename)
34 return iter(self._reader)
39 def __exit__(self, exc_type, exc_val, exc_tb):
45 class LiveLogReader(object):
48 def __init__(self, job_uuid):
49 self.job_uuid = job_uuid
50 self.event_types = (['stderr'] if '-8i9sb-' in job_uuid else ['crunchstat', 'arv-mount'])
51 logger.debug('load %s events for job %s', self.event_types, self.job_uuid)
56 def _get_all_pages(self):
60 ['object_uuid', '=', self.job_uuid],
61 ['event_type', 'in', self.event_types]]
64 page = arvados.api().logs().index(
67 filters=filters + [['id','>',str(last_id)]],
68 select=['id', 'properties'],
69 ).execute(num_retries=2)
70 got += len(page['items'])
72 '%s: received %d of %d log events',
74 got + page['items_available'] - len(page['items']))
75 for i in page['items']:
76 for line in i['properties']['text'].split('\n'):
77 self._queue.put(line+'\n')
79 if (len(page['items']) == 0 or
80 len(page['items']) >= page['items_available']):
83 self._queue.put(self.EOF)
86 self._queue = Queue.Queue()
87 self._thread = threading.Thread(target=self._get_all_pages)
88 self._thread.daemon = True
93 line = self._queue.get()
102 def __exit__(self, exc_type, exc_val, exc_tb):