def locators_and_ranges(data_locators, range_start, range_size, debug=False):
    """Get the blocks that are covered by a byte range.

    data_locators: list of [locator, block_size, block_start] entries,
        sorted by block_start; the blocks are assumed to be contiguous
        (each block starts where the previous one ends).
    range_start: start offset of the range, in bytes.
    range_size: size of the range, in bytes.
    debug: if true, print binary-search progress.

    Returns a list of [locator, block_size, segment_offset, segment_size]
    entries that together satisfy the range, in block order.  Returns an
    empty list when the range is empty or falls entirely outside the
    blocks described by data_locators.
    """
    # Guard degenerate inputs up front: an empty range yields nothing, and
    # an empty locator list must not reach the binary search below.
    if range_size <= 0 or not data_locators:
        return []
    range_start = int(range_start)
    range_size = int(range_size)
    range_end = range_start + range_size
    resp = []

    # Convention throughout: range_start/block_start are inclusive lower
    # bounds; range_end/block_end are exclusive upper bounds.
    lo = 0
    hi = len(data_locators)
    i = (hi + lo) // 2
    locator, block_size, block_start = data_locators[i]
    block_end = block_start + block_size

    # Binary search for the block containing range_start.  Because the
    # blocks are contiguous, range_start either falls inside some block or
    # lies entirely outside the covered region.
    while not (block_start <= range_start < block_end):
        if lo == i:
            # Search interval collapsed without a hit: range_start is out
            # of range, so nothing to return.
            return resp
        if range_start > block_start:
            lo = i
        else:
            hi = i
        i = (hi + lo) // 2
        if debug:
            # Single formatted string so the output is the same on
            # Python 2 and Python 3.
            print("%s %s %s" % (lo, i, hi))
        locator, block_size, block_start = data_locators[i]
        block_end = block_start + block_size

    # Walk forward from the first covered block, emitting one segment per
    # block until the range is exhausted.
    while i < len(data_locators):
        locator, block_size, block_start = data_locators[i]
        block_end = block_start + block_size
        if debug:
            print("%s range_start %s block_start %s range_end %s block_end %s"
                  % (locator, range_start, block_start, range_end, block_end))
        if range_end <= block_start:
            # Range ends before this block starts; no more blocks apply.
            break
        # (range_start >= block_end cannot happen here: the binary search
        # above guarantees we start at the first block covering the range.)
        if range_start >= block_start and range_end <= block_end:
            # Range starts and ends in this block.
            resp.append([locator, block_size, range_start - block_start, range_size])
        elif range_start >= block_start and range_end > block_end:
            # Range starts in this block and continues past its end.
            resp.append([locator, block_size, range_start - block_start, block_end - range_start])
        elif range_start < block_start and range_end > block_end:
            # Range started in an earlier block and spans this whole block.
            resp.append([locator, block_size, 0, block_size])
        elif range_start < block_start and range_end <= block_end:
            # Range started in an earlier block and ends in this block.
            resp.append([locator, block_size, 0, range_end - block_start])
        i += 1
    return resp
