def locators_and_ranges(data_locators, range_start, range_size, debug=False):
    """Get the blocks that are covered by the range.

    data_locators: list of [locator, block_size, block_start] triples;
        assumes the blocks are in order and contiguous (each block's
        block_start equals the previous block's block_start + block_size).
    range_start: start of the range, in stream coordinates.
    range_size: size of the range, in bytes.
    debug: if True, print search/walk progress to stdout.

    Returns a list of [block locator, block_size, segment offset,
    segment size] entries that together cover the requested range.
    Returns [] for an empty range or a range that falls outside the
    blocks entirely.
    """
    # Guard the degenerate cases explicitly: a zero-length range matches
    # nothing, and an empty locator list would otherwise crash the search.
    if range_size == 0 or not data_locators:
        return []
    range_start = int(range_start)
    range_size = int(range_size)
    range_end = range_start + range_size
    resp = []

    # range_start/block_start is the inclusive lower bound
    # range_end/block_end is the exclusive upper bound

    # Binary search for the first block containing range_start.  Because the
    # blocks are contiguous, range_start is guaranteed to either fall inside
    # some block or lie outside the covered span entirely.
    lo = 0
    hi = len(data_locators)
    i = (hi + lo) // 2
    locator, block_size, block_start = data_locators[i]
    block_end = block_start + block_size

    while not (block_start <= range_start < block_end):
        if lo == i:
            # Search space exhausted: range_start is out of range, fail.
            return []
        if range_start > block_start:
            lo = i
        else:
            hi = i
        i = (hi + lo) // 2
        if debug:
            print(lo, i, hi)
        locator, block_size, block_start = data_locators[i]
        block_end = block_start + block_size

    # Walk forward from the first covered block, emitting the slice of each
    # block that overlaps the requested range.
    while i < len(data_locators):
        locator, block_size, block_start = data_locators[i]
        block_end = block_start + block_size
        if debug:
            print(locator, "range_start", range_start, "block_start", block_start,
                  "range_end", range_end, "block_end", block_end)
        if range_end <= block_start:
            # Range ends before this block starts, so don't look at any
            # more locators.
            break
        # No "range starts after this block ends" test: the binary search
        # above guarantees we begin at the first overlapping block.
        if range_start >= block_start and range_end <= block_end:
            # Range starts and ends in this block.
            resp.append([locator, block_size, range_start - block_start, range_size])
        elif range_start >= block_start and range_end > block_end:
            # Range starts in this block and continues past its end.
            resp.append([locator, block_size, range_start - block_start, block_end - range_start])
        elif range_start < block_start and range_end > block_end:
            # Range starts in a previous block and extends to further blocks.
            resp.append([locator, block_size, 0, block_size])
        elif range_start < block_start and range_end <= block_end:
            # Range starts in a previous block and ends in this block.
            resp.append([locator, block_size, 0, range_end - block_start])
        i += 1
    return resp
