available_chunks = locators_and_ranges(self.segments, self._filepos, size)
if available_chunks:
locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
available_chunks = locators_and_ranges(self.segments, self._filepos, size)
if available_chunks:
locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
- data = self._stream.readfrom(locator+segmentoffset, segmentsize)
+ data = self._stream.readfrom(locator+segmentoffset, segmentsize,
+ num_retries=num_retries)
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
- data += self._stream.readfrom(locator+segmentoffset, segmentsize)
- return data
+ data.append(self._stream.readfrom(locator+segmentoffset, segmentsize,
+ num_retries=num_retries))
+ return ''.join(data)
- def decompress(self, decompress, size):
- for segment in self.readall(size):
+ @retry_method
+ def decompress(self, decompress, size, num_retries=None):
+ for segment in self.readall(size, num_retries):
self.seek(0)
if re.search('\.bz2$', self._name):
dc = bz2.BZ2Decompressor()
self.seek(0)
if re.search('\.bz2$', self._name):
dc = bz2.BZ2Decompressor()
- return self.decompress(lambda segment: dc.decompress(segment), size)
+ return self.decompress(dc.decompress, size,
+ num_retries=num_retries)
elif re.search('\.gz$', self._name):
dc = zlib.decompressobj(16+zlib.MAX_WBITS)
elif re.search('\.gz$', self._name):
dc = zlib.decompressobj(16+zlib.MAX_WBITS)
- return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
+ return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
+ size, num_retries=num_retries)
- def readlines(self, decompress=True):
- if decompress:
- datasource = self.readall_decompressed()
- else:
- datasource = self.readall()
+ @retry_method
+ def readlines(self, decompress=True, num_retries=None):
+ read_func = self.readall_decompressed if decompress else self.readall
manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
- def __init__(self, tokens, keep=None, debug=False, _empty=False):
+ def __init__(self, tokens, keep=None, debug=False, _empty=False,
+ num_retries=0):
def locators_and_ranges(self, range_start, range_size):
    """Map a byte range onto this object's underlying data locators.

    Delegates to the module-level ``locators_and_ranges`` helper (which
    this method shadows by name), applied to ``self._data_locators``.
    Presumably yields ``(locator, blocksize, segmentoffset, segmentsize)``
    tuples covering ``[range_start, range_start + range_size)`` — the
    shape unpacked by callers elsewhere in this file.
    """
    # Inside the method body the bare name resolves to the module-level
    # helper, not to this method.
    lookup_args = (self._data_locators, range_start, range_size)
    return locators_and_ranges(*lookup_args)
def locators_and_ranges(self, range_start, range_size):
    """Delegate to the module-level ``locators_and_ranges`` helper
    (shadowed here by the method name) over ``self._data_locators``,
    for the byte range starting at ``range_start`` of length
    ``range_size``.

    NOTE(review): callers in this file unpack the result as
    (locator, blocksize, segmentoffset, segmentsize) tuples — confirm
    against the helper's definition, which is not visible here.
    """
    return locators_and_ranges(self._data_locators, range_start, range_size)
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
def manifest_text(self, strip=False):
manifest_text = [self.name().replace(' ', '\\040')]
def manifest_text(self, strip=False):
manifest_text = [self.name().replace(' ', '\\040')]