8784: Fix test for latest firefox.
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / reader.py
1 from __future__ import print_function
2
3 import arvados
4 import Queue
5 import threading
6 import _strptime
7
8 from crunchstat_summary import logger
9
10
class CollectionReader(object):
    """Line reader over the single file held by an Arvados collection."""

    def __init__(self, collection_id):
        """Open the only entry of the collection named by collection_id.

        Raises ValueError when iterating the collection does not yield
        exactly one entry.
        """
        logger.debug('load collection %s', collection_id)
        coll = arvados.collection.CollectionReader(collection_id)
        names = list(coll)
        count = len(names)
        if count != 1:
            message = "collection {} has {} files; need exactly one".format(
                collection_id, count)
            raise ValueError(message)
        only_name = names[0]
        self._reader = coll.open(only_name)
        # Label shown by __str__: "<collection id>/<file name>".
        self._label = "{}/{}".format(collection_id, only_name)

    def __str__(self):
        """Return the "<collection id>/<file name>" label."""
        return self._label

    def __iter__(self):
        """Return an iterator over the opened file object."""
        return iter(self._reader)
28
29
class LiveLogReader(object):
    """Iterate over the lines of a job's 'stderr' log events.

    Calling iter() on an instance starts a daemon thread that pages through
    the matching log events and pushes their text, one line at a time, onto
    a queue; next() pops lines off that queue until the EOF sentinel is seen.
    """

    # Sentinel queued by the fetch thread to mark the end of the stream.
    EOF = None

    def __init__(self, job_uuid):
        """Remember job_uuid; nothing is fetched until iteration starts."""
        logger.debug('load stderr events for job %s', job_uuid)
        self.job_uuid = job_uuid

    def __str__(self):
        """Return the job UUID this reader was created for."""
        return self.job_uuid

    def _get_all_pages(self):
        """Fetch every matching log event, in id order, and queue its lines.

        Runs in the thread started by __iter__. The EOF sentinel is always
        queued last -- even when fetching raises -- so the consumer is never
        left blocked on an empty queue. A failure therefore ends the stream
        early from the consumer's point of view.
        """
        got = 0
        last_id = 0
        filters = [
            ['object_uuid', '=', self.job_uuid],
            ['event_type', '=', 'stderr']]
        try:
            # Build the API client once and reuse it for every page instead
            # of constructing a new one on each loop iteration.
            api = arvados.api()
            while True:
                # Ask only for events newer than the last one already seen.
                page = api.logs().index(
                    limit=1000,
                    order=['id asc'],
                    filters=filters + [['id', '>', str(last_id)]],
                    select=['id', 'properties'],
                ).execute(num_retries=2)
                items = page['items']
                got += len(items)
                logger.debug(
                    '%s: received %d of %d log events',
                    self.job_uuid, got,
                    got + page['items_available'] - len(items))
                for item in items:
                    # Every piece produced by the split is emitted with a
                    # trailing newline, so text that itself ends in '\n'
                    # also yields one final bare '\n' line.
                    for line in item['properties']['text'].split('\n'):
                        self._queue.put(line+'\n')
                    last_id = item['id']
                # Stop on an empty page, or when this page held everything
                # that was still available for the current filters.
                if not items or len(items) >= page['items_available']:
                    break
        finally:
            self._queue.put(self.EOF)

    def __iter__(self):
        """Start a fresh fetch thread and queue, then return self."""
        self._queue = Queue.Queue()
        self._thread = threading.Thread(target=self._get_all_pages)
        self._thread.daemon = True
        self._thread.start()
        return self

    def next(self):
        """Return the next queued line, blocking until one is available.

        Raises StopIteration once the EOF sentinel is reached, and on every
        call after that.
        """
        line = self._queue.get()
        if line is self.EOF:
            self._thread.join()
            # Put the sentinel back: without it, a further next() call would
            # block forever on the now-empty queue instead of raising
            # StopIteration again as the iterator protocol requires.
            self._queue.put(self.EOF)
            raise StopIteration
        return line

    # Python 3 spelling of the iterator protocol method.
    __next__ = next