X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bba9a9be7d2686e9f8c40b59b8be8ffe1a511957..0841b144012a6cd54c927c2141d72411b0c86070:/tools/crunchstat-summary/crunchstat_summary/reader.py diff --git a/tools/crunchstat-summary/crunchstat_summary/reader.py b/tools/crunchstat-summary/crunchstat_summary/reader.py index e8f0861be4..8ccdbc2fcf 100644 --- a/tools/crunchstat-summary/crunchstat_summary/reader.py +++ b/tools/crunchstat-summary/crunchstat_summary/reader.py @@ -2,12 +2,10 @@ # # SPDX-License-Identifier: AGPL-3.0 -from __future__ import print_function - import arvados -import Queue +import itertools +import queue import threading -import _strptime from crunchstat_summary import logger @@ -16,7 +14,7 @@ class CollectionReader(object): def __init__(self, collection_id): self._collection_id = collection_id self._label = collection_id - self._reader = None + self._readers = [] def __str__(self): return self._label @@ -25,29 +23,34 @@ class CollectionReader(object): logger.debug('load collection %s', self._collection_id) collection = arvados.collection.CollectionReader(self._collection_id) filenames = [filename for filename in collection] - if len(filenames) == 1: - filename = filenames[0] - else: - filename = 'crunchstat.txt' - self._label = "{}/{}".format(self._collection_id, filename) - self._reader = collection.open(filename) - return iter(self._reader) + # Crunch2 has multiple stats files + if len(filenames) > 1: + filenames = ['crunchstat.txt', 'arv-mount.txt'] + for filename in filenames: + try: + self._readers.append(collection.open(filename)) + except IOError: + logger.warn('Unable to open %s', filename) + self._label = "{}/{}".format(self._collection_id, filenames[0]) + return itertools.chain(*[iter(reader) for reader in self._readers]) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): - if self._reader: - self._reader.close() - self._reader = None + if self._readers: + for reader in self._readers: + reader.close() + self._readers = [] class LiveLogReader(object): EOF = None def __init__(self, job_uuid): - logger.debug('load stderr events for job %s', job_uuid) self.job_uuid = job_uuid + self.event_types = (['stderr'] if '-8i9sb-' in job_uuid else ['crunchstat', 'arv-mount']) + logger.debug('load %s events for job %s', self.event_types, self.job_uuid) def __str__(self): return self.job_uuid @@ -57,7 +60,7 @@ class LiveLogReader(object): last_id = 0 filters = [ ['object_uuid', '=', self.job_uuid], - ['event_type', '=', 'stderr']] + ['event_type', 'in', self.event_types]] try: while True: page = arvados.api().logs().index( @@ -82,19 +85,21 @@ class LiveLogReader(object): self._queue.put(self.EOF) def __iter__(self): - self._queue = Queue.Queue() + self._queue = queue.Queue() self._thread = threading.Thread(target=self._get_all_pages) self._thread.daemon = True self._thread.start() return self - def next(self): + def __next__(self): line = self._queue.get() if line is self.EOF: self._thread.join() raise StopIteration return line + next = __next__ # for Python 2 + def __enter__(self): return self