import bz2
import collections
import re
import zlib

import arvados
from arvados.retry import retry_method
from keep import *
import errors

# Field indexes into the [locator, block size, offset] triples that make up
# both a stream's block list and a file's segment list.
LOCATOR = 0
BLOCKSIZE = 1
OFFSET = 2
def locators_and_ranges(data_locators, range_start, range_size, debug=False):
    '''
    Get the blocks that are covered by the range.
    data_locators: list of [locator, block_size, block_start] triples,
      assumed to be in order and contiguous
    range_start: start of the range
    range_size: size of the range
    returns a list of [block locator, block size, segment offset, segment size]
      covering the range
    '''
    if range_size == 0:
        return []
    resp = []
    range_start = long(range_start)
    range_size = long(range_size)
    range_end = range_start + range_size
    # range_start/block_start is the inclusive lower bound
    # range_end/block_end is the exclusive upper bound

    lo = 0
    hi = len(data_locators)
    i = int((hi + lo) / 2)
    block_size = data_locators[i][BLOCKSIZE]
    block_start = data_locators[i][OFFSET]
    block_end = block_start + block_size
    # Perform a binary search for the first block.  This assumes that all of
    # the blocks are contiguous, so range_start is guaranteed to either fall
    # inside a block or lie outside the block range entirely.
    while not (range_start >= block_start and range_start < block_end):
        if lo == i:
            # range_start is out of range, fail
            break
        if range_start > block_start:
            lo = i
        else:
            hi = i
        i = int((hi + lo) / 2)
        if debug: print lo, i, hi
        block_size = data_locators[i][BLOCKSIZE]
        block_start = data_locators[i][OFFSET]
        block_end = block_start + block_size
    while i < len(data_locators):
        locator, block_size, block_start = data_locators[i]
        block_end = block_start + block_size
        if debug:
            print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
        if range_end <= block_start:
            # range ends before this block starts, so don't look at any more locators
            break

        # Note: no "range_start >= block_end" check is needed here; the binary
        # search above guarantees that scanning starts at the first block that
        # overlaps the range.
        if range_start >= block_start and range_end <= block_end:
            # range starts and ends in this block
            resp.append([locator, block_size, range_start - block_start, range_size])
        elif range_start >= block_start and range_end > block_end:
            # range starts in this block and extends into later blocks
            resp.append([locator, block_size, range_start - block_start, block_end - range_start])
        elif range_start < block_start and range_end > block_end:
            # range starts in a previous block and extends into later blocks
            resp.append([locator, block_size, 0L, block_size])
        elif range_start < block_start and range_end <= block_end:
            # range starts in a previous block and ends in this block
            resp.append([locator, block_size, 0L, range_end - block_start])
        block_start = block_end
        i += 1
    return resp
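# Illustrative example (not part of the original module): given three
# contiguous 10-byte blocks, a 15-byte range starting at offset 5 covers the
# tail of the first block and all of the second.  Locators are shortened to
# single letters here; real locators are md5+size strings.
#
#   locators_and_ranges([['a', 10, 0], ['b', 10, 10], ['c', 10, 20]], 5, 15)
#   # => [['a', 10, 5, 5], ['b', 10, 0, 10]]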
class StreamFileReader(object):
    def __init__(self, stream, segments, name):
        self._stream = stream
        self.segments = segments
        self._name = name
        self._filepos = 0L
        self.num_retries = stream.num_retries
    def name(self):
        return self._name

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self._name)

    def stream_name(self):
        return self._stream.name()
    def seek(self, pos):
        self._filepos = min(max(pos, 0L), self.size())
    def size(self):
        n = self.segments[-1]
        return n[OFFSET] + n[BLOCKSIZE]
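    # Illustrative note (not in the original source): each segment is
    # [position within the stream, segment length, offset within the file],
    # so a file built from segments [[0, 10, 0], [20, 5, 10]] reports
    # size() == 10 + 5 == 15.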
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position."""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            # For file segments, the 'locator' field is the segment's byte
            # position within the stream, so locator+segmentoffset is a
            # stream offset.
            locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
            data = self._stream.readfrom(locator+segmentoffset, segmentsize,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'."""
        if size == 0:
            return ''

        data = []
        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(locator+segmentoffset, segmentsize,
                                              num_retries=num_retries))
        return ''.join(data)
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)
            if data:
                yield data
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if re.search(r'\.bz2$', self._name):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif re.search(r'\.gz$', self._name):
            # wbits=16+MAX_WBITS tells zlib to expect a gzip header/trailer.
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
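    # Illustrative usage ('file_reader' is a hypothetical StreamFileReader
    # for a file named 'logs.gz'): stream the decompressed contents without
    # holding the whole file in memory.
    #
    #   for chunk in file_reader.readall_decompressed():
    #       handle(chunk)  # 'handle' is a placeholder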
    @retry_method
    def readlines(self, decompress=True, num_retries=None):
        read_func = self.readall_decompressed if decompress else self.readall
        data = ''
        for newdata in read_func(num_retries=num_retries):
            data += newdata
            sol = 0
            while True:
                eol = data.find("\n", sol)
                if eol < 0:
                    break
                yield data[sol:eol+1]
                sol = eol + 1
            # Carry any partial line over to the next chunk.
            data = data[sol:]
        if data != '':
            yield data
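    # Illustrative note: because leftover bytes are carried over in 'data',
    # lines that span chunk boundaries are reassembled before being yielded;
    # each yielded line keeps its trailing newline, and a final line without
    # one is yielded last.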
    def as_manifest(self):
        # Build a one-stream manifest containing just this file, then
        # normalize it through CollectionReader.
        manifest_text = ['.']
        manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
        manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040'))
                              for seg in self.segments])
        return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
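    # Illustrative example (sample locator, hypothetical file name): for a
    # 43-byte file 'md5sum.txt' stored at the start of a 53-byte block,
    # as_manifest() returns a single-stream manifest such as:
    #
    #   . 930625b054ce894ac40596c3f5a0d555+53 0:43:md5sum.txt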
class StreamReader(object):
    def __init__(self, tokens, keep=None, debug=False, _empty=False,
                 num_retries=0):
        self._stream_name = None
        self._data_locators = []
        self._files = collections.OrderedDict()
        self._keep = keep
        self.num_retries = num_retries

        streamoffset = 0L

        # parse stream
        for tok in tokens:
            if debug: print 'tok', tok

            if self._stream_name is None:
                self._stream_name = tok.replace('\\040', ' ')
                continue
            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
            if s:
                blocksize = long(s.group(1))
                self._data_locators.append([tok, blocksize, streamoffset])
                streamoffset += blocksize
                continue
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            if s:
                pos = long(s.group(1))
                size = long(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                if name not in self._files:
                    self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
                else:
                    # Additional segments are appended at the current end of the file.
                    n = self._files[name]
                    n.segments.append([pos, size, n.size()])
                continue
            raise errors.SyntaxError("Invalid manifest format")
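    # Illustrative example (sample locator, hypothetical file name): the
    # manifest line
    #   . 930625b054ce894ac40596c3f5a0d555+53 0:53:md5sum.txt
    # arrives here as the token list
    #   ['.', '930625b054ce894ac40596c3f5a0d555+53', '0:53:md5sum.txt']
    # yielding a stream named '.' with one 53-byte block and one file
    # covering all 53 bytes.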
    def name(self):
        return self._stream_name
    def all_files(self):
        return self._files.values()
    def size(self):
        n = self._data_locators[-1]
        return n[OFFSET] + n[BLOCKSIZE]
    def locators_and_ranges(self, range_start, range_size):
        return locators_and_ranges(self._data_locators, range_start, range_size)
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'."""
        if size == 0:
            return ''
        if self._keep is None:
            self._keep = KeepClient(num_retries=self.num_retries)
        data = []
        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
            data.append(self._keep.get(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
        return ''.join(data)
    def manifest_text(self, strip=False):
        manifest_text = [self.name().replace(' ', '\\040')]
        if strip:
            # Strip hints (e.g. permission signatures) from each locator,
            # keeping only the md5 hash and size.
            for d in self._data_locators:
                m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
                manifest_text.append(m.group(0))
        else:
            manifest_text.extend([d[LOCATOR] for d in self._data_locators])
        manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
                                        for seg in f.segments])
                              for f in self._files.values()])
        return ' '.join(manifest_text) + '\n'
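    # Illustrative example (schematic signed locator): with strip=True, a
    # locator such as
    #   930625b054ce894ac40596c3f5a0d555+53+A<signature>@<expiry>
    # is reduced to its bare hash+size form:
    #   930625b054ce894ac40596c3f5a0d555+53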