X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/600378f933f028ba497cac86a978fa71401d209b..e158f485053be1e840073b321033d60d686a55a8:/sdk/python/arvados/stream.py

diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 6ee12e18aa..c263dd871b 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -1,24 +1,12 @@
-import gflags
-import httplib
-import httplib2
-import logging
+import bz2
+import collections
+import hashlib
 import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
 import re
-import hashlib
-import string
-import bz2
 import zlib
-import fcntl
-import time
-import threading
-import collections
+from .arvfile import ArvadosFileBase
+from arvados.retry import retry_method
 from keep import *
 import config
 import errors

@@ -71,7 +59,7 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
         block_size = data_locators[i][BLOCKSIZE]
         block_start = data_locators[i][OFFSET]
         block_end = block_start + block_size
-    
+
     while i < len(data_locators):
         locator, block_size, block_start = data_locators[i]
         block_end = block_start + block_size
@@ -102,24 +90,54 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
         i += 1
     return resp

+def split(path):
+    """split(path) -> streamname, filename
+
+    Separate the stream name and file name in a /-separated stream path.
+    If no stream name is available, assume '.'.
+    """
+    try:
+        stream_name, file_name = path.rsplit('/', 1)
+    except ValueError:  # No / in string
+        stream_name, file_name = '.', path
+    return stream_name, file_name
+
+class StreamFileReader(ArvadosFileBase):
+    class _NameAttribute(str):
+        # The Python file API provides a plain .name attribute.
+        # Older SDK provided a name() method.
+        # This class provides both, for maximum compatibility.
+        def __call__(self):
+            return self
+
-class StreamFileReader(object):
     def __init__(self, stream, segments, name):
+        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb')
         self._stream = stream
         self.segments = segments
-        self._name = name
         self._filepos = 0L
+        self.num_retries = stream.num_retries
+        self._readline_cache = (None, None)

-    def name(self):
-        return self._name
+    def __iter__(self):
+        while True:
+            data = self.readline()
+            if not data:
+                break
+            yield data

     def decompressed_name(self):
-        return re.sub('\.(bz2|gz)$', '', self._name)
+        return re.sub('\.(bz2|gz)$', '', self.name)

     def stream_name(self):
         return self._stream.name()

-    def seek(self, pos):
+    @ArvadosFileBase._before_close
+    def seek(self, pos, whence=os.SEEK_CUR):
+        if whence == os.SEEK_CUR:
+            pos += self._filepos
+        elif whence == os.SEEK_END:
+            pos += self.size()
         self._filepos = min(max(pos, 0L), self.size())

     def tell(self):
@@ -129,87 +147,126 @@ class StreamFileReader(object):
         n = self.segments[-1]
         return n[OFFSET] + n[BLOCKSIZE]

-    def read(self, size):
+    @ArvadosFileBase._before_close
+    @retry_method
+    def read(self, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at the current file position"""
         if size == 0:
             return ''

         data = ''
-        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, self._filepos, size):
-            data += self._stream.readfrom(locator+segmentoffset, segmentsize)
-            self._filepos += len(data)
+        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
+        if available_chunks:
+            locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
+            data = self._stream.readfrom(locator+segmentoffset, segmentsize,
+                                         num_retries=num_retries)
+
+        self._filepos += len(data)
         return data

-    def readfrom(self, start, size):
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readfrom(self, start, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at 'start'"""
         if size == 0:
             return ''

-        data = ''
+        data = []
         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
-            data += self._stream.readfrom(locator+segmentoffset, segmentsize)
-        return data
+            data.append(self._stream.readfrom(locator+segmentoffset, segmentsize,
+                                              num_retries=num_retries))
+        return ''.join(data)

-    def readall(self, size=2**20):
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readall(self, size=2**20, num_retries=None):
         while True:
-            data = self.read(size)
+            data = self.read(size, num_retries=num_retries)
             if data == '':
                 break
             yield data

-    def decompress(self, decompress, size):
-        for segment in self.readall(size):
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readline(self, size=float('inf'), num_retries=None):
+        cache_pos, cache_data = self._readline_cache
+        if self.tell() == cache_pos:
+            data = [cache_data]
+        else:
+            data = ['']
+        data_size = len(data[-1])
+        while (data_size < size) and ('\n' not in data[-1]):
+            next_read = self.read(2 ** 20, num_retries=num_retries)
+            if not next_read:
+                break
+            data.append(next_read)
+            data_size += len(next_read)
+        data = ''.join(data)
+        try:
+            nextline_index = data.index('\n') + 1
+        except ValueError:
+            nextline_index = len(data)
+        nextline_index = min(nextline_index, size)
+        self._readline_cache = (self.tell(), data[nextline_index:])
+        return data[:nextline_index]
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def decompress(self, decompress, size, num_retries=None):
+        for segment in self.readall(size, num_retries):
             data = decompress(segment)
-            if data and data != '':
+            if data:
                 yield data

-    def readall_decompressed(self, size=2**20):
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readall_decompressed(self, size=2**20, num_retries=None):
         self.seek(0)
-        if re.search('\.bz2$', self._name):
+        if self.name.endswith('.bz2'):
             dc = bz2.BZ2Decompressor()
-            return self.decompress(lambda segment: dc.decompress(segment), size)
-        elif re.search('\.gz$', self._name):
-            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
-            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
+            return self.decompress(dc.decompress, size,
+                                   num_retries=num_retries)
+        elif self.name.endswith('.gz'):
+            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
+            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
+                                   size, num_retries=num_retries)
         else:
-            return self.readall(size)
+            return self.readall(size, num_retries=num_retries)
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readlines(self, sizehint=float('inf'), num_retries=None):
+        data = []
+        data_size = 0
+        for s in self.readall(num_retries=num_retries):
+            data.append(s)
+            data_size += len(s)
+            if data_size >= sizehint:
+                break
+        return ''.join(data).splitlines(True)
+
+    def as_manifest(self):
+        manifest_text = ['.']
+        manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
+        manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
+        return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text(normalize=True)

-    def readlines(self, decompress=True):
-        if decompress:
-            datasource = self.readall_decompressed()
-        else:
-            datasource = self.readall()
-        data = ''
-        for newdata in datasource:
-            data += newdata
-            sol = 0
-            while True:
-                eol = string.find(data, "\n", sol)
-                if eol < 0:
-                    break
-                yield data[sol:eol+1]
-                sol = eol+1
-            data = data[sol:]
-        if data != '':
-            yield data

 class StreamReader(object):
-    def __init__(self, tokens, keep=None, debug=False):
+    def __init__(self, tokens, keep=None, debug=False, _empty=False,
+                 num_retries=0):
         self._stream_name = None
         self._data_locators = []
         self._files = collections.OrderedDict()
+        self._keep = keep
+        self.num_retries = num_retries

-        if keep != None:
-            self._keep = keep
-        else:
-            self._keep = Keep.global_client_object()
         streamoffset = 0L

         # parse stream
         for tok in tokens:
             if debug: print 'tok', tok

-            if self._stream_name == None:
+            if self._stream_name is None:
                 self._stream_name = tok.replace('\\040', ' ')
                 continue

@@ -250,11 +307,27 @@ class StreamReader(object):
     def locators_and_ranges(self, range_start, range_size):
         return locators_and_ranges(self._data_locators, range_start, range_size)

-    def readfrom(self, start, size):
+    @retry_method
+    def readfrom(self, start, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at 'start'"""
         if size == 0:
             return ''
-        data = ''
+        if self._keep is None:
+            self._keep = KeepClient(num_retries=self.num_retries)
+        data = []
         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
-            data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
-        return data
+            data.append(self._keep.get(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
+        return ''.join(data)
+
+    def manifest_text(self, strip=False):
+        manifest_text = [self.name().replace(' ', '\\040')]
+        if strip:
+            for d in self._data_locators:
+                m = re.match(r'^[0-9a-f]{32}\+\d+', d[LOCATOR])
+                manifest_text.append(m.group(0))
+        else:
+            manifest_text.extend([d[LOCATOR] for d in self._data_locators])
+        manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
+                                        for seg in f.segments])
+                              for f in self._files.values()])
+        return ' '.join(manifest_text) + '\n'
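
Editor's note: a minimal sketch of the new split() helper added above, written in
Python 2 to match the file under diff and assuming the module is importable as
arvados.stream. The behavior follows directly from the docstring: rsplit on the
last '/', with '.' as the default stream name.

    from arvados.stream import split

    split('path/to/file.txt')  # -> ('path/to', 'file.txt')
    split('file.txt')          # -> ('.', 'file.txt')  no '/', so the stream name defaults to '.'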
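
StreamReader no longer builds a Keep client eagerly: the constructor stores
whatever keep object it is given, and readfrom() creates a KeepClient only on
first use, so tests and callers can inject their own. The sketch below
illustrates that with a hypothetical StubKeep stand-in; note that the manifest
token layout ('<stream name> <locator> <pos>:<size>:<filename>') is parsed by
code elided from this hunk, so treat that layout and the exact manifest_text()
output as assumptions.

    import hashlib
    from arvados.stream import StreamReader

    class StubKeep(object):
        """Stand-in Keep client: serves blocks from a dict, ignoring retries."""
        def __init__(self, blocks):
            self.blocks = blocks
        def get(self, locator, num_retries=None):
            return self.blocks[locator]

    block = 'foo\n'
    loc = '%s+%d' % (hashlib.md5(block).hexdigest(), len(block))  # md5 hash + size locator
    reader = StreamReader(['.', loc, '0:4:foo.txt'],
                          keep=StubKeep({loc: block}), num_retries=3)
    print reader.readfrom(0, 4)             # 'foo\n'
    print reader.manifest_text(strip=True)  # strip=True keeps only the bare hash+size locator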
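
On the file side, _NameAttribute makes both naming styles work (the plain .name
attribute of the Python file API and the old SDK's name() method), and the new
__iter__/readline pair makes a StreamFileReader line-iterable. Continuing the
sketch above, and assuming the elided portion of this file still exposes the
parsed files through the old SDK's all_files() accessor:

    import os

    f = reader.all_files()[0]   # assumed accessor; the parsing code is elided above
    print f.name                # 'foo.txt' -- plain attribute, new file-API style
    print f.name()              # 'foo.txt' -- callable, old SDK style
    for line in f:              # __iter__ calls readline() until it returns ''
        print repr(line)        # "'foo\n'"
    # note: this seek() defaults to os.SEEK_CUR, unlike the stdlib's SEEK_SET;
    # pass os.SEEK_SET explicitly for an absolute seek
    f.seek(0, os.SEEK_SET)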