import bz2
import collections
import re
import zlib

import arvados
from arvados import errors
from arvados.keep import KeepClient
from arvados.retry import retry_method

# Field indexes into the [locator, size, offset] triples used throughout
# this module (both for data blocks and for file segments).
LOCATOR = 0
BLOCKSIZE = 1
OFFSET = 2
SEGMENTSIZE = 3
def locators_and_ranges(data_locators, range_start, range_size, debug=False):
    """
    Get the blocks that are covered by the range.

    data_locators: list of [locator, block_size, block_start] triples;
      assumes the blocks are in order and contiguous
    range_start: start of the range
    range_size: size of the range

    Returns a list of [block locator, block_size, segment_offset, segment_size]
    entries that together satisfy the range.
    """
    if range_size == 0 or not data_locators:
        # nothing to do (an empty locator list would break the search below)
        return []
    resp = []
    range_start = long(range_start)
    range_size = long(range_size)
    range_end = range_start + range_size
    # range_start/block_start is the inclusive lower bound
    # range_end/block_end is the exclusive upper bound
    hi = len(data_locators)
    lo = 0
    i = int((hi + lo) / 2)
    block_size = data_locators[i][BLOCKSIZE]
    block_start = data_locators[i][OFFSET]
    block_end = block_start + block_size
    # Perform a binary search for the first block.
    # Assumes that all of the blocks are contiguous, so range_start is guaranteed
    # to either fall inside some block or lie outside the block range entirely.
    while not (range_start >= block_start and range_start < block_end):
        if lo == i:
            # must be out of range, fail
            return []
        if range_start > block_start:
            lo = i
        else:
            hi = i
        i = int((hi + lo) / 2)
        if debug: print lo, i, hi
        block_size = data_locators[i][BLOCKSIZE]
        block_start = data_locators[i][OFFSET]
        block_end = block_start + block_size
    while i < len(data_locators):
        locator, block_size, block_start = data_locators[i]
        block_end = block_start + block_size
        if debug:
            print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
        if range_end <= block_start:
            # range ends before this block starts, so don't look at any more locators
            break

        #if range_start >= block_end:
            # range starts after this block ends, so go to the next block.
            # We always land on the first block thanks to the binary search above,
            # so this test is redundant.
        if range_start >= block_start and range_end <= block_end:
            # range starts and ends in this block
            resp.append([locator, block_size, range_start - block_start, range_size])
        elif range_start >= block_start and range_end > block_end:
            # range starts in this block and continues into following blocks
            resp.append([locator, block_size, range_start - block_start, block_end - range_start])
        elif range_start < block_start and range_end > block_end:
            # range starts in a previous block and extends to further blocks
            resp.append([locator, block_size, 0L, block_size])
        elif range_start < block_start and range_end <= block_end:
            # range starts in a previous block and ends in this block
            resp.append([locator, block_size, 0L, range_end - block_start])
        block_start = block_end
        i += 1
    return resp
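
# Illustrative example (not part of the library): with three contiguous
# 10-byte blocks, a 15-byte range starting at offset 5 covers the tail of
# the first block and all of the second.  The single-letter locators stand
# in for real Keep locators.
#
#   locators_and_ranges([['A', 10, 0], ['B', 10, 10], ['C', 10, 20]], 5, 15)
#   => [['A', 10, 5, 5], ['B', 10, 0, 10]]
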
class StreamFileReader(object):
    def __init__(self, stream, segments, name):
        self._stream = stream
        self.segments = segments
        self._name = name
        self._filepos = 0L
        self.num_retries = stream.num_retries

    def name(self):
        return self._name

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self._name)

    def stream_name(self):
        return self._stream.name()

    def seek(self, pos):
        # clamp the new file position to [0, size()]
        self._filepos = min(max(pos, 0L), self.size())

    def size(self):
        # file size = offset of the last segment plus its length
        n = self.segments[-1]
        return n[OFFSET] + n[BLOCKSIZE]

    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''
        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
            data = self._stream.readfrom(locator+segmentoffset, segmentsize,
                                         num_retries=num_retries)
        self._filepos += len(data)
        return data

    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''
        data = []
        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(locator+segmentoffset, segmentsize,
                                              num_retries=num_retries))
        return ''.join(data)
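
    # Illustrative example: if self.segments is [[100, 10, 0]] (the file's 10
    # bytes live at stream offset 100), then readfrom(3, 4) maps to the single
    # chunk [100, 10, 3, 4] and reads 4 bytes starting at stream offset 103.
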
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries):
            data = decompress(segment)
            if data:
                yield data

    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if re.search(r'\.bz2$', self._name):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif re.search(r'\.gz$', self._name):
            # 16+MAX_WBITS tells zlib to expect a gzip header and trailer
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @retry_method
    def readlines(self, decompress=True, num_retries=None):
        read_func = self.readall_decompressed if decompress else self.readall
        data = ''
        for newdata in read_func(num_retries=num_retries):
            data += newdata
            sol = 0
            while True:
                eol = data.find("\n", sol)
                if eol < 0:
                    break
                yield data[sol:eol+1]
                sol = eol+1
            data = data[sol:]
        if data != '':
            yield data
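
    # Illustrative usage (hypothetical reader object): iterate over the lines
    # of a file in a collection, transparently decompressing .gz/.bz2 names;
    # each yielded line keeps its trailing "\n", except possibly the last.
    #
    #   for line in reader.readlines():
    #       handle(line)
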
    def as_manifest(self):
        manifest_text = ['.']
        manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
        manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040'))
                              for seg in self.segments])
        return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text(normalize=True)
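
    # Illustrative example (hypothetical locator): for a file 'foo.txt'
    # occupying bytes 0-2 of a stream whose only block is
    # acbd18db4cc2f85cedef654fccc4a4d8+3, as_manifest() returns the
    # single-file manifest
    #
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
    #
    # (normalization may rewrite more complex cases).
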
class StreamReader(object):
    def __init__(self, tokens, keep=None, debug=False, _empty=False,
                 num_retries=0):
        self._stream_name = None
        self._data_locators = []
        self._files = collections.OrderedDict()
        self._keep = keep
        self.num_retries = num_retries

        streamoffset = 0L

        # parse stream
        for tok in tokens:
            if debug: print 'tok', tok

            if self._stream_name is None:
                self._stream_name = tok.replace('\\040', ' ')
                continue

            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
            if s:
                blocksize = long(s.group(1))
                self._data_locators.append([tok, blocksize, streamoffset])
                streamoffset += blocksize
                continue
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            if s:
                pos = long(s.group(1))
                size = long(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                if name not in self._files:
                    self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
                else:
                    n = self._files[name]
                    n.segments.append([pos, size, n.size()])
                continue

            raise errors.SyntaxError("Invalid manifest format")
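
    # Illustrative example (hypothetical tokens): the manifest stream line
    #
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
    #
    # tokenizes to ['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:foo.txt']:
    # a stream named '.', one 3-byte data block, and a file 'foo.txt' made of
    # the stream's bytes 0 through 2.
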
    def name(self):
        return self._stream_name

    def all_files(self):
        return self._files.values()

    def size(self):
        # stream size = offset of the last block plus its length
        n = self._data_locators[-1]
        return n[OFFSET] + n[BLOCKSIZE]

    def locators_and_ranges(self, range_start, range_size):
        return locators_and_ranges(self._data_locators, range_start, range_size)

    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''
        if self._keep is None:
            self._keep = KeepClient(num_retries=self.num_retries)
        data = []
        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
            data.append(self._keep.get(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
        return ''.join(data)

    def manifest_text(self, strip=False):
        manifest_text = [self.name().replace(' ', '\\040')]
        if strip:
            # keep only the md5+size part of each locator, dropping any hints
            for d in self._data_locators:
                m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
                manifest_text.append(m.group(0))
        else:
            manifest_text.extend([d[LOCATOR] for d in self._data_locators])
        manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
                                        for seg in f.segments])
                              for f in self._files.values()])
        return ' '.join(manifest_text) + '\n'
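
# Illustrative round trip (hypothetical values): a StreamReader built from the
# tokens ['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:foo.txt'] reproduces
# the original line:
#
#   reader.manifest_text()
#   => '. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n'
#
# With strip=True, any '+hint' suffixes after the md5+size portion of each
# locator are dropped from the block tokens.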