class LiveLogReader(object):
    """Iterate over the stderr log lines recorded for one job.

    Log events are fetched page by page through
    ``arvados.api().logs().index()`` by a single background thread.  The
    thread splits each event's text into lines and hands them to the
    consumer through a queue.  Iterating the reader yields every line
    with a trailing newline appended.
    """

    # Sentinel queued by the worker thread after the last line.  Real
    # queue entries are always strings, so None cannot be mistaken for a
    # line of log text.
    EOF = None

    # Exception that stopped the worker thread, if any.  It is set before
    # EOF is queued, so next() can safely read it once it has seen EOF.
    _error = None

    def __init__(self, job_uuid):
        """Remember which job's stderr events to load.

        No request is made here; fetching starts in __iter__().

        Arguments:
        job_uuid -- value matched against each log event's object_uuid;
                    also used as the label in debug messages.
        """
        logger.debug('load stderr events for job %s', job_uuid)
        self._filters = [
            ['object_uuid', '=', job_uuid],
            ['event_type', '=', 'stderr']]
        self._label = job_uuid

    def _get_all_pages(self):
        """Worker thread body: fetch every page and queue its lines.

        Pages are requested in ascending id order, 1000 events at a
        time, each page starting after the last id already seen.  The
        loop stops when a page is empty or contains everything the
        server reported as available.

        EOF is queued on every exit path.  Without that, a failed
        request would leave the consumer blocked in next() forever.  Any
        exception is saved in self._error for next() to re-raise.
        """
        try:
            # One client serves every page; there is no need to rebuild
            # it on each trip around the loop.
            api = arvados.api()
            got = 0
            last_id = 0
            while True:
                page = api.logs().index(
                    limit=1000,
                    order=['id asc'],
                    filters=self._filters + [['id', '>', str(last_id)]],
                ).execute(num_retries=2)
                items = page['items']
                got += len(items)
                logger.debug(
                    '%s: received %d of %d log events',
                    self._label, got,
                    got + page['items_available'] - len(items))
                for event in items:
                    for line in event['properties']['text'].split('\n'):
                        self._queue.put(line + '\n')
                    last_id = event['id']
                if not items or len(items) >= page['items_available']:
                    break
        except Exception as error:
            # This is the thread's top-level boundary: hand the failure
            # to the consumer instead of letting the thread die silently.
            self._error = error
        finally:
            self._queue.put(self.EOF)

    def __iter__(self):
        """Start a fresh background fetch and return self as the iterator.

        Each call creates a new queue and a new daemon worker thread.
        """
        self._error = None
        self._queue = Queue.Queue()
        self._thread = threading.Thread(target=self._get_all_pages)
        self._thread.daemon = True
        self._thread.start()
        return self

    def next(self):
        """Return the next log line, blocking until one is available.

        Raises StopIteration once the worker has delivered everything.
        If the worker stopped because of an exception, that exception is
        raised here instead.
        """
        line = self._queue.get()
        if line is self.EOF:
            # Put the sentinel back so further calls end immediately
            # instead of blocking on an empty queue.
            self._queue.put(self.EOF)
            if self._error is not None:
                raise self._error
            raise StopIteration
        return line

    # Python 3 spelling of the iterator protocol method.
    __next__ = next