8341: Retrieve only the log attributes that actually get used.
[arvados.git] / tools / crunchstat-summary / crunchstat_summary / reader.py
1 from __future__ import print_function
2
3 import arvados
4 import Queue
5 import threading
6
7 from crunchstat_summary import logger
8
9
class CollectionReader(object):
    """Reader for the one and only file stored in an Arvados collection.

    The constructor rejects collections that do not contain exactly one
    file.  str() gives "<collection_id>/<filename>"; iterating yields
    whatever iterating the opened file yields (presumably its lines --
    confirm against the Arvados SDK).
    """

    def __init__(self, collection_id):
        logger.debug('load collection %s', collection_id)
        coll = arvados.collection.CollectionReader(collection_id)
        names = list(coll)
        if len(names) != 1:
            raise ValueError(
                "collection {} has {} files; need exactly one".format(
                    collection_id, len(names)))
        # Exactly one entry is guaranteed by the check above.
        (only_name,) = names
        self._label = "{}/{}".format(collection_id, only_name)
        self._reader = coll.open(only_name)

    def __str__(self):
        return self._label

    def __iter__(self):
        return iter(self._reader)
27
28
class LiveLogReader(object):
    """Iterate over the stderr log lines recorded for an Arvados job.

    Log events (object_uuid == job_uuid, event_type == 'stderr') are
    fetched page by page in a background thread and handed to the consumer
    through a queue, so lines can be consumed while later pages are still
    being fetched.  Each call to iter() starts a fresh fetch.
    """

    # Sentinel the fetch thread puts on the queue when it is finished.
    EOF = None

    def __init__(self, job_uuid):
        logger.debug('load stderr events for job %s', job_uuid)
        self.job_uuid = job_uuid

    def __str__(self):
        return self.job_uuid

    @staticmethod
    def _text_lines(text):
        """Split one log event's text into newline-terminated lines.

        A final fragment without a trailing newline gets one appended; a
        trailing newline does not produce an extra blank line.
        """
        lines = text.split('\n')
        if lines[-1] == '':
            # The text ended with '\n' (or was empty): the empty tail is an
            # artifact of split(), not a real line, so don't emit it.
            lines.pop()
        return [line + '\n' for line in lines]

    def _get_all_pages(self):
        """Fetch every matching log event and enqueue its lines.

        Runs in the thread started by __iter__.  Events are requested in
        ascending id order, resuming after the last id seen, until a page
        is empty or holds all remaining events.  If anything fails, the
        exception is stored in self._error (so next() can re-raise it to
        the consumer) and re-raised here.  The EOF sentinel is always
        enqueued, so the consumer never waits forever.
        """
        got = 0
        last_id = 0
        filters = [
            ['object_uuid', '=', self.job_uuid],
            ['event_type', '=', 'stderr']]
        try:
            # One API client for the whole fetch rather than one per page.
            api = arvados.api()
            while True:
                page = api.logs().index(
                    limit=1000,
                    order=['id asc'],
                    filters=filters + [['id', '>', str(last_id)]],
                    select=['id', 'properties'],
                ).execute(num_retries=2)
                items = page['items']
                got += len(items)
                logger.debug(
                    '%s: received %d of %d log events',
                    self.job_uuid, got,
                    got + page['items_available'] - len(items))
                for item in items:
                    for line in self._text_lines(item['properties']['text']):
                        self._queue.put(line)
                    last_id = item['id']
                # Presumably items_available counts all events still matching
                # the filters (including id > last_id), so a page holding
                # that many is the last one.
                if not items or len(items) >= page['items_available']:
                    break
        except Exception as error:
            # Otherwise a failed request would look to the consumer like a
            # normal end of log, and callers would silently work on a
            # truncated log.
            self._error = error
            raise
        finally:
            self._queue.put(self.EOF)

    def __iter__(self):
        self._queue = Queue.Queue()
        self._error = None
        self._thread = threading.Thread(target=self._get_all_pages)
        # Daemon thread: an abandoned iteration won't keep the process alive.
        self._thread.daemon = True
        self._thread.start()
        return self

    def next(self):
        """Return the next log line (newline-terminated).

        Raises StopIteration at the end of the log.  If the fetch thread
        failed, its exception is raised instead, so a partial log is not
        mistaken for a complete one.
        """
        line = self._queue.get()
        if line is self.EOF:
            self._thread.join()
            # Put the sentinel back so later calls also stop instead of
            # blocking forever on an empty queue.
            self._queue.put(self.EOF)
            if self._error is not None:
                raise self._error
            raise StopIteration
        return line

    # Python 3 spelling of the iterator protocol method.
    __next__ = next