import gflags
import httplib
import httplib2
-import logging
import os
import pprint
import sys
import threading
import collections
+from arvados.retry import retry_method
from keep import *
import config
import errors
self.segments = segments
self._name = name
self._filepos = 0L
+ self.num_retries = stream.num_retries
def name(self):
    """Return the name this file reader was constructed with (``self._name``)."""
    stored_name = self._name
    return stored_name
n = self.segments[-1]
return n[OFFSET] + n[BLOCKSIZE]
@retry_method
def read(self, size, num_retries=None):
    """Read up to 'size' bytes from the current file position and advance it.

    Only the first segment range returned by locators_and_ranges() is
    fetched, so a call may return fewer than 'size' bytes even when more
    data remains; callers should loop (as readall() does) until '' is
    returned.

    :param size: maximum number of bytes to return; 0 returns '' at once.
    :param num_retries: forwarded to the underlying stream's readfrom();
        when None, presumably filled in by @retry_method from
        self.num_retries (decorator not visible here -- confirm).
    :returns: the bytes read, or '' at end of file.
    """
    if size == 0:
        return ''
    chunks = locators_and_ranges(self.segments, self._filepos, size)
    if not chunks:
        # Nothing left in range: end of file.
        data = ''
    else:
        locator, _blocksize, segmentoffset, segmentsize = chunks[0]
        data = self._stream.readfrom(locator + segmentoffset, segmentsize,
                                     num_retries=num_retries)
    self._filepos += len(data)
    return data
@retry_method
def readfrom(self, start, size, num_retries=None):
    """Read up to 'size' bytes of this file starting at offset 'start'.

    Unlike read(), this does not consult or update the current file
    position. Every segment range covering [start, start+size) is fetched
    from the parent stream and the pieces are concatenated in order.

    :param start: file offset at which to begin reading.
    :param size: maximum number of bytes to return; 0 returns '' at once.
    :param num_retries: forwarded to the stream's readfrom() for each piece.
    :returns: the concatenated bytes.
    """
    if size == 0:
        return ''
    ranges = locators_and_ranges(self.segments, start, size)
    return ''.join(
        self._stream.readfrom(loc + seg_offset, seg_size,
                              num_retries=num_retries)
        for loc, _blocksize, seg_offset, seg_size in ranges)
@retry_method
def readall(self, size=2**20, num_retries=None):
    """Yield the file's contents, in chunks of at most 'size' bytes, from the current position.

    Calls read() repeatedly and stops once it returns ''.

    :param size: maximum bytes requested per read() call.
    :param num_retries: forwarded to each read() call.
    """
    chunk = self.read(size, num_retries=num_retries)
    while chunk != '':
        yield chunk
        chunk = self.read(size, num_retries=num_retries)
@retry_method
def decompress(self, decompress, size, num_retries=None):
    """Yield each non-empty result of applying 'decompress' to the raw chunks.

    :param decompress: callable taking one raw chunk and returning the
        decompressed output produced for it (possibly empty).
    :param size: maximum raw chunk size requested from readall().
    :param num_retries: forwarded to readall().
    """
    for raw_chunk in self.readall(size, num_retries):
        produced = decompress(raw_chunk)
        # Skip chunks that produced no output yet (decompressor still buffering).
        if not produced or produced == '':
            continue
        yield produced
@retry_method
def readall_decompressed(self, size=2**20, num_retries=None):
    """Rewind to the start of the file and return an iterator over its data.

    The decoding is chosen from the file name:
      * names ending in '.bz2' are decompressed with bz2;
      * names ending in '.gz' are decompressed as gzip via zlib
        (wbits=16+MAX_WBITS selects the gzip header/trailer format);
      * anything else is returned raw, as from readall().

    :param size: maximum raw chunk size requested per read.
    :param num_retries: forwarded to the underlying reads.
    :returns: a generator of (decompressed) data chunks.
    """
    self.seek(0)
    # Raw strings: '\.' in a plain literal is an invalid escape sequence
    # (deprecated since Python 3.6); the regex pattern itself is unchanged.
    if re.search(r'\.bz2$', self._name):
        dc = bz2.BZ2Decompressor()
        return self.decompress(dc.decompress, size,
                               num_retries=num_retries)
    elif re.search(r'\.gz$', self._name):
        dc = zlib.decompressobj(16+zlib.MAX_WBITS)
        # decompress() is never given max_length, so dc.unconsumed_tail is
        # always empty; feeding each segment straight in is equivalent.
        return self.decompress(dc.decompress, size,
                               num_retries=num_retries)
    else:
        return self.readall(size, num_retries=num_retries)
- def readlines(self, decompress=True):
- if decompress:
- datasource = self.readall_decompressed()
- else:
- datasource = self.readall()
+ @retry_method
+ def readlines(self, decompress=True, num_retries=None):
+ read_func = self.readall_decompressed if decompress else self.readall
data = ''
- for newdata in datasource:
+ for newdata in read_func(num_retries=num_retries):
data += newdata
sol = 0
while True:
manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
+
class StreamReader(object):
- def __init__(self, tokens, keep=None, debug=False, _empty=False):
+ def __init__(self, tokens, keep=None, debug=False, _empty=False,
+ num_retries=0):
self._stream_name = None
self._data_locators = []
self._files = collections.OrderedDict()
-
- if keep != None:
- self._keep = keep
- else:
- self._keep = Keep.global_client_object()
+ self._keep = keep
+ self.num_retries = num_retries
streamoffset = 0L
def locators_and_ranges(self, range_start, range_size):
    """Map the byte range [range_start, range_start+range_size) of this stream
    onto its data locators, delegating to the module-level locators_and_ranges()."""
    locators = self._data_locators
    return locators_and_ranges(locators, range_start, range_size)
@retry_method
def readfrom(self, start, size, num_retries=None):
    """Read up to 'size' bytes of this stream starting at offset 'start'.

    If no Keep client was supplied, one is created on first non-empty read
    using this stream's default num_retries. Each covering block is fetched
    with Keep get() and the relevant slice of it is kept.

    :param start: stream offset at which to begin reading.
    :param size: maximum number of bytes to return; 0 returns '' without
        touching Keep.
    :param num_retries: forwarded to each Keep get() call.
    :returns: the concatenated bytes.
    """
    if size == 0:
        return ''
    if self._keep is None:
        # Deferred creation: avoid constructing a client for streams never read.
        self._keep = KeepClient(num_retries=self.num_retries)
    spans = locators_and_ranges(self._data_locators, start, size)
    return ''.join(
        self._keep.get(locator, num_retries=num_retries)[offset:offset+length]
        for locator, _blocksize, offset, length in spans)
def manifest_text(self, strip=False):
manifest_text = [self.name().replace(' ', '\\040')]