import bz2
import collections
import os
import re
import zlib

import arvados
from .arvfile import ArvadosFileBase
from .keep import KeepClient
from arvados.retry import retry_method
from . import errors

# Field indexes for the [locator, block size, stream offset] triples kept in
# _data_locators, and for the [position, size, offset] file segment triples.
LOCATOR = 0
BLOCKSIZE = 1
OFFSET = 2
SEGMENTSIZE = 3

def locators_and_ranges(data_locators, range_start, range_size, debug=False):
    """Get the blocks that are covered by the range.

    data_locators: list of [locator, block_size, block_start]; assumes that
    blocks are in order and contiguous
    range_start: start of range
    range_size: size of range
    returns a list of [block locator, blocksize, segment offset, segment size]
    entries that together satisfy the range
    """
    if range_size == 0:
        return []
    if not data_locators:
        # no blocks at all, so nothing can cover the range
        return []
    resp = []
    range_start = long(range_start)
    range_size = long(range_size)
    range_end = range_start + range_size

    # range_start/block_start is the inclusive lower bound
    # range_end/block_end is the exclusive upper bound

    hi = len(data_locators)
    lo = 0
    i = int((hi + lo) / 2)
    block_size = data_locators[i][BLOCKSIZE]
    block_start = data_locators[i][OFFSET]
    block_end = block_start + block_size

    # Binary search for the first block that contains range_start.
    # Assumes that all of the blocks are contiguous, so range_start is
    # guaranteed either to fall inside one of the blocks or to lie outside
    # the stream entirely.
    while not (range_start >= block_start and range_start < block_end):
        if lo == i:
            # must be out of range, fail
            return []
        if range_start > block_start:
            lo = i
        else:
            hi = i
        i = int((hi + lo) / 2)
        if debug: print lo, i, hi
        block_size = data_locators[i][BLOCKSIZE]
        block_start = data_locators[i][OFFSET]
        block_end = block_start + block_size
    while i < len(data_locators):
        locator, block_size, block_start = data_locators[i]
        block_end = block_start + block_size
        if debug:
            print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
        if range_end <= block_start:
            # range ends before this block starts, so don't look at any more locators
            break

        # There is no test for (range_start >= block_end) here: the binary
        # search above guarantees we start at the first block the range
        # touches, so that case cannot occur.

        if range_start >= block_start and range_end <= block_end:
            # range starts and ends in this block
            resp.append([locator, block_size, range_start - block_start, range_size])
        elif range_start >= block_start and range_end > block_end:
            # range starts in this block and extends past its end
            resp.append([locator, block_size, range_start - block_start, block_end - range_start])
        elif range_start < block_start and range_end > block_end:
            # range starts in a previous block and extends to further blocks
            resp.append([locator, block_size, 0L, block_size])
        elif range_start < block_start and range_end <= block_end:
            # range starts in a previous block and ends in this block
            resp.append([locator, block_size, 0L, range_end - block_start])
        block_start = block_end
        i += 1
    return resp
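
# Example (an illustrative sketch, not from the original source): with three
# contiguous 10-byte blocks, a 14-byte range starting at offset 8 covers the
# tail of the first block, all of the second, and the head of the third:
#
#   blocks = [["locA", 10, 0], ["locB", 10, 10], ["locC", 10, 20]]
#   locators_and_ranges(blocks, 8, 14)
#   # => [['locA', 10, 8, 2], ['locB', 10, 0, 10], ['locC', 10, 0, 2]]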


def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path.
    If no stream name is available, assume '.'.
    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name
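
# For example, split('foo/bar/baz.txt') returns ('foo/bar', 'baz.txt'),
# and split('baz.txt') returns ('.', 'baz.txt').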


class StreamFileReader(ArvadosFileBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb')
        self._stream = stream
        self.segments = segments
        self._filepos = 0L
        self.num_retries = stream.num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    def stream_name(self):
        return self._stream.name()

    @ArvadosFileBase._before_close
    def seek(self, pos, whence=os.SEEK_CUR):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        self._filepos = min(max(pos, 0L), self.size())

    def tell(self):
        return self._filepos

    def size(self):
        n = self.segments[-1]
        return n[OFFSET] + n[BLOCKSIZE]

    @ArvadosFileBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position."""
        if size == 0:
            return ''

        data = ''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
            data = self._stream.readfrom(locator+segmentoffset, segmentsize,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data
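
    # Note: read() returns at most one segment's worth of data per call
    # (only the first chunk from locators_and_ranges), while readfrom()
    # below assembles every chunk covering the requested range.  Callers
    # that need exactly 'size' bytes should call read() in a loop, as
    # readall() does.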

    @ArvadosFileBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'."""
        if size == 0:
            return ''

        data = []
        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(locator+segmentoffset, segmentsize,
                                              num_retries=num_retries))
        return ''.join(data)

    @ArvadosFileBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if data == '':
                break
            yield data

    @ArvadosFileBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
        else:
            data = ['']
        data_size = len(data[-1])
        while (data_size < size) and ('\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)

        data = ''.join(data)
        try:
            nextline_index = data.index('\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index]
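
    # Cache note: read() advances the file position by whole chunks, so
    # after readline() returns one line the position already sits past the
    # end of that line.  The (position, leftover) pair saved above lets the
    # next sequential readline() call consume the buffered remainder rather
    # than re-reading it from the stream.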

    @ArvadosFileBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @ArvadosFileBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            # 16+MAX_WBITS tells zlib to expect a gzip header and trailer.
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @ArvadosFileBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return ''.join(data).splitlines(True)

    def as_manifest(self):
        manifest_text = ['.']
        manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
        manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
        return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text(normalize=True)
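
# Usage sketch (hypothetical values; 'keep_client' stands in for a configured
# KeepClient, and acbd18db4cc2f85cedef654fccc4a4d8 is md5("foo")):
#
#   stream = StreamReader('. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt'.split(),
#                         keep=keep_client)
#   f = stream.files()['foo.txt']
#   f.read(3)  # => 'foo'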


class StreamReader(object):
    def __init__(self, tokens, keep=None, debug=False, _empty=False,
                 num_retries=0):
        self._stream_name = None
        self._data_locators = []
        self._files = collections.OrderedDict()
        self._keep = keep
        self.num_retries = num_retries

        streamoffset = 0L

        # parse stream
        for tok in tokens:
            if debug: print 'tok', tok

            if self._stream_name is None:
                self._stream_name = tok.replace('\\040', ' ')
                continue

            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
            if s:
                blocksize = long(s.group(1))
                self._data_locators.append([tok, blocksize, streamoffset])
                streamoffset += blocksize
                continue

            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            if s:
                pos = long(s.group(1))
                size = long(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                if name not in self._files:
                    self._files[name] = StreamFileReader(self, [[pos, size, 0]], name)
                else:
                    n = self._files[name]
                    n.segments.append([pos, size, n.size()])
                continue

            raise errors.SyntaxError("Invalid manifest format")
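
    # A stream in manifest format is one line of space-separated tokens: a
    # stream name, one or more block locators (md5sum+size plus optional
    # hints), then one or more file tokens of the form position:size:name.
    # For example (781e5e245d69b566979b86e28d23f2c7 is md5("0123456789")):
    #
    #   . 781e5e245d69b566979b86e28d23f2c7+10 0:5:a.txt 5:5:b.txt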

    def name(self):
        return self._stream_name

    def files(self):
        return self._files

    def all_files(self):
        return self._files.values()

    def size(self):
        n = self._data_locators[-1]
        return n[OFFSET] + n[BLOCKSIZE]

    def locators_and_ranges(self, range_start, range_size):
        return locators_and_ranges(self._data_locators, range_start, range_size)

    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'."""
        if size == 0:
            return ''
        if self._keep is None:
            self._keep = KeepClient(num_retries=self.num_retries)
        data = []
        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
            data.append(self._keep.get(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
        return ''.join(data)

    def manifest_text(self, strip=False):
        manifest_text = [self.name().replace(' ', '\\040')]
        if strip:
            for d in self._data_locators:
                m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
                manifest_text.append(m.group(0))
        else:
            manifest_text.extend([d[LOCATOR] for d in self._data_locators])
        manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
                                        for seg in f.segments])
                              for f in self._files.values()])
        return ' '.join(manifest_text) + '\n'
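
# Example (sketch, continuing the manifest line shown above): manifest_text()
# reproduces the parsed stream, with spaces in names escaped as \040 and a
# trailing newline:
#
#   stream.manifest_text()
#   # => '. 781e5e245d69b566979b86e28d23f2c7+10 0:5:a.txt 5:5:b.txt\n'
#
# With strip=True, everything after the bare md5sum+size (such as +A
# permission hints) is dropped from each block locator.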