def locators_and_ranges(data_locators, range_start, range_size, debug=False):
    """
    Get blocks that are covered by the range.

    data_locators: list of [locator, block_size, block_start]; assumes that
        blocks are in order and contiguous (each block starts where the
        previous one ended)
    range_start: start of range
    range_size: size of range
    debug: when True, print a trace of the search
    returns list of [block locator, blocksize, segment offset, segment size]
        that satisfies the range; empty list when the range is empty or
        falls entirely outside the blocks
    """
    # An empty range, or an empty block list, covers nothing.
    if range_size == 0 or not data_locators:
        return []

    resp = []
    # int() auto-promotes to long on py2, so this is equivalent to long()
    # there and also valid on py3.
    range_start = int(range_start)
    range_size = int(range_size)
    range_end = range_start + range_size

    # range_start/block_start is the inclusive lower bound
    # range_end/block_end is the exclusive upper bound

    # Perform a binary search for the first block.
    # Assumes that all of the blocks are contiguous, so range_start is
    # guaranteed to either fall into the range of a block or be outside the
    # block range entirely.
    lo = 0
    hi = len(data_locators)
    i = (hi + lo) // 2
    locator, block_size, block_start = data_locators[i]
    block_end = block_start + block_size

    while not (block_start <= range_start < block_end):
        if lo == i:
            # search interval is empty: must be out of range, fail
            return []
        if range_start > block_start:
            lo = i
        else:
            hi = i
        i = (hi + lo) // 2
        if debug: print("%d %d %d" % (lo, i, hi))
        locator, block_size, block_start = data_locators[i]
        block_end = block_start + block_size

    # Walk forward from the first covered block, clipping the requested
    # range against each block, until the range is exhausted.
    while i < len(data_locators):
        locator, block_size, block_start = data_locators[i]
        block_end = block_start + block_size
        if debug:
            print("%s range_start %d block_start %d range_end %d block_end %d" %
                  (locator, range_start, block_start, range_end, block_end))
        if range_end <= block_start:
            # range ends before this block starts, so don't look at any more locators
            break

        # (A "range starts after this block ends" test would be redundant
        # here: the binary search above always lands on the first block that
        # contains range_start.)

        if range_start >= block_start and range_end <= block_end:
            # range starts and ends in this block
            resp.append([locator, block_size, range_start - block_start, range_size])
        elif range_start >= block_start and range_end > block_end:
            # range starts in this block and extends past its end
            resp.append([locator, block_size, range_start - block_start, block_end - range_start])
        elif range_start < block_start and range_end > block_end:
            # range starts in a previous block and extends to further blocks
            resp.append([locator, block_size, 0, block_size])
        elif range_start < block_start and range_end <= block_end:
            # range starts in a previous block and ends in this block
            resp.append([locator, block_size, 0, range_end - block_start])
        i += 1
    return resp
