X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a3e32a2f4a702c076d46a5b19305dd20a1ee3012..3678eda6ea138d948919944d1b54e680ba20ea81:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 0326608a0b..f2f7df2dce 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -108,6 +108,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): cache_pos, cache_data = self._readline_cache if self.tell() == cache_pos: data = [cache_data] + self._filepos += len(cache_data) else: data = [''] data_size = len(data[-1]) @@ -123,6 +124,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase): except ValueError: nextline_index = len(data) nextline_index = min(nextline_index, size) + self._filepos -= len(data) - nextline_index self._readline_cache = (self.tell(), data[nextline_index:]) return data[:nextline_index] @@ -400,7 +402,7 @@ class _BlockManager(object): DEFAULT_PUT_THREADS = 2 DEFAULT_GET_THREADS = 2 - def __init__(self, keep): + def __init__(self, keep, copies=None): """keep: KeepClient object to use""" self._keep = keep self._bufferblocks = {} @@ -412,6 +414,7 @@ class _BlockManager(object): self.prefetch_enabled = True self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS + self.copies = copies @synchronized def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): @@ -462,7 +465,10 @@ class _BlockManager(object): if bufferblock is None: return - loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) + if self.copies is None: + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) + else: + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies) bufferblock.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: @@ -543,9 +549,6 @@ class _BlockManager(object): def __exit__(self, exc_type, exc_value, traceback): self.stop_threads() - def __del__(self): - self.stop_threads() - def commit_bufferblock(self, block, sync): """Initiate a background upload of a bufferblock. @@ -578,7 +581,10 @@ class _BlockManager(object): if sync: try: - loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) + if self.copies is None: + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) + else: + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies) block.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: block.set_state(_BufferBlock.ERROR, e)