Merge branch '3663-collection-reader-performance'
[arvados.git] / sdk / python / arvados / stream.py
1 import gflags
2 import httplib
3 import httplib2
4 import os
5 import pprint
6 import sys
7 import types
8 import subprocess
9 import json
10 import UserDict
11 import re
12 import hashlib
13 import string
14 import bz2
15 import zlib
16 import fcntl
17 import time
18 import threading
19 import collections
20
21 from keep import *
22 import config
23 import errors
24
25 LOCATOR = 0
26 BLOCKSIZE = 1
27 OFFSET = 2
28 SEGMENTSIZE = 3
29
30 def locators_and_ranges(data_locators, range_start, range_size, debug=False):
31     '''
32     Get blocks that are covered by the range
33     data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
34     range_start: start of range
35     range_size: size of range
36     returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
37     '''
38     if range_size == 0:
39         return []
40     resp = []
41     range_start = long(range_start)
42     range_size = long(range_size)
43     range_end = range_start + range_size
44     block_start = 0L
45
46     # range_start/block_start is the inclusive lower bound
47     # range_end/block_end is the exclusive upper bound
48
49     hi = len(data_locators)
50     lo = 0
51     i = int((hi + lo) / 2)
52     block_size = data_locators[i][BLOCKSIZE]
53     block_start = data_locators[i][OFFSET]
54     block_end = block_start + block_size
55     if debug: print '---'
56
57     # perform a binary search for the first block
58     # assumes that all of the blocks are contigious, so range_start is guaranteed
59     # to either fall into the range of a block or be outside the block range entirely
60     while not (range_start >= block_start and range_start < block_end):
61         if lo == i:
62             # must be out of range, fail
63             return []
64         if range_start > block_start:
65             lo = i
66         else:
67             hi = i
68         i = int((hi + lo) / 2)
69         if debug: print lo, i, hi
70         block_size = data_locators[i][BLOCKSIZE]
71         block_start = data_locators[i][OFFSET]
72         block_end = block_start + block_size
73
74     while i < len(data_locators):
75         locator, block_size, block_start = data_locators[i]
76         block_end = block_start + block_size
77         if debug:
78             print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
79         if range_end <= block_start:
80             # range ends before this block starts, so don't look at any more locators
81             break
82
83         #if range_start >= block_end:
84             # range starts after this block ends, so go to next block
85             # we should always start at the first block due to the binary above, so this test is redundant
86             #next
87
88         if range_start >= block_start and range_end <= block_end:
89             # range starts and ends in this block
90             resp.append([locator, block_size, range_start - block_start, range_size])
91         elif range_start >= block_start and range_end > block_end:
92             # range starts in this block
93             resp.append([locator, block_size, range_start - block_start, block_end - range_start])
94         elif range_start < block_start and range_end > block_end:
95             # range starts in a previous block and extends to further blocks
96             resp.append([locator, block_size, 0L, block_size])
97         elif range_start < block_start and range_end <= block_end:
98             # range starts in a previous block and ends in this block
99             resp.append([locator, block_size, 0L, range_end - block_start])
100         block_start = block_end
101         i += 1
102     return resp
103
104
105 class StreamFileReader(object):
106     def __init__(self, stream, segments, name):
107         self._stream = stream
108         self.segments = segments
109         self._name = name
110         self._filepos = 0L
111
112     def name(self):
113         return self._name
114
115     def decompressed_name(self):
116         return re.sub('\.(bz2|gz)$', '', self._name)
117
118     def stream_name(self):
119         return self._stream.name()
120
121     def seek(self, pos):
122         self._filepos = min(max(pos, 0L), self.size())
123
124     def tell(self):
125         return self._filepos
126
127     def size(self):
128         n = self.segments[-1]
129         return n[OFFSET] + n[BLOCKSIZE]
130
131     def read(self, size):
132         """Read up to 'size' bytes from the stream, starting at the current file position"""
133         if size == 0:
134             return ''
135
136         data = ''
137         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
138         if available_chunks:
139             locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
140             data = self._stream.readfrom(locator+segmentoffset, segmentsize)
141
142         self._filepos += len(data)
143         return data
144
145     def readfrom(self, start, size):
146         """Read up to 'size' bytes from the stream, starting at 'start'"""
147         if size == 0:
148             return ''
149
150         data = ''
151         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
152             data += self._stream.readfrom(locator+segmentoffset, segmentsize)
153         return data
154
155     def readall(self, size=2**20):
156         while True:
157             data = self.read(size)
158             if data == '':
159                 break
160             yield data
161
162     def decompress(self, decompress, size):
163         for segment in self.readall(size):
164             data = decompress(segment)
165             if data and data != '':
166                 yield data
167
168     def readall_decompressed(self, size=2**20):
169         self.seek(0)
170         if re.search('\.bz2$', self._name):
171             dc = bz2.BZ2Decompressor()
172             return self.decompress(lambda segment: dc.decompress(segment), size)
173         elif re.search('\.gz$', self._name):
174             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
175             return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
176         else:
177             return self.readall(size)
178
179     def readlines(self, decompress=True):
180         if decompress:
181             datasource = self.readall_decompressed()
182         else:
183             datasource = self.readall()
184         data = ''
185         for newdata in datasource:
186             data += newdata
187             sol = 0
188             while True:
189                 eol = string.find(data, "\n", sol)
190                 if eol < 0:
191                     break
192                 yield data[sol:eol+1]
193                 sol = eol+1
194             data = data[sol:]
195         if data != '':
196             yield data
197
198     def as_manifest(self):
199         manifest_text = ['.']
200         manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
201         manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
202         return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
203
204 class StreamReader(object):
205     def __init__(self, tokens, keep=None, debug=False, _empty=False):
206         self._stream_name = None
207         self._data_locators = []
208         self._files = collections.OrderedDict()
209
210         if keep is None:
211             keep = KeepClient()
212         self._keep = keep
213
214         streamoffset = 0L
215
216         # parse stream
217         for tok in tokens:
218             if debug: print 'tok', tok
219             if self._stream_name == None:
220                 self._stream_name = tok.replace('\\040', ' ')
221                 continue
222
223             s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
224             if s:
225                 blocksize = long(s.group(1))
226                 self._data_locators.append([tok, blocksize, streamoffset])
227                 streamoffset += blocksize
228                 continue
229
230             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
231             if s:
232                 pos = long(s.group(1))
233                 size = long(s.group(2))
234                 name = s.group(3).replace('\\040', ' ')
235                 if name not in self._files:
236                     self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
237                 else:
238                     n = self._files[name]
239                     n.segments.append([pos, size, n.size()])
240                 continue
241
242             raise errors.SyntaxError("Invalid manifest format")
243
244     def name(self):
245         return self._stream_name
246
247     def files(self):
248         return self._files
249
250     def all_files(self):
251         return self._files.values()
252
253     def size(self):
254         n = self._data_locators[-1]
255         return n[OFFSET] + n[BLOCKSIZE]
256
257     def locators_and_ranges(self, range_start, range_size):
258         return locators_and_ranges(self._data_locators, range_start, range_size)
259
260     def readfrom(self, start, size):
261         """Read up to 'size' bytes from the stream, starting at 'start'"""
262         if size == 0:
263             return ''
264         data = ''
265         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
266             data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
267         return data
268
269     def manifest_text(self, strip=False):
270         manifest_text = [self.name().replace(' ', '\\040')]
271         if strip:
272             for d in self._data_locators:
273                 m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
274                 manifest_text.append(m.group(0))
275         else:
276             manifest_text.extend([d[LOCATOR] for d in self._data_locators])
277         manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
278                                         for seg in f.segments])
279                               for f in self._files.values()])
280         return ' '.join(manifest_text) + '\n'