class StreamFileReader(object):
    """A file-like reader over a list of segments within a parent stream.

    segments is a list of [position, size, offset] entries locating this
    file's data inside the parent stream, in file order.

    NOTE(review): several lines of this class were elided in the reviewed
    listing; members marked below are reconstructed from visible usage —
    confirm against the upstream source.
    """

    def __init__(self, stream, segments, name):
        self._stream = stream
        self.segments = segments
        self._name = name
        # current read position within the file (reconstructed: read()
        # advances it and seek() clamps it)
        self._filepos = 0

    def name(self):
        return self._name

    def decompressed_name(self):
        # strip a trailing .bz2 or .gz extension from the file name
        return re.sub('\.(bz2|gz)$', '', self._name)

    def stream_name(self):
        return self._stream.name()

    def seek(self, pos):
        # clamp to [0, size] so the file position never leaves the file
        self._filepos = min(max(pos, 0), self.size())

    def tell(self):
        return self._filepos

    def size(self):
        # segments are contiguous, so the last segment's offset + size is
        # the total file size
        n = self.segments[-1]
        return n[OFFSET] + n[BLOCKSIZE]

    def read(self, size):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return ''
        data = ''
        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
            data += self._stream.readfrom(locator+segmentoffset, segmentsize)
        self._filepos += len(data)
        return data

    def readfrom(self, start, size):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''
        data = ''
        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
            data += self._stream.readfrom(locator+segmentoffset, segmentsize)
        return data

    def readall(self, size=2**20):
        """Yield successive chunks of up to 'size' bytes until end of file."""
        while True:
            data = self.read(size)
            if data == '':
                break
            yield data

    def decompress(self, decompress, size):
        # feed each raw chunk through the supplied decompressor callable,
        # yielding only non-empty output
        for segment in self.readall(size):
            data = decompress(segment)
            if data and data != '':
                yield data

    def readall_decompressed(self, size=2**20):
        """Yield chunks of the file, decompressing when the name ends in .bz2/.gz."""
        self.seek(0)
        if re.search('\.bz2$', self._name):
            dc = bz2.BZ2Decompressor()
            return self.decompress(lambda segment: dc.decompress(segment), size)
        elif re.search('\.gz$', self._name):
            # 16+MAX_WBITS tells zlib to expect a gzip header/trailer
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
        else:
            return self.readall(size)

    def readlines(self, decompress=True):
        """Yield lines (keeping the trailing newline); decompress first by default."""
        if decompress:
            datasource = self.readall_decompressed()
        else:
            datasource = self.readall()
        # accumulate chunks and split out complete lines as they appear
        # (reconstructed buffering around the visible newline scan)
        data = ''
        for newdata in datasource:
            data += newdata
            sol = 0
            while True:
                eol = data.find("\n", sol)
                if eol < 0:
                    break
                yield data[sol:eol+1]
                sol = eol + 1
            data = data[sol:]
        if data != '':
            yield data

    def as_manifest(self):
        """Render this single file as a one-stream manifest and normalize it."""
        manifest_text = ['.']
        manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
        manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
        return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
class StreamReader(object):
    """Parses one manifest stream: a stream name token, followed by block
    locator tokens, followed by position:size:filename segment tokens.

    NOTE(review): some lines of this class were elided in the reviewed
    listing; the token-loop control flow is reconstructed from the visible
    branches and the trailing SyntaxError — confirm against upstream.
    """

    def __init__(self, tokens, keep=None, debug=False, _empty=False):
        # _empty is accepted for interface compatibility; not used here
        self._stream_name = None
        self._data_locators = []
        self._files = collections.OrderedDict()
        self._keep = keep

        # running offset of each data block within the stream
        streamoffset = 0

        for tok in tokens:
            if debug: print("tok %s" % tok)

            # first token is the stream name ('\040' escapes spaces)
            if self._stream_name is None:
                self._stream_name = tok.replace('\\040', ' ')
                continue

            # block locator token: md5+size[+hints...]
            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
            if s:
                blocksize = int(s.group(1))
                self._data_locators.append([tok, blocksize, streamoffset])
                streamoffset += blocksize
                continue

            # file segment token: position:size:name
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            if s:
                pos = int(s.group(1))
                size = int(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                if name not in self._files:
                    self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
                else:
                    # an additional segment for an existing file is appended
                    # at the file's current end
                    n = self._files[name]
                    n.segments.append([pos, size, n.size()])
                continue

            raise errors.SyntaxError("Invalid manifest format")

    def name(self):
        return self._stream_name

    def all_files(self):
        return self._files.values()

    def size(self):
        # data blocks are contiguous: last block's offset + size = stream size
        n = self._data_locators[-1]
        return n[OFFSET] + n[BLOCKSIZE]

    def locators_and_ranges(self, range_start, range_size):
        return locators_and_ranges(self._data_locators, range_start, range_size)

    def readfrom(self, start, size):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return ''
        data = ''
        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
            data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
        return data

    def manifest_text(self, strip=False):
        manifest_text = [self.name().replace(' ', '\\040')]
        if strip:
            # strip locator hints down to the bare md5+size prefix
            for d in self._data_locators:
                m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
                manifest_text.append(m.group(0))
        else:
            manifest_text.extend([d[LOCATOR] for d in self._data_locators])
        manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
                                        for seg in f.segments])
                              for f in self._files.values()])
        return ' '.join(manifest_text) + '\n'