105 class StreamFileReader(object):
# File-like reader over a list of segments belonging to one StreamReader
# stream.  NOTE(review): this is an elided dump -- the leading numbers are
# the original file's line numbers, and several original lines are missing
# between them, so some statements below belong to methods whose `def`
# lines are not visible here.
106 def __init__(self, stream, segments, name):
# stream: the owning StreamReader.  segments: list of
# [position, size, file_offset] entries (see the [[pos, size, 0]] /
# [pos, size, n.size()] construction in StreamReader.__init__ below).
# The statements storing `name` (read later as self._name) and
# initializing the file position are in elided lines.
107 self._stream = stream
108 self.segments = segments
# Returns the file name with a trailing .bz2 or .gz extension removed.
115 def decompressed_name(self):
116 return re.sub('\.(bz2|gz)$', '', self._name)
# Delegates to the owning stream's name().
118 def stream_name(self):
119 return self._stream.name()
# NOTE(review): fragment of a seek-style method (its `def` line is
# elided); clamps the requested position into [0, file size].
122 self._filepos = min(max(pos, 0L), self.size())
# NOTE(review): fragment of a size-style method (its `def` line is
# elided); the end offset of the last segment is the file size, since
# segment file offsets are assigned cumulatively in StreamReader.
128 n = self.segments[-1]
129 return n[OFFSET] + n[BLOCKSIZE]
131 def read(self, size):
132 """Read up to 'size' bytes from the stream, starting at the current file position"""
# The initialization and return of `data` are in elided lines.  Maps the
# requested span onto segments and reads each piece from the stream.
137 for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
138 data += self._stream.readfrom(locator+segmentoffset, segmentsize)
# Advance the file position by the number of bytes actually read.
139 self._filepos += len(data)
142 def readfrom(self, start, size):
143 """Read up to 'size' bytes from the stream, starting at 'start'"""
# Stateless variant of read(): does not touch self._filepos.  The
# initialization and return of `data` are in elided lines.
148 for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
149 data += self._stream.readfrom(locator+segmentoffset, segmentsize)
# Reads the whole file in chunks of `size` bytes (default 1 MiB);
# presumably a generator -- the loop/yield lines are elided.
152 def readall(self, size=2**20):
154 data = self.read(size)
# Runs each chunk from readall() through the supplied `decompress`
# callable, skipping empty results; the yield line is elided.
159 def decompress(self, decompress, size):
160 for segment in self.readall(size):
161 data = decompress(segment)
162 if data and data != '':
# Chooses a decompressor from the file name extension; falls back to raw
# readall() for other names (the final else/reset lines are elided).
165 def readall_decompressed(self, size=2**20):
167 if re.search('\.bz2$', self._name):
168 dc = bz2.BZ2Decompressor()
169 return self.decompress(lambda segment: dc.decompress(segment), size)
170 elif re.search('\.gz$', self._name):
# wbits = 16 + MAX_WBITS selects gzip-header decoding in zlib.
171 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
# unconsumed_tail carries over input zlib has not yet processed.
172 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
174 return self.readall(size)
# Yields complete lines (including the trailing newline) from the
# optionally-decompressed data; buffering lines are elided.  Uses the
# Python 2 `string` module's find().
176 def readlines(self, decompress=True):
178 datasource = self.readall_decompressed()
180 datasource = self.readall()
182 for newdata in datasource:
186 eol = string.find(data, "\n", sol)
189 yield data[sol:eol+1]
# Builds a single-stream ('.') manifest line covering this file and
# normalizes it by round-tripping through arvados.CollectionReader.
195 def as_manifest(self):
196 manifest_text = ['.']
197 manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
# Spaces in file names are escaped as \040 per manifest format.
198 manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
199 return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
201 class StreamReader(object):
# Parses one manifest stream line (already split into tokens) into a
# stream name, a list of data-block locators, and a set of files.
# NOTE(review): elided dump -- the leading numbers are the original
# file's line numbers and several lines between them are missing,
# including the `for tok in tokens:` loop header and some `def` lines.
202 def __init__(self, tokens, keep=None, debug=False, _empty=False):
203 self._stream_name = None
# Each entry is [locator, blocksize, streamoffset]; see the append below.
204 self._data_locators = []
# Ordered so files keep their manifest order.
205 self._files = collections.OrderedDict()
# Falls back to the global Keep client when none was passed in
# (the surrounding conditional is elided).
210 self._keep = Keep.global_client_object()
216 if debug: print 'tok', tok
# First token of the line is the stream name; \040 unescapes to space.
217 if self._stream_name == None:
218 self._stream_name = tok.replace('\\040', ' ')
# Block-locator token: 32 hex digits, +size, optional +hints.
221 s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
223 blocksize = long(s.group(1))
# streamoffset accumulates so blocks are contiguous within the stream.
224 self._data_locators.append([tok, blocksize, streamoffset])
225 streamoffset += blocksize
# File token: position:size:name.
228 s = re.search(r'^(\d+):(\d+):(\S+)', tok)
230 pos = long(s.group(1))
231 size = long(s.group(2))
232 name = s.group(3).replace('\\040', ' ')
233 if name not in self._files:
# First segment of a new file starts at file offset 0.
234 self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
# Additional tokens for the same name append segments; the new
# segment's file offset is the file's current size.
236 n = self._files[name]
237 n.segments.append([pos, size, n.size()])
# Token matched none of the grammars above.
240 raise errors.SyntaxError("Invalid manifest format")
# NOTE(review): fragment of a name() accessor (def line elided).
243 return self._stream_name
# NOTE(review): fragment of an accessor returning the files (def elided).
249 return self._files.values()
# NOTE(review): fragment of a size() method (def elided): the stream's
# size is the end offset of its last contiguous block.
252 n = self._data_locators[-1]
253 return n[OFFSET] + n[BLOCKSIZE]
# Thin wrapper over the module-level locators_and_ranges().
255 def locators_and_ranges(self, range_start, range_size):
256 return locators_and_ranges(self._data_locators, range_start, range_size)
258 def readfrom(self, start, size):
259 """Read up to 'size' bytes from the stream, starting at 'start'"""
# The initialization and return of `data` are in elided lines.  Fetches
# each covered block from Keep and slices out the requested segment.
263 for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
264 data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
# Reassembles this stream's manifest line.  With strip=True (the
# branching lines are elided) locators are reduced to bare md5+size,
# dropping permission/other hints; otherwise full locators are emitted.
267 def manifest_text(self, strip=False):
268 manifest_text = [self.name().replace(' ', '\\040')]
270 for d in self._data_locators:
271 m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
272 manifest_text.append(m.group(0))
274 manifest_text.extend([d[LOCATOR] for d in self._data_locators])
# One position:size:name token per segment, per file, \040-escaped.
275 manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
276 for seg in f.segments])
277 for f in self._files.values()])
278 return ' '.join(manifest_text) + '\n'