# NOTE(review): this dump embeds the original file's line numbers and those
# numbers skip (e.g. 109 -> 116, 140 -> 143), so statements and several `def`
# headers are missing from this view.  Code below is kept byte-identical;
# only comments are added.  Python 2 idioms (0L literal, `string.find`) are
# present throughout.
#
# File-like reader for one file inside a stream.  `segments` locates the
# file's bytes within the parent stream; the last segment's OFFSET +
# BLOCKSIZE gives the file size (see the `size` body at original line 129).
106 class StreamFileReader(object):
107 def __init__(self, stream, segments, name):
108 self._stream = stream
109 self.segments = segments
# Strip a trailing .bz2 or .gz extension from the file name.
116 def decompressed_name(self):
117 return re.sub('\.(bz2|gz)$', '', self._name)
# Name of the enclosing stream.
119 def stream_name(self):
120 return self._stream.name()
# Body of a seek-style method (header missing from this view): clamps the
# requested position into [0, size()].
123 self._filepos = min(max(pos, 0L), self.size())
# Body of size() (header missing): file size is the end of the last segment.
129 n = self.segments[-1]
130 return n[OFFSET] + n[BLOCKSIZE]
132 def read(self, size):
133 """Read up to 'size' bytes from the stream, starting at the current file position"""
# Map the (filepos, size) range onto stream segments, fetch each piece from
# the parent stream, then advance the file position by what was read.
138 for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
139 data += self._stream.readfrom(locator+segmentoffset, segmentsize)
140 self._filepos += len(data)
143 def readfrom(self, start, size):
144 """Read up to 'size' bytes from the stream, starting at 'start'"""
# Same as read() but from an explicit offset; does not touch _filepos.
149 for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
150 data += self._stream.readfrom(locator+segmentoffset, segmentsize)
# Read the whole file in `size`-byte chunks; presumably a generator that
# yields each chunk until read() returns empty -- loop/yield lines are
# missing from this view, confirm against the full source.
153 def readall(self, size=2**20):
155 data = self.read(size)
# Apply the `decompress` callable to each raw chunk, passing on non-empty
# output (the yield line is missing from this view).
160 def decompress(self, decompress, size):
161 for segment in self.readall(size):
162 data = decompress(segment)
163 if data and data != '':
# Choose a decompressor from the file extension: bz2 for .bz2; zlib with
# wbits=16+MAX_WBITS (gzip framing) for .gz, feeding back unconsumed_tail;
# otherwise return the raw chunks.
166 def readall_decompressed(self, size=2**20):
168 if re.search('\.bz2$', self._name):
169 dc = bz2.BZ2Decompressor()
170 return self.decompress(lambda segment: dc.decompress(segment), size)
171 elif re.search('\.gz$', self._name):
172 dc = zlib.decompressobj(16+zlib.MAX_WBITS)
173 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
175 return self.readall(size)
# Yield newline-terminated lines, decompressing first unless decompress is
# False.  Uses the Python 2 `string.find`; accumulation/trailing-data lines
# are missing from this view.
177 def readlines(self, decompress=True):
179 datasource = self.readall_decompressed()
181 datasource = self.readall()
183 for newdata in datasource:
187 eol = string.find(data, "\n", sol)
190 yield data[sol:eol+1]
# Build a single-stream manifest ('.' + the parent stream's block locators +
# pos:size:name tokens for this file, spaces escaped as \040) and normalize
# it through arvados.CollectionReader.
196 def as_manifest(self):
197 manifest_text = ['.']
198 manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
199 manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
200 return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
# NOTE(review): as above, this dump is missing interior lines (the embedded
# original line numbers skip), so the tokenizing loop header, else-branches,
# and several method headers are not visible here.  Code kept byte-identical;
# comments only.  Python 2 idioms (`long`, print statement, `== None`).
#
# Parses one manifest stream line (a sequence of tokens) into a stream name,
# a list of [locator, blocksize, streamoffset] data locators, and a dict of
# per-file StreamFileReader objects.
202 class StreamReader(object):
203 def __init__(self, tokens, keep=None, debug=False, _empty=False):
204 self._stream_name = None
205 self._data_locators = []
206 self._files = collections.OrderedDict()
# Fall back to the global Keep client when none is supplied (the guarding
# condition is on a line missing from this view).
211 self._keep = Keep.global_client_object()
# Token loop (header missing from view): first token is the stream name,
# with \040 unescaped back to a space.
217 if debug: print 'tok', tok
218 if self._stream_name == None:
219 self._stream_name = tok.replace('\\040', ' ')
# A 32-hex-digit locator with +size (and optional hints) becomes a data
# locator; streamoffset accumulates so blocks are contiguous.
222 s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
224 blocksize = long(s.group(1))
225 self._data_locators.append([tok, blocksize, streamoffset])
226 streamoffset += blocksize
# A pos:size:name token adds a segment to that file's reader, creating the
# reader on first sight; repeated names append at the current file size.
229 s = re.search(r'^(\d+):(\d+):(\S+)', tok)
231 pos = long(s.group(1))
232 size = long(s.group(2))
233 name = s.group(3).replace('\\040', ' ')
234 if name not in self._files:
235 self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
237 n = self._files[name]
238 n.segments.append([pos, size, n.size()])
# Any token matching neither pattern is a malformed manifest.
241 raise errors.SyntaxError("Invalid manifest format")
# Body of name() (header missing from view).
244 return self._stream_name
# Body of an all-files accessor (header missing from view).
250 return self._files.values()
# Body of size() (header missing): stream size is the end of the last block.
253 n = self._data_locators[-1]
254 return n[OFFSET] + n[BLOCKSIZE]
# Thin wrapper over the module-level locators_and_ranges for this stream's
# own block list.
256 def locators_and_ranges(self, range_start, range_size):
257 return locators_and_ranges(self._data_locators, range_start, range_size)
259 def readfrom(self, start, size):
260 """Read up to 'size' bytes from the stream, starting at 'start'"""
# Fetch each covering block from Keep and slice out the overlapping bytes.
264 for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
265 data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
# Reconstruct this stream's manifest line: escaped stream name, block
# locators, then pos:size:name tokens for every file, newline-terminated.
268 def manifest_text(self):
269 manifest_text = [self.name().replace(' ', '\\040')]
270 manifest_text.extend([d[LOCATOR] for d in self._data_locators])
271 manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
272 for seg in f.segments])
273 for f in self._files.values()])
274 return ' '.join(manifest_text) + '\n'