X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/80c57e3536f41d8419f580b577776d85209f6111..1f354e0ba1b5b23c2d36c0cb60451260b29e1d3f:/sdk/python/arvados/stream.py diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py index b98937fe64..04b6b81804 100644 --- a/sdk/python/arvados/stream.py +++ b/sdk/python/arvados/stream.py @@ -18,6 +18,7 @@ import time import threading import collections +from arvados.retry import retry_method from keep import * import config import errors @@ -108,6 +109,7 @@ class StreamFileReader(object): self.segments = segments self._name = name self._filepos = 0L + self.num_retries = stream.num_retries def name(self): return self._name @@ -128,7 +130,8 @@ class StreamFileReader(object): n = self.segments[-1] return n[OFFSET] + n[BLOCKSIZE] - def read(self, size): + @retry_method + def read(self, size, num_retries=None): """Read up to 'size' bytes from the stream, starting at the current file position""" if size == 0: return '' @@ -137,52 +140,58 @@ class StreamFileReader(object): available_chunks = locators_and_ranges(self.segments, self._filepos, size) if available_chunks: locator, blocksize, segmentoffset, segmentsize = available_chunks[0] - data = self._stream.readfrom(locator+segmentoffset, segmentsize) + data = self._stream.readfrom(locator+segmentoffset, segmentsize, + num_retries=num_retries) self._filepos += len(data) return data - def readfrom(self, start, size): + @retry_method + def readfrom(self, start, size, num_retries=None): """Read up to 'size' bytes from the stream, starting at 'start'""" if size == 0: return '' - data = '' + data = [] for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size): - data += self._stream.readfrom(locator+segmentoffset, segmentsize) - return data + data.append(self._stream.readfrom(locator+segmentoffset, segmentsize, + num_retries=num_retries)) + return ''.join(data) - def readall(self, size=2**20): + @retry_method + def readall(self, size=2**20, num_retries=None): while True: - data = self.read(size) + data = self.read(size, num_retries=num_retries) if data == '': break yield data - def decompress(self, decompress, size): - for segment in self.readall(size): + @retry_method + def decompress(self, decompress, size, num_retries=None): + for segment in self.readall(size, num_retries): data = decompress(segment) if data and data != '': yield data - def readall_decompressed(self, size=2**20): + @retry_method + def readall_decompressed(self, size=2**20, num_retries=None): self.seek(0) if re.search('\.bz2$', self._name): dc = bz2.BZ2Decompressor() - return self.decompress(lambda segment: dc.decompress(segment), size) + return self.decompress(dc.decompress, size, + num_retries=num_retries) elif re.search('\.gz$', self._name): dc = zlib.decompressobj(16+zlib.MAX_WBITS) - return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size) + return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), + size, num_retries=num_retries) else: - return self.readall(size) + return self.readall(size, num_retries=num_retries) - def readlines(self, decompress=True): - if decompress: - datasource = self.readall_decompressed() - else: - datasource = self.readall() + @retry_method + def readlines(self, decompress=True, num_retries=None): + read_func = self.readall_decompressed if decompress else self.readall data = '' - for newdata in datasource: + for newdata in read_func(num_retries=num_retries): data += newdata sol = 0 while True: @@ -201,12 +210,15 @@ class StreamFileReader(object): manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments]) return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text() + class StreamReader(object): - def __init__(self, tokens, keep=None, debug=False, _empty=False): + def __init__(self, tokens, keep=None, debug=False, _empty=False, + num_retries=0): self._stream_name = None self._data_locators = [] self._files = collections.OrderedDict() self._keep = keep + self.num_retries = num_retries streamoffset = 0L @@ -254,16 +266,17 @@ class StreamReader(object): def locators_and_ranges(self, range_start, range_size): return locators_and_ranges(self._data_locators, range_start, range_size) - def readfrom(self, start, size): + @retry_method + def readfrom(self, start, size, num_retries=None): """Read up to 'size' bytes from the stream, starting at 'start'""" if size == 0: return '' if self._keep is None: - self._keep = KeepClient() - data = '' + self._keep = KeepClient(num_retries=self.num_retries) + data = [] for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size): - data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize] - return data + data.append(self._keep.get(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize]) + return ''.join(data) def manifest_text(self, strip=False): manifest_text = [self.name().replace(' ', '\\040')]