From: Peter Amstutz
Date: Wed, 17 Dec 2014 15:04:08 +0000 (-0500)
Subject: 3198: Refactor stream and file classes and functions a little bit for readability.
X-Git-Tag: 1.1.0~1780^2~72
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/b0e2fe9d0a18d5e809bc8c0d3382e0e023cb949c

3198: Refactor stream and file classes and functions a little bit for readability.
---

diff --git a/sdk/python/arvados/__init__.py b/sdk/python/arvados/__init__.py
index 4cae20d597..19a7ad8439 100644
--- a/sdk/python/arvados/__init__.py
+++ b/sdk/python/arvados/__init__.py
@@ -22,6 +22,7 @@
 from api import *
 from collection import *
 from keep import *
 from stream import *
+from arvfile import *
 import errors
 import util
@@ -131,5 +132,3 @@ class job_setup:
             body={'success':True}
         ).execute()
         exit(0)
-
-
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index e8dac463e5..ef7a6c88d9 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -1,4 +1,24 @@
 import functools
+import os
+import re
+import zlib
+import bz2
+import arvados
+import config
+from .ranges import *
+from arvados.retry import retry_method
+
+def split(path):
+    """split(path) -> streamname, filename
+
+    Separate the stream name and file name in a /-separated stream path.
+    If no stream name is available, assume '.'.
+    """
+    try:
+        stream_name, file_name = path.rsplit('/', 1)
+    except ValueError:  # No / in string
+        stream_name, file_name = '.', path
+    return stream_name, file_name
 
 class ArvadosFileBase(object):
     def __init__(self, name, mode):
@@ -27,3 +47,221 @@ class ArvadosFileBase(object):
 
     def close(self):
         self.closed = True
+
+
+class StreamFileReader(ArvadosFileBase):
+    class _NameAttribute(str):
+        # The Python file API provides a plain .name attribute.
+        # Older SDK provided a name() method.
+        # This class provides both, for maximum compatibility.
+        def __call__(self):
+            return self
+
+
+    def __init__(self, stream, segments, name):
+        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb')
+        self._stream = stream
+        self.segments = segments
+        self._filepos = 0L
+        self.num_retries = stream.num_retries
+        self._readline_cache = (None, None)
+
+    def __iter__(self):
+        while True:
+            data = self.readline()
+            if not data:
+                break
+            yield data
+
+    def decompressed_name(self):
+        return re.sub('\.(bz2|gz)$', '', self.name)
+
+    def stream_name(self):
+        return self._stream.name()
+
+    @ArvadosFileBase._before_close
+    def seek(self, pos, whence=os.SEEK_CUR):
+        if whence == os.SEEK_CUR:
+            pos += self._filepos
+        elif whence == os.SEEK_END:
+            pos += self.size()
+        self._filepos = min(max(pos, 0L), self._size())
+
+    def tell(self):
+        return self._filepos
+
+    def _size(self):
+        n = self.segments[-1]
+        return n[OFFSET] + n[BLOCKSIZE]
+
+    def size(self):
+        return self._size()
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def read(self, size, num_retries=None):
+        """Read up to 'size' bytes from the stream, starting at the current file position"""
+        if size == 0:
+            return ''
+
+        data = ''
+        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
+        if available_chunks:
+            locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
+            data = self._stream._readfrom(locator+segmentoffset, segmentsize,
+                                          num_retries=num_retries)
+
+        self._filepos += len(data)
+        return data
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readfrom(self, start, size, num_retries=None):
+        """Read up to 'size' bytes from the stream, starting at 'start'"""
+        if size == 0:
+            return ''
+
+        data = []
+        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
+            data.append(self._stream._readfrom(locator+segmentoffset, segmentsize,
+                                               num_retries=num_retries))
+        return ''.join(data)
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readall(self, size=2**20, num_retries=None):
+        while True:
+            data = self.read(size, num_retries=num_retries)
+            if data == '':
+                break
+            yield data
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readline(self, size=float('inf'), num_retries=None):
+        cache_pos, cache_data = self._readline_cache
+        if self.tell() == cache_pos:
+            data = [cache_data]
+        else:
+            data = ['']
+        data_size = len(data[-1])
+        while (data_size < size) and ('\n' not in data[-1]):
+            next_read = self.read(2 ** 20, num_retries=num_retries)
+            if not next_read:
+                break
+            data.append(next_read)
+            data_size += len(next_read)
+        data = ''.join(data)
+        try:
+            nextline_index = data.index('\n') + 1
+        except ValueError:
+            nextline_index = len(data)
+        nextline_index = min(nextline_index, size)
+        self._readline_cache = (self.tell(), data[nextline_index:])
+        return data[:nextline_index]
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def decompress(self, decompress, size, num_retries=None):
+        for segment in self.readall(size, num_retries):
+            data = decompress(segment)
+            if data:
+                yield data
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readall_decompressed(self, size=2**20, num_retries=None):
+        self.seek(0)
+        if self.name.endswith('.bz2'):
+            dc = bz2.BZ2Decompressor()
+            return self.decompress(dc.decompress, size,
+                                   num_retries=num_retries)
+        elif self.name.endswith('.gz'):
+            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
+            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
+                                   size, num_retries=num_retries)
+        else:
+            return self.readall(size, num_retries=num_retries)
+
+    @ArvadosFileBase._before_close
+    @retry_method
+    def readlines(self, sizehint=float('inf'), num_retries=None):
+        data = []
+        data_size = 0
+        for s in self.readall(num_retries=num_retries):
+            data.append(s)
+            data_size += len(s)
+            if data_size >= sizehint:
+                break
+        return ''.join(data).splitlines(True)
+
+    def as_manifest(self):
+        manifest_text = ['.']
+        manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
+        manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
+        return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text(normalize=True)
+
+
+class StreamFileWriter(StreamFileReader):
+    def __init__(self, stream, segments, name):
+        super(StreamFileWriter, self).__init__(stream, segments, name)
+        self.mode = 'wb'
+
+    # wrap superclass methods in mutex
+    def _proxy_method(name):
+        method = getattr(StreamFileReader, name)
+        @functools.wraps(method, ('__name__', '__doc__'))
+        def wrapper(self, *args, **kwargs):
+            with self._stream.mutex:
+                return method(self, *args, **kwargs)
+        return wrapper
+
+    for _method_name in ['__iter__', 'seek', 'tell', 'size', 'read', 'readfrom', 'readall', 'readline', 'decompress', 'readall_decompressed', 'readlines', 'as_manifest']:
+        locals()[_method_name] = _proxy_method(_method_name)
+
+    def truncate(self, size=None):
+        with self._stream.mutex:
+            if size is None:
+                size = self._filepos
+
+            segs = locators_and_ranges(self.segments, 0, size)
+
+            newstream = []
+            self.segments = []
+            streamoffset = 0L
+            fileoffset = 0L
+
+            for seg in segs:
+                for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._stream._data_locators, seg[LOCATOR]+seg[OFFSET], seg[SEGMENTSIZE]):
+                    newstream.append([locator, blocksize, streamoffset])
+                    self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset])
+                    streamoffset += blocksize
+                    fileoffset += segmentsize
+            if len(newstream) == 0:
+                newstream.append(config.EMPTY_BLOCK_LOCATOR)
+                self.segments.append([0, 0, 0])
+            self._stream._data_locators = newstream
+            if self._filepos > fileoffset:
+                self._filepos = fileoffset
+
+    def _writeto(self, offset, data):
+        self._stream._append(data)
+        replace_range(self.segments, offset, len(data), self._stream._size()-len(data))
+
+    def writeto(self, offset, data):
+        with self._stream.mutex:
+            self._writeto(offset, data)
+
+    def write(self, data):
+        with self._stream.mutex:
+            self._writeto(self._filepos, data)
+            self._filepos += len(data)
+
+    def writelines(self, seq):
+        with self._stream.mutex:
+            for s in seq:
+                self._writeto(self._filepos, s)
+                self._filepos += len(s)
+
+    def flush(self):
+        pass
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 5ab7e77a2b..24572edf56 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -6,55 +6,15 @@ import re
 from collections import deque
 from stat import *
 
-from .arvfile import ArvadosFileBase
+from .arvfile import ArvadosFileBase, split
 from keep import *
-from .stream import StreamReader, split
+from .stream import StreamReader, normalize_stream
 import config
 import errors
 import util
 
 _logger = logging.getLogger('arvados.collection')
 
-def normalize_stream(s, stream):
-    stream_tokens = [s]
-    sortedfiles = list(stream.keys())
-    sortedfiles.sort()
-
-    blocks = {}
-    streamoffset = 0L
-    for f in sortedfiles:
-        for b in stream[f]:
-            if b[arvados.LOCATOR] not in blocks:
-                stream_tokens.append(b[arvados.LOCATOR])
-                blocks[b[arvados.LOCATOR]] = streamoffset
-                streamoffset += b[arvados.BLOCKSIZE]
-
-    if len(stream_tokens) == 1:
-        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
-
-    for f in sortedfiles:
-        current_span = None
-        fout = f.replace(' ', '\\040')
-        for segment in stream[f]:
-            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
-            if current_span is None:
-                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
-            else:
-                if segmentoffset == current_span[1]:
-                    current_span[1] += segment[arvados.SEGMENTSIZE]
-                else:
-                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
-                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
-
-        if current_span is not None:
-            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
-
-        if not stream[f]:
-            stream_tokens.append("0:0:{0}".format(fout))
-
-    return stream_tokens
-
-
 class CollectionBase(object):
     def __enter__(self):
         return self
diff --git a/sdk/python/arvados/ranges.py b/sdk/python/arvados/ranges.py
new file mode 100644
index 0000000000..fe9c04b451
--- /dev/null
+++ b/sdk/python/arvados/ranges.py
@@ -0,0 +1,149 @@
+LOCATOR = 0
+BLOCKSIZE = 1
+OFFSET = 2
+SEGMENTSIZE = 3
+
+def first_block(data_locators, range_start, range_size, debug=False):
+    block_start = 0L
+
+    # range_start/block_start is the inclusive lower bound
+    # range_end/block_end is the exclusive upper bound
+
+    hi = len(data_locators)
+    lo = 0
+    i = int((hi + lo) / 2)
+    block_size = data_locators[i][BLOCKSIZE]
+    block_start = data_locators[i][OFFSET]
+    block_end = block_start + block_size
+    if debug: print '---'
+
+    # perform a binary search for the first block
+    # assumes that all of the blocks are contiguous, so range_start is guaranteed
+    # to either fall into the range of a block or be outside the block range entirely
+    while not (range_start >= block_start and range_start < block_end):
+        if lo == i:
+            # must be out of range, fail
+            return None
+        if range_start > block_start:
+            lo = i
+        else:
+            hi = i
+        i = int((hi + lo) / 2)
+        if debug: print lo, i, hi
+        block_size = data_locators[i][BLOCKSIZE]
+        block_start = data_locators[i][OFFSET]
+        block_end = block_start + block_size
+
+    return i
+
+def locators_and_ranges(data_locators, range_start, range_size, debug=False):
+    '''
+    Get blocks that are covered by the range
+    data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contiguous
+    range_start: start of range
+    range_size: size of range
+    returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
+    '''
+    if range_size == 0:
+        return []
+    resp = []
+    range_start = long(range_start)
+    range_size = long(range_size)
+    range_end = range_start + range_size
+
+    i = first_block(data_locators, range_start, range_size, debug)
+    if i is None:
+        return []
+
+    while i < len(data_locators):
+        locator, block_size, block_start = data_locators[i]
+        block_end = block_start + block_size
+        if debug:
+            print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
+        if range_end <= block_start:
+            # range ends before this block starts, so don't look at any more locators
+            break
+
+        #if range_start >= block_end:
+        # range starts after this block ends, so go to next block
+        # we should always start at the first block due to the binary search above, so this test is redundant
+        #next
+
+        if range_start >= block_start and range_end <= block_end:
+            # range starts and ends in this block
+            resp.append([locator, block_size, range_start - block_start, range_size])
+        elif range_start >= block_start and range_end > block_end:
+            # range starts in this block
+            resp.append([locator, block_size, range_start - block_start, block_end - range_start])
+        elif range_start < block_start and range_end > block_end:
+            # range starts in a previous block and extends to further blocks
+            resp.append([locator, block_size, 0L, block_size])
+        elif range_start < block_start and range_end <= block_end:
+            # range starts in a previous block and ends in this block
+            resp.append([locator, block_size, 0L, range_end - block_start])
+        block_start = block_end
+        i += 1
+    return resp
+
+def replace_range(data_locators, range_start, range_size, new_locator, debug=False):
+    '''
+    Replace a range with a new block.
+    data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contiguous
+    range_start: start of range
+    range_size: size of range
+    new_locator: locator for new block to be inserted
+    !!! data_locators will be updated in place !!!
+    '''
+    if range_size == 0:
+        return
+
+    range_start = long(range_start)
+    range_size = long(range_size)
+    range_end = range_start + range_size
+
+    last = data_locators[-1]
+    if (last[OFFSET]+last[BLOCKSIZE]) == range_start:
+        # append new block
+        data_locators.append([new_locator, range_size, range_start])
+        return
+
+    i = first_block(data_locators, range_start, range_size, debug)
+    if i is None:
+        return
+
+    while i < len(data_locators):
+        locator, block_size, block_start = data_locators[i]
+        block_end = block_start + block_size
+        if debug:
+            print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
+        if range_end <= block_start:
+            # range ends before this block starts, so don't look at any more locators
+            break
+
+        #if range_start >= block_end:
+        # range starts after this block ends, so go to next block
+        # we should always start at the first block due to the binary search above, so this test is redundant
+        #next
+
+        if range_start >= block_start and range_end <= block_end:
+            # range starts and ends in this block
+            # split block into 3 pieces
+            #resp.append([locator, block_size, range_start - block_start, range_size])
+            pass
+        elif range_start >= block_start and range_end > block_end:
+            # range starts in this block
+            # split block into 2 pieces
+            #resp.append([locator, block_size, range_start - block_start, block_end - range_start])
+            pass
+        elif range_start < block_start and range_end > block_end:
+            # range starts in a previous block and extends to further blocks
+            # zero out this block
+            #resp.append([locator, block_size, 0L, block_size])
+            pass
+        elif range_start < block_start and range_end <= block_end:
+            # range starts in a previous block and ends in this block
+            # split into 2 pieces
+            #resp.append([locator, block_size, 0L, range_end - block_start])
+            pass
+        block_start = block_end
+        i += 1
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 3f6461573f..8623ab93d5 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -1,332 +1,59 @@
-import bz2
 import collections
 import hashlib
 import os
 import re
-import zlib
 import threading
 import functools
 
-from .arvfile import ArvadosFileBase
+from .ranges import *
+from .arvfile import ArvadosFileBase, StreamFileReader, StreamFileWriter
 from arvados.retry import retry_method
 from keep import *
 import config
 import errors
 
-LOCATOR = 0
-BLOCKSIZE = 1
-OFFSET = 2
-SEGMENTSIZE = 3
-
-def first_block(data_locators, range_start, range_size, debug=False):
-    block_start = 0L
-
-    # range_start/block_start is the inclusive lower bound
-    # range_end/block_end is the exclusive upper bound
-
-    hi = len(data_locators)
-    lo = 0
-    i = int((hi + lo) / 2)
-    block_size = data_locators[i][BLOCKSIZE]
-    block_start = data_locators[i][OFFSET]
-    block_end = block_start + block_size
-    if debug: print '---'
-
-    # perform a binary search for the first block
-    # assumes that all of the blocks are contigious, so range_start is guaranteed
-    # to either fall into the range of a block or be outside the block range entirely
-    while not (range_start >= block_start and range_start < block_end):
-        if lo == i:
-            # must be out of range, fail
-            return None
-        if range_start > block_start:
-            lo = i
-        else:
-            hi = i
-        i = int((hi + lo) / 2)
-        if debug: print lo, i, hi
-        block_size = data_locators[i][BLOCKSIZE]
-        block_start = data_locators[i][OFFSET]
-        block_end = block_start + block_size
-
-    return i
-
-def locators_and_ranges(data_locators, range_start, range_size, debug=False):
-    '''
-    Get blocks that are covered by the range
-    data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
-    range_start: start of range
-    range_size: size of range
-    returns list of [block locator, blocksize, segment offset, segment size] that satisfies the range
+def normalize_stream(s, stream):
     '''
-    if range_size == 0:
-        return []
-    resp = []
-    range_start = long(range_start)
-    range_size = long(range_size)
-    range_end = range_start + range_size
-
-    i = first_block(data_locators, range_start, range_size, debug)
-    if i is None:
-        return []
-
-    while i < len(data_locators):
-        locator, block_size, block_start = data_locators[i]
-        block_end = block_start + block_size
-        if debug:
-            print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
-        if range_end <= block_start:
-            # range ends before this block starts, so don't look at any more locators
-            break
-
-        #if range_start >= block_end:
-        # range starts after this block ends, so go to next block
-        # we should always start at the first block due to the binary above, so this test is redundant
-        #next
-
-        if range_start >= block_start and range_end <= block_end:
-            # range starts and ends in this block
-            resp.append([locator, block_size, range_start - block_start, range_size])
-        elif range_start >= block_start and range_end > block_end:
-            # range starts in this block
-            resp.append([locator, block_size, range_start - block_start, block_end - range_start])
-        elif range_start < block_start and range_end > block_end:
-            # range starts in a previous block and extends to further blocks
-            resp.append([locator, block_size, 0L, block_size])
-        elif range_start < block_start and range_end <= block_end:
-            # range starts in a previous block and ends in this block
-            resp.append([locator, block_size, 0L, range_end - block_start])
-        block_start = block_end
-        i += 1
-    return resp
-
-def replace_range(data_locators, range_start, range_size, new_locator, debug=False):
+    s is the stream name
+    stream is a dict mapping file names to lists of segment descriptors
     '''
-    Replace a range with a new block.
-    data_locators: list of [locator, block_size, block_start], assumes that blocks are in order and contigous
-    range_start: start of range
-    range_size: size of range
-    new_locator: locator for new block to be inserted
-    !!! data_locators will be updated in place !!!
-    '''
-    if range_size == 0:
-        return
-
-    range_start = long(range_start)
-    range_size = long(range_size)
-    range_end = range_start + range_size
-
-    last = data_locators[-1]
-    if (last[OFFSET]+last[BLOCKSIZE]) == range_start:
-        # append new block
-        data_locators.append([new_locator, range_size, range_start])
-        return
-
-    i = first_block(data_locators, range_start, range_size, debug)
-    if i is None:
-        return
-
-    while i < len(data_locators):
-        locator, block_size, block_start = data_locators[i]
-        block_end = block_start + block_size
-        if debug:
-            print locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
-        if range_end <= block_start:
-            # range ends before this block starts, so don't look at any more locators
-            break
-
-        #if range_start >= block_end:
-        # range starts after this block ends, so go to next block
-        # we should always start at the first block due to the binary above, so this test is redundant
-        #next
-
-        if range_start >= block_start and range_end <= block_end:
-            # range starts and ends in this block
-            # split block into 3 pieces
-            #resp.append([locator, block_size, range_start - block_start, range_size])
-            pass
-        elif range_start >= block_start and range_end > block_end:
-            # range starts in this block
-            # split block into 2 pieces
-            #resp.append([locator, block_size, range_start - block_start, block_end - range_start])
-            pass
-        elif range_start < block_start and range_end > block_end:
-            # range starts in a previous block and extends to further blocks
-            # zero out this block
-            #resp.append([locator, block_size, 0L, block_size])
-            pass
-        elif range_start < block_start and range_end <= block_end:
-            # range starts in a previous block and ends in this block
-            # split into 2 pieces
-            #resp.append([locator, block_size, 0L, range_end - block_start])
-            pass
-        block_start = block_end
-        i += 1
-
-
-def split(path):
-    """split(path) -> streamname, filename
-
-    Separate the stream name and file name in a /-separated stream path.
-    If no stream name is available, assume '.'.
-    """
-    try:
-        stream_name, file_name = path.rsplit('/', 1)
-    except ValueError:  # No / in string
-        stream_name, file_name = '.', path
-    return stream_name, file_name
-
-class StreamFileReader(ArvadosFileBase):
-    class _NameAttribute(str):
-        # The Python file API provides a plain .name attribute.
-        # Older SDK provided a name() method.
-        # This class provides both, for maximum compatibility.
-        def __call__(self):
-            return self
-
-
-    def __init__(self, stream, segments, name):
-        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb')
-        self._stream = stream
-        self.segments = segments
-        self._filepos = 0L
-        self.num_retries = stream.num_retries
-        self._readline_cache = (None, None)
-
-    def __iter__(self):
-        while True:
-            data = self.readline()
-            if not data:
-                break
-            yield data
-
-    def decompressed_name(self):
-        return re.sub('\.(bz2|gz)$', '', self.name)
-
-    def stream_name(self):
-        return self._stream.name()
-
-    @ArvadosFileBase._before_close
-    def seek(self, pos, whence=os.SEEK_CUR):
-        if whence == os.SEEK_CUR:
-            pos += self._filepos
-        elif whence == os.SEEK_END:
-            pos += self.size()
-        self._filepos = min(max(pos, 0L), self._size())
-
-    def tell(self):
-        return self._filepos
-
-    def _size(self):
-        n = self.segments[-1]
-        return n[OFFSET] + n[BLOCKSIZE]
-
-    def size(self):
-        return self._size()
-
-    @ArvadosFileBase._before_close
-    @retry_method
-    def read(self, size, num_retries=None):
-        """Read up to 'size' bytes from the stream, starting at the current file position"""
-        if size == 0:
-            return ''
-
-        data = ''
-        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
-        if available_chunks:
-            locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
-            data = self._stream._readfrom(locator+segmentoffset, segmentsize,
-                                          num_retries=num_retries)
-
-        self._filepos += len(data)
-        return data
-
-    @ArvadosFileBase._before_close
-    @retry_method
-    def readfrom(self, start, size, num_retries=None):
-        """Read up to 'size' bytes from the stream, starting at 'start'"""
-        if size == 0:
-            return ''
-
-        data = []
-        for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
-            data.append(self._stream._readfrom(locator+segmentoffset, segmentsize,
-                                               num_retries=num_retries))
-        return ''.join(data)
+    stream_tokens = [s]
+    sortedfiles = list(stream.keys())
+    sortedfiles.sort()
+
+    blocks = {}
+    streamoffset = 0L
+    for f in sortedfiles:
+        for b in stream[f]:
+            if b[arvados.LOCATOR] not in blocks:
+                stream_tokens.append(b[arvados.LOCATOR])
+                blocks[b[arvados.LOCATOR]] = streamoffset
+                streamoffset += b[arvados.BLOCKSIZE]
+
+    if len(stream_tokens) == 1:
+        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
+
+    for f in sortedfiles:
+        current_span = None
+        fout = f.replace(' ', '\\040')
+        for segment in stream[f]:
+            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
+            if current_span is None:
+                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
+            else:
+                if segmentoffset == current_span[1]:
+                    current_span[1] += segment[arvados.SEGMENTSIZE]
+                else:
+                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
+                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
 
-    @ArvadosFileBase._before_close
-    @retry_method
-    def readall(self, size=2**20, num_retries=None):
-        while True:
-            data = self.read(size, num_retries=num_retries)
-            if data == '':
-                break
-            yield data
-
-    @ArvadosFileBase._before_close
-    @retry_method
-    def readline(self, size=float('inf'), num_retries=None):
-        cache_pos, cache_data = self._readline_cache
-        if self.tell() == cache_pos:
-            data = [cache_data]
-        else:
-            data = ['']
-        data_size = len(data[-1])
-        while (data_size < size) and ('\n' not in data[-1]):
-            next_read = self.read(2 ** 20, num_retries=num_retries)
-            if not next_read:
-                break
-            data.append(next_read)
-            data_size += len(next_read)
-        data = ''.join(data)
-        try:
-            nextline_index = data.index('\n') + 1
-        except ValueError:
-            nextline_index = len(data)
-        nextline_index = min(nextline_index, size)
-        self._readline_cache = (self.tell(), data[nextline_index:])
-        return data[:nextline_index]
-
-    @ArvadosFileBase._before_close
-    @retry_method
-    def decompress(self, decompress, size, num_retries=None):
-        for segment in self.readall(size, num_retries):
-            data = decompress(segment)
-            if data:
-                yield data
+        if current_span is not None:
+            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
 
-    @ArvadosFileBase._before_close
-    @retry_method
-    def readall_decompressed(self, size=2**20, num_retries=None):
-        self.seek(0)
-        if self.name.endswith('.bz2'):
-            dc = bz2.BZ2Decompressor()
-            return self.decompress(dc.decompress, size,
-                                   num_retries=num_retries)
-        elif self.name.endswith('.gz'):
-            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
-            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
-                                   size, num_retries=num_retries)
-        else:
-            return self.readall(size, num_retries=num_retries)
+        if not stream[f]:
+            stream_tokens.append("0:0:{0}".format(fout))
 
-    @ArvadosFileBase._before_close
-    @retry_method
-    def readlines(self, sizehint=float('inf'), num_retries=None):
-        data = []
-        data_size = 0
-        for s in self.readall(num_retries=num_retries):
-            data.append(s)
-            data_size += len(s)
-            if data_size >= sizehint:
-                break
-        return ''.join(data).splitlines(True)
-
-    def as_manifest(self):
-        manifest_text = ['.']
-        manifest_text.extend([d[LOCATOR] for d in self._stream._data_locators])
-        manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
-        return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text(normalize=True)
+    return stream_tokens
 
 
 class StreamReader(object):
@@ -387,13 +114,15 @@ class StreamReader(object):
     def locators_and_ranges(self, range_start, range_size):
         return locators_and_ranges(self._data_locators, range_start, range_size)
 
+    @retry_method
     def _keepget(self, locator, num_retries=None):
         return self._keep.get(locator, num_retries=num_retries)
 
     @retry_method
     def readfrom(self, start, size, num_retries=None):
-        self._readfrom(start, size, num_retries=num_retries)
+        return self._readfrom(start, size, num_retries=num_retries)
 
+    @retry_method
     def _readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
@@ -413,11 +142,12 @@ class StreamReader(object):
             manifest_text.append(m.group(0))
         else:
             manifest_text.extend([d[LOCATOR] for d in self._data_locators])
-        manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name().replace(' ', '\\040'))
+        manifest_text.extend([' '.join(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], f.name.replace(' ', '\\040'))
                                         for seg in f.segments])
                               for f in self._files.values()])
         return ' '.join(manifest_text) + '\n'
 
+
 class BufferBlock(object):
     def __init__(self, locator, streamoffset):
         self.locator = locator
@@ -431,6 +161,7 @@ class BufferBlock(object):
         self.write_pointer += len(data)
         self.locator_list_entry[1] = self.write_pointer
 
+
 class StreamWriter(StreamReader):
     def __init__(self, tokens, keep=None, debug=False, _empty=False,
                  num_retries=0):
@@ -454,9 +185,10 @@ class StreamWriter(StreamReader):
                 return method(self, *args, **kwargs)
             return wrapper
 
-    for _method_name in ['name', 'files', 'all_files', 'size', 'locators_and_ranges', 'readfrom', 'manifest_text']:
+    for _method_name in ['files', 'all_files', 'size', 'locators_and_ranges', 'readfrom', 'manifest_text']:
         locals()[_method_name] = _proxy_method(_method_name)
 
+    @retry_method
     def _keepget(self, locator, num_retries=None):
         if locator in self.bufferblocks:
             bb = self.bufferblocks[locator]
@@ -476,68 +208,3 @@ class StreamWriter(StreamReader):
     def append(self, data):
         with self.mutex:
             self._append(data)
-
-class StreamFileWriter(StreamFileReader):
-    def __init__(self, stream, segments, name):
-        super(StreamFileWriter, self).__init__(stream, segments, name)
-        self.mode = 'wb'
-
-    # wrap superclass methods in mutex
-    def _proxy_method(name):
-        method = getattr(StreamFileReader, name)
-        @functools.wraps(method, ('__name__', '__doc__'))
-        def wrapper(self, *args, **kwargs):
-            with self._stream.mutex:
-                return method(self, *args, **kwargs)
-        return wrapper
-
-    for _method_name in ['__iter__', 'seek', 'tell', 'size', 'read', 'readfrom', 'readall', 'readline', 'decompress', 'readall_decompressed', 'readlines', 'as_manifest']:
-        locals()[_method_name] = _proxy_method(_method_name)
-
-    def truncate(self, size=None):
-        with self._stream.mutex:
-            if size is None:
-                size = self._filepos
-
-            segs = locators_and_ranges(self.segments, 0, size)
-
-            newstream = []
-            self.segments = []
-            streamoffset = 0L
-            fileoffset = 0L
-
-            for seg in segs:
-                for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._stream._data_locators, seg[LOCATOR]+seg[OFFSET], seg[SEGMENTSIZE]):
-                    newstream.append([locator, blocksize, streamoffset])
-                    self.segments.append([streamoffset+segmentoffset, segmentsize, fileoffset])
-                    streamoffset += blocksize
-                    fileoffset += segmentsize
-            if len(newstream) == 0:
-                newstream.append(config.EMPTY_BLOCK_LOCATOR)
-                self.segments.append([0, 0, 0])
-            self._stream._data_locators = newstream
-            if self._filepos > fileoffset:
-                self._filepos = fileoffset
-
-    def _writeto(self, offset, data):
-        self._stream._append(data)
-        replace_range(self.segments, self._filepos, len(data), self._stream._size()-len(data))
-        self._filepos += len(data)
-
-    def writeto(self, offset, data):
-        with self._stream.mutex:
-            self._writeto(offset, data)
-
-    def write(self, data):
-        with self._stream.mutex:
-            self._writeto(self._filepos, data)
-            self._filepos += len(data)
-
-    def writelines(self, seq):
-        with self._stream.mutex:
-            for s in seq:
-                self._writeto(self._filepos, s)
-                self._filepos += len(s)
-
-    def flush(self):
-        pass
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 04ca6b5e10..aa7e6320d5 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -63,8 +63,10 @@ class MockStreamReader(object):
         return self._name
 
     def readfrom(self, start, size, num_retries=None):
-        return self._data[start:start + size]
+        return self._readfrom(start, size, num_retries=num_retries)
 
+    def _readfrom(self, start, size, num_retries=None):
+        return self._data[start:start + size]
 
 class ArvadosBaseTestCase(unittest.TestCase):
     # This class provides common utility functions for our tests.
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index f07ca6c7c1..baafc32800 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -315,7 +315,7 @@ class StreamFileWriterTestCase(unittest.TestCase):
         writer.seek(10)
         writer.write("foo")
         self.assertEqual("56789foo", writer.readfrom(5, 8))
-        #print stream.manifest_text()
+        #print arvados.normalize_stream(".", {"count.txt": stream.locators_and_ranges(0, stream.size())})
 
 if __name__ == '__main__':
     unittest.main()
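
The sketches below are editorial usage notes against this revision of the SDK; they are not part of the commit. First, the range arithmetic that the new sdk/python/arvados/ranges.py centralizes: locators_and_ranges() maps a byte range onto the block list backing a stream. A minimal worked example, where the 'locN' locators and block sizes are made-up placeholders:

    from arvados.ranges import locators_and_ranges

    # Hypothetical block list for a 16-byte stream; each entry is
    # [locator, block_size, block_start], the layout that ranges.py
    # indexes with LOCATOR, BLOCKSIZE and OFFSET.
    data_locators = [['loc0', 5L, 0L],   # stream offsets 0-4
                     ['loc1', 7L, 5L],   # stream offsets 5-11
                     ['loc2', 4L, 12L]]  # stream offsets 12-15

    # A 6-byte range starting at offset 3 spans two blocks: the last
    # 2 bytes of loc0, then the first 4 bytes of loc1.
    print locators_and_ranges(data_locators, 3, 6)
    # [['loc0', 5L, 3L, 2L], ['loc1', 7L, 0L, 4L]]

first_block() binary-searches the block_start offsets for the block containing the start of the range, so a read only touches the locators it actually overlaps.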
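
replace_range() in this revision only implements the append fast path (the split/overwrite branches are still pass placeholders); appending is the case StreamFileWriter._writeto() exercises when new data lands at the end of the stream. A small sketch, again with placeholder locators:

    from arvados.ranges import replace_range

    dl = [['loc0', 5L, 0L]]
    # The new 3-byte block starts exactly where the last block ends,
    # so it is appended as [locator, size, offset] in place.
    replace_range(dl, 5, 3, 'locNew')
    print dl
    # [['loc0', 5L, 0L], ['locNew', 3L, 5L]]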
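
StreamFileReader, now in arvfile.py alongside the split() helper, only needs a stream-like object exposing num_retries, name() and _readfrom(). A sketch using a stand-in stream class defined here purely for illustration (FakeStream is not part of the SDK):

    from arvados.arvfile import StreamFileReader, split

    print split('foo/bar.txt')   # ('foo', 'bar.txt')
    print split('baz.txt')       # ('.', 'baz.txt')

    class FakeStream(object):
        num_retries = 0
        def __init__(self, data, name):
            self._data = data
            self._name = name
        def name(self):
            return self._name
        def _readfrom(self, start, size, num_retries=None):
            return self._data[start:start + size]

    stream = FakeStream("hello world\n", ".")
    # One file segment covering the whole 12-byte stream, as
    # [offset in stream, segment size, offset in file].
    f = StreamFileReader(stream, [[0L, 12L, 0L]], 'hello.txt')
    print f.size()          # 12
    print f.readfrom(6, 5)  # 'world'
    print f.name            # hello.txt (file-API attribute)
    print f.name()          # hello.txt (older SDK method call)

The _NameAttribute trick is what lets the last two lines both work: name is a str subclass whose __call__ returns itself.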
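
Finally, normalize_stream(), which this commit moves from collection.py into stream.py: it takes a stream name plus a dict mapping file names to segment lists (the same [locator, block size, offset, segment size] shape that locators_and_ranges() returns, as the updated comment in test_stream.py shows) and emits manifest tokens. The locator below is just an example value:

    import arvados

    loc = "acbd18db4cc2f85cedef654fccc4a4d8+3"
    tokens = arvados.normalize_stream(".", {"count.txt": [[loc, 3, 0, 3]]})
    print ' '.join(tokens)
    # . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:count.txt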