-import gflags
-import httplib
-import httplib2
-import logging
+import bz2
+import collections
+import hashlib
import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
import re
-import hashlib
-import string
-import bz2
import zlib
-import fcntl
-import time
-import threading
-import collections
+from .arvfile import ArvadosFileBase
+from arvados.retry import retry_method
from keep import *
import config
import errors
block_size = data_locators[i][BLOCKSIZE]
block_start = data_locators[i][OFFSET]
block_end = block_start + block_size
-
+
while i < len(data_locators):
locator, block_size, block_start = data_locators[i]
block_end = block_start + block_size
i += 1
return resp
+def split(path):
+ """split(path) -> streamname, filename
+
+ Separate the stream name and file name in a /-separated stream path.
+ If no stream name is available, assume '.'.
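+
+    For example:
+
+    >>> split('path/to/file.txt')
+    ('path/to', 'file.txt')
+    >>> split('file.txt')
+    ('.', 'file.txt')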
+ """
+ try:
+ stream_name, file_name = path.rsplit('/', 1)
+ except ValueError: # No / in string
+ stream_name, file_name = '.', path
+ return stream_name, file_name
+
+class StreamFileReader(ArvadosFileBase):
+ class _NameAttribute(str):
+ # The Python file API provides a plain .name attribute.
+ # Older SDK provided a name() method.
+ # This class provides both, for maximum compatibility.
+ def __call__(self):
+ return self
+
-class StreamFileReader(object):
def __init__(self, stream, segments, name):
+ super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb')
self._stream = stream
self.segments = segments
- self._name = name
self._filepos = 0L
+ self.num_retries = stream.num_retries
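+        # Cache for readline(): (file position, bytes already read past the
+        # last newline returned), letting sequential calls avoid re-reading.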
+ self._readline_cache = (None, None)
- def name(self):
- return self._name
+ def __iter__(self):
+ while True:
+ data = self.readline()
+ if not data:
+ break
+ yield data
def decompressed_name(self):
- return re.sub('\.(bz2|gz)$', '', self._name)
+        return re.sub(r'\.(bz2|gz)$', '', self.name)
def stream_name(self):
return self._stream.name()
- def seek(self, pos):
+ @ArvadosFileBase._before_close
+    def seek(self, pos, whence=os.SEEK_SET):
+ if whence == os.SEEK_CUR:
+ pos += self._filepos
+ elif whence == os.SEEK_END:
+ pos += self.size()
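+        # Clamp the resulting position to [0, file size].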
self._filepos = min(max(pos, 0L), self.size())
    def tell(self):
        return self._filepos

    def size(self):
        n = self.segments[-1]
        return n[OFFSET] + n[BLOCKSIZE]
- def read(self, size):
+ @ArvadosFileBase._before_close
+ @retry_method
+ def read(self, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at the current file position"""
if size == 0:
return ''
data = ''
- for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
- data += self._stream.readfrom(locator+segmentoffset, segmentsize)
- self._filepos += len(data)
+ available_chunks = locators_and_ranges(self.segments, self._filepos, size)
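+        # Return data from the first available chunk only; like file.read(),
+        # a single call may return fewer bytes than requested.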
+ if available_chunks:
+ locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
+ data = self._stream.readfrom(locator+segmentoffset, segmentsize,
+ num_retries=num_retries)
+
+ self._filepos += len(data)
return data
- def readfrom(self, start, size):
+ @ArvadosFileBase._before_close
+ @retry_method
+ def readfrom(self, start, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
if size == 0:
return ''
- data = ''
+ data = []
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
- data += self._stream.readfrom(locator+segmentoffset, segmentsize)
- return data
+ data.append(self._stream.readfrom(locator+segmentoffset, segmentsize,
+ num_retries=num_retries))
+ return ''.join(data)
- def readall(self, size=2**20):
+ @ArvadosFileBase._before_close
+ @retry_method
+ def readall(self, size=2**20, num_retries=None):
while True:
- data = self.read(size)
+ data = self.read(size, num_retries=num_retries)
if data == '':
break
yield data
- def decompress(self, decompress, size):
- for segment in self.readall(size):
+ @ArvadosFileBase._before_close
+ @retry_method
+ def readline(self, size=float('inf'), num_retries=None):
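+        # Start with any bytes cached by the previous readline() call, as
+        # long as the file position has not moved since then.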
+ cache_pos, cache_data = self._readline_cache
+ if self.tell() == cache_pos:
+ data = [cache_data]
+ else:
+ data = ['']
+ data_size = len(data[-1])
+ while (data_size < size) and ('\n' not in data[-1]):
+ next_read = self.read(2 ** 20, num_retries=num_retries)
+ if not next_read:
+ break
+ data.append(next_read)
+ data_size += len(next_read)
+ data = ''.join(data)
+ try:
+ nextline_index = data.index('\n') + 1
+ except ValueError:
+ nextline_index = len(data)
+ nextline_index = min(nextline_index, size)
+ self._readline_cache = (self.tell(), data[nextline_index:])
+ return data[:nextline_index]
+
+ @ArvadosFileBase._before_close
+ @retry_method
+ def decompress(self, decompress, size, num_retries=None):
+        for segment in self.readall(size, num_retries=num_retries):
data = decompress(segment)
- if data and data != '':
+ if data:
yield data
- def readall_decompressed(self, size=2**20):
+ @ArvadosFileBase._before_close
+ @retry_method
+ def readall_decompressed(self, size=2**20, num_retries=None):
self.seek(0)
- if re.search('\.bz2$', self._name):
+ if self.name.endswith('.bz2'):
dc = bz2.BZ2Decompressor()
- return self.decompress(lambda segment: dc.decompress(segment), size)
- elif re.search('\.gz$', self._name):
- dc = zlib.decompressobj(16+zlib.MAX_WBITS)
- return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
+ return self.decompress(dc.decompress, size,
+ num_retries=num_retries)
+ elif self.name.endswith('.gz'):
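+            # wbits of 16+MAX_WBITS tells zlib to expect a gzip header and trailer.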
+ dc = zlib.decompressobj(16+zlib.MAX_WBITS)
+ return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
+ size, num_retries=num_retries)
else:
- return self.readall(size)
-
- def readlines(self, decompress=True):
- if decompress:
- datasource = self.readall_decompressed()
- else:
- datasource = self.readall()
- data = ''
- for newdata in datasource:
- data += newdata
- sol = 0
- while True:
- eol = string.find(data, "\n", sol)
- if eol < 0:
- break
- yield data[sol:eol+1]
- sol = eol+1
- data = data[sol:]
- if data != '':
- yield data
+ return self.readall(size, num_retries=num_retries)
+
+ @ArvadosFileBase._before_close
+ @retry_method
+ def readlines(self, sizehint=float('inf'), num_retries=None):
+ data = []
+ data_size = 0
+ for s in self.readall(num_retries=num_retries):
+ data.append(s)
+ data_size += len(s)
+ if data_size >= sizehint:
+ break
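+        # splitlines(True) keeps each line's newline, matching file.readlines().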
+ return ''.join(data).splitlines(True)
def as_manifest(self):
manifest_text = ['.']
manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
- return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
+ return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text(normalize=True)
+
class StreamReader(object):
- def __init__(self, tokens, keep=None, debug=False, _empty=False):
+ def __init__(self, tokens, keep=None, debug=False, _empty=False,
+ num_retries=0):
self._stream_name = None
self._data_locators = []
self._files = collections.OrderedDict()
+ self._keep = keep
+ self.num_retries = num_retries
- if keep != None:
- self._keep = keep
- else:
- self._keep = Keep.global_client_object()
-
streamoffset = 0L
# parse stream
for tok in tokens:
if debug: print 'tok', tok
- if self._stream_name == None:
+ if self._stream_name is None:
self._stream_name = tok.replace('\\040', ' ')
continue
def locators_and_ranges(self, range_start, range_size):
return locators_and_ranges(self._data_locators, range_start, range_size)
- def readfrom(self, start, size):
+ @retry_method
+ def readfrom(self, start, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
if size == 0:
return ''
- data = ''
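+        # Build the Keep client lazily, on first read, rather than in __init__.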
+ if self._keep is None:
+ self._keep = KeepClient(num_retries=self.num_retries)
+ data = []
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
- data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
- return data
-
- def manifest_text(self):
+ data.append(self._keep.get(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
+ return ''.join(data)
+
+ def manifest_text(self, strip=False):
manifest_text = [self.name().replace(' ', '\\040')]
- manifest_text.extend([d[LOCATOR] for d in self._data_locators])
- manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
+ if strip:
+ for d in self._data_locators:
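+                # Keep only the md5 hash and size; drop permission signatures
+                # and other locator hints.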
+ m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
+ manifest_text.append(m.group(0))
+ else:
+ manifest_text.extend([d[LOCATOR] for d in self._data_locators])
+ manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
for seg in f.segments])
for f in self._files.values()])
return ' '.join(manifest_text) + '\n'