def locators_and_ranges(data_locators, range_start, range_size, debug=False):
    """
    Get blocks that are covered by the range.
    data_locators: list of [locator, block_size, block_start]; assumes that blocks are in order and contiguous
    range_start: start of range
    range_size: size of range
    returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
    """
    if range_size == 0:
        return []
    resp = []
    range_start = long(range_start)
    range_size = long(range_size)
    range_end = range_start + range_size
    # range_start/block_start is the inclusive lower bound
    # range_end/block_end is the exclusive upper bound

    lo = 0
    hi = len(data_locators)
    i = int((hi + lo) / 2)
    block_size = data_locators[i][BLOCKSIZE]
    block_start = data_locators[i][OFFSET]
    block_end = block_start + block_size

    # Perform a binary search for the first block.
    # Assumes that all of the blocks are contiguous, so range_start is guaranteed
    # to either fall into the range of a block or be outside the block range entirely.
    while not (range_start >= block_start and range_start < block_end):
        if lo == i:
            # must be out of range, fail
            return []
        if range_start > block_start:
            lo = i
        else:
            hi = i
        i = int((hi + lo) / 2)
        if debug: print lo, i, hi
        block_size = data_locators[i][BLOCKSIZE]
        block_start = data_locators[i][OFFSET]
        block_end = block_start + block_size

    while i < len(data_locators):
        locator, block_size, block_start = data_locators[i]
        block_end = block_start + block_size
        if debug:
            print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
        if range_end <= block_start:
            # range ends before this block starts, so don't look at any more locators
            break

        #if range_start >= block_end:
            # range starts after this block ends, so go to the next block.
            # We should always start at the first block due to the binary search
            # above, so this test is redundant.
            #next

        if range_start >= block_start and range_end <= block_end:
            # range starts and ends in this block
            resp.append([locator, block_size, range_start - block_start, range_size])
        elif range_start >= block_start and range_end > block_end:
            # range starts in this block
            resp.append([locator, block_size, range_start - block_start, block_end - range_start])
        elif range_start < block_start and range_end > block_end:
            # range starts in a previous block and extends to further blocks
            resp.append([locator, block_size, 0L, block_size])
        elif range_start < block_start and range_end <= block_end:
            # range starts in a previous block and ends in this block
            resp.append([locator, block_size, 0L, range_end - block_start])
        block_start = block_end
        i += 1
    return resp
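
# Example (a sketch with made-up locators, not real data): three contiguous
# 10-byte blocks; reading 15 bytes starting at offset 5 spans the first two.
#
#   blocks = [["locator_a", 10L, 0L], ["locator_b", 10L, 10L], ["locator_c", 10L, 20L]]
#   locators_and_ranges(blocks, 5, 15)
#   # -> [['locator_a', 10L, 5L, 5L], ['locator_b', 10L, 0L, 10L]]
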
class StreamFileReader(object):
    def __init__(self, stream, segments, name):
        self._stream = stream
        self.segments = segments
        self._name = name
        self._filepos = 0L

    def name(self):
        return self._name

    def decompressed_name(self):
        return re.sub('\.(bz2|gz)$', '', self._name)

    def stream_name(self):
        return self._stream.name()

    def seek(self, pos):
        self._filepos = min(max(pos, 0L), self.size())

    def size(self):
        n = self.segments[-1]
        return n[OFFSET] + n[BLOCKSIZE]

    def read(self, size):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            # a single read() returns at most one segment's worth of data
            locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
            data = self._stream.readfrom(locator+segmentoffset, segmentsize)
        self._filepos += len(data)
        return data
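
    # Example (sketch): sequential reads advance the file position, e.g.
    #
    #   f.seek(0)
    #   first = f.read(64)   # up to 64 bytes, from a single segment
    #   rest = f.read(64)    # continues where the previous read stopped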

    def readfrom(self, start, size):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        data = ''
        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
            data += self._stream.readfrom(locator+segmentoffset, segmentsize)
        return data

    def readall(self, size=2**20):
        while True:
            data = self.read(size)
            if data == '':
                break
            yield data

    def decompress(self, decompress, size):
        for segment in self.readall(size):
            data = decompress(segment)
            if data and data != '':
                yield data

    def readall_decompressed(self, size=2**20):
        self.seek(0)
        if re.search('\.bz2$', self._name):
            dc = bz2.BZ2Decompressor()
            return self.decompress(lambda segment: dc.decompress(segment), size)
        elif re.search('\.gz$', self._name):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
        else:
            return self.readall(size)
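
    # Example (sketch): stream a gzipped file's contents without holding the
    # whole decompressed file in memory (`out` is a hypothetical writable file):
    #
    #   for chunk in f.readall_decompressed():
    #       out.write(chunk)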

    def readlines(self, decompress=True):
        if decompress:
            datasource = self.readall_decompressed()
        else:
            datasource = self.readall()
        data = ''
        for newdata in datasource:
            data += newdata
            sol = 0
            while True:
                eol = string.find(data, "\n", sol)
                if eol < 0:
                    break
                yield data[sol:eol+1]
                sol = eol + 1
            data = data[sol:]
        if data != '':
            yield data
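
    # Example (sketch): iterate over the (decompressed) file line by line; each
    # yielded string keeps its trailing newline, except possibly the last.
    #
    #   for line in f.readlines():
    #       fields = line.rstrip('\n').split('\t')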

    def as_manifest(self):
        manifest_text = ['.']
        manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
        manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
        return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()


class StreamReader(object):
    def __init__(self, tokens, keep=None, debug=False, _empty=False):
        self._stream_name = None
        self._data_locators = []
        self._files = collections.OrderedDict()
        self._keep = keep

        streamoffset = 0L

        # parse stream
        for tok in tokens:
            if debug: print 'tok', tok
            if self._stream_name is None:
                self._stream_name = tok.replace('\\040', ' ')
                continue

            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
            if s:
                blocksize = long(s.group(1))
                self._data_locators.append([tok, blocksize, streamoffset])
                streamoffset += blocksize
                continue

            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            if s:
                pos = long(s.group(1))
                size = long(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                if name not in self._files:
                    self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
                else:
                    n = self._files[name]
                    n.segments.append([pos, size, n.size()])
                continue

            raise errors.SyntaxError("Invalid manifest format")

    def name(self):
        return self._stream_name

    def all_files(self):
        return self._files.values()

    def size(self):
        n = self._data_locators[-1]
        return n[OFFSET] + n[BLOCKSIZE]

    def locators_and_ranges(self, range_start, range_size):
        return locators_and_ranges(self._data_locators, range_start, range_size)

    def readfrom(self, start, size):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        data = ''
        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
            data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
        return data

    def manifest_text(self, strip=False):
        manifest_text = [self.name().replace(' ', '\\040')]
        if strip:
            # keep only the hash+size part of each locator, dropping any hints
            for d in self._data_locators:
                m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
                manifest_text.append(m.group(0))
        else:
            manifest_text.extend([d[LOCATOR] for d in self._data_locators])
        manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
                                        for seg in f.segments])
                              for f in self._files.values()])
        return ' '.join(manifest_text) + '\n'
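
# A minimal parsing sketch (the locator below is the well-known md5 of the
# empty string; `my_keep_client` stands in for a hypothetical Keep client):
#
#   tokens = ['.', 'd41d8cd98f00b204e9800998ecf8427e+0', '0:0:emptyfile.txt']
#   s = StreamReader(tokens, keep=my_keep_client)
#   s.name()                           # '.'
#   [f.name() for f in s.all_files()]  # ['emptyfile.txt']
#   s.manifest_text()  # '. d41d8cd98f00b204e9800998ecf8427e+0 0:0:emptyfile.txt\n'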