import re
import zlib
+from .arvfile import ArvadosFileBase
from arvados.retry import retry_method
from keep import *
import config
i += 1
return resp
+def split(path):
+ """split(path) -> streamname, filename
+
+ Separate the stream name and file name in a /-separated stream path.
+ If no stream name is available, assume '.'.
+ """
+ try:
+ stream_name, file_name = path.rsplit('/', 1)
+ except ValueError: # No / in string
+ stream_name, file_name = '.', path
+ return stream_name, file_name
+
+class StreamFileReader(ArvadosFileBase):
+ class _NameAttribute(str):
+ # The Python file API provides a plain .name attribute.
+ # Older SDK provided a name() method.
+ # This class provides both, for maximum compatibility.
+ def __call__(self):
+ return self
+
-class StreamFileReader(object):
def __init__(self, stream, segments, name):
+ super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb')
self._stream = stream
self.segments = segments
- self._name = name
self._filepos = 0L
self.num_retries = stream.num_retries
+ self._readline_cache = (None, None)
- def name(self):
- return self._name
+ def __iter__(self):
+ while True:
+ data = self.readline()
+ if not data:
+ break
+ yield data
def decompressed_name(self):
- return re.sub('\.(bz2|gz)$', '', self._name)
+ return re.sub('\.(bz2|gz)$', '', self.name)
def stream_name(self):
return self._stream.name()
- def seek(self, pos):
+ @ArvadosFileBase._before_close
+ def seek(self, pos, whence=os.SEEK_CUR):
+ if whence == os.SEEK_CUR:
+ pos += self._filepos
+ elif whence == os.SEEK_END:
+ pos += self.size()
self._filepos = min(max(pos, 0L), self.size())
def tell(self):
n = self.segments[-1]
return n[OFFSET] + n[BLOCKSIZE]
+ @ArvadosFileBase._before_close
@retry_method
def read(self, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at the current file position"""
self._filepos += len(data)
return data
+ @ArvadosFileBase._before_close
@retry_method
def readfrom(self, start, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
num_retries=num_retries))
return ''.join(data)
+ @ArvadosFileBase._before_close
@retry_method
def readall(self, size=2**20, num_retries=None):
while True:
break
yield data
+ @ArvadosFileBase._before_close
+ @retry_method
+ def readline(self, size=float('inf'), num_retries=None):
+ cache_pos, cache_data = self._readline_cache
+ if self.tell() == cache_pos:
+ data = [cache_data]
+ else:
+ data = ['']
+ data_size = len(data[-1])
+ while (data_size < size) and ('\n' not in data[-1]):
+ next_read = self.read(2 ** 20, num_retries=num_retries)
+ if not next_read:
+ break
+ data.append(next_read)
+ data_size += len(next_read)
+ data = ''.join(data)
+ try:
+ nextline_index = data.index('\n') + 1
+ except ValueError:
+ nextline_index = len(data)
+ nextline_index = min(nextline_index, size)
+ self._readline_cache = (self.tell(), data[nextline_index:])
+ return data[:nextline_index]
+
+ @ArvadosFileBase._before_close
@retry_method
def decompress(self, decompress, size, num_retries=None):
for segment in self.readall(size, num_retries):
data = decompress(segment)
- if data and data != '':
+ if data:
yield data
+ @ArvadosFileBase._before_close
@retry_method
def readall_decompressed(self, size=2**20, num_retries=None):
self.seek(0)
- if re.search('\.bz2$', self._name):
+ if self.name.endswith('.bz2'):
dc = bz2.BZ2Decompressor()
return self.decompress(dc.decompress, size,
num_retries=num_retries)
- elif re.search('\.gz$', self._name):
+ elif self.name.endswith('.gz'):
dc = zlib.decompressobj(16+zlib.MAX_WBITS)
return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
size, num_retries=num_retries)
else:
return self.readall(size, num_retries=num_retries)
+ @ArvadosFileBase._before_close
@retry_method
- def readlines(self, decompress=True, num_retries=None):
- read_func = self.readall_decompressed if decompress else self.readall
- data = ''
- for newdata in read_func(num_retries=num_retries):
- data += newdata
- sol = 0
- while True:
- eol = data.find("\n", sol)
- if eol < 0:
- break
- yield data[sol:eol+1]
- sol = eol+1
- data = data[sol:]
- if data != '':
- yield data
+ def readlines(self, sizehint=float('inf'), num_retries=None):
+ data = []
+ data_size = 0
+ for s in self.readall(num_retries=num_retries):
+ data.append(s)
+ data_size += len(s)
+ if data_size >= sizehint:
+ break
+ return ''.join(data).splitlines(True)
def as_manifest(self):
manifest_text = ['.']