X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0988acb472849dc08d576ee40493e70bde2132ca..3513c7def7eacdeef16c355f1b9be93830dcf946:/sdk/python/arvados/arvfile.py

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index ce0e5e3564..85366d2fdc 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -10,6 +10,7 @@ import copy
 import errno
 import re
 import logging
+import collections
 
 from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
@@ -108,6 +109,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
         cache_pos, cache_data = self._readline_cache
         if self.tell() == cache_pos:
             data = [cache_data]
+            self._filepos += len(cache_data)
         else:
             data = ['']
         data_size = len(data[-1])
@@ -123,6 +125,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
         except ValueError:
             nextline_index = len(data)
         nextline_index = min(nextline_index, size)
+        self._filepos -= len(data) - nextline_index
         self._readline_cache = (self.tell(), data[nextline_index:])
         return data[:nextline_index]
 
@@ -320,7 +323,7 @@ class _BufferBlock(object):
     @synchronized
     def set_state(self, nextstate, val=None):
         if (self._state, nextstate) not in self.STATE_TRANSITIONS:
-            raise StateChangeError("Invalid state change from %s to %s" % (self.state, nextstate), self.state, nextstate)
+            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
         self._state = nextstate
 
         if self._state == _BufferBlock.PENDING:
@@ -400,10 +403,10 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep):
+    def __init__(self, keep, copies=None):
         """keep: KeepClient object to use"""
         self._keep = keep
-        self._bufferblocks = {}
+        self._bufferblocks = collections.OrderedDict()
         self._put_queue = None
         self._put_threads = None
         self._prefetch_queue = None
@@ -412,6 +415,7 @@ class _BlockManager(object):
         self.prefetch_enabled = True
         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+        self.copies = copies
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -462,7 +466,10 @@ class _BlockManager(object):
                 if bufferblock is None:
                     return
 
-                loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                if self.copies is None:
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                else:
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
 
             except Exception as e:
@@ -491,7 +498,7 @@ class _BlockManager(object):
             for i in xrange(0, self.num_put_threads):
                 thread = threading.Thread(target=self._commit_bufferblock_worker)
                 self._put_threads.append(thread)
-                thread.daemon = False
+                thread.daemon = True
                 thread.start()
 
     def _block_prefetch_worker(self):
@@ -543,8 +550,30 @@ class _BlockManager(object):
     def __exit__(self, exc_type, exc_value, traceback):
         self.stop_threads()
 
-    def __del__(self):
-        self.stop_threads()
+    @synchronized
+    def repack_small_blocks(self, force=False, sync=False):
+        """Packs small blocks together before uploading"""
+        # Search blocks ready for getting packed together before being committed to Keep.
+        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+        if len(small_blocks) <= 1:
+            # Not enough small blocks for repacking
+            return
+
+        # Check if there are enough small blocks for filling up one in full
+        pending_write_size = sum([b.size() for b in small_blocks])
+        if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
+            new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
+            self._bufferblocks[new_bb.blockid] = new_bb
+            size = 0
+            while len(small_blocks) > 0 and (size + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+                bb = small_blocks.pop(0)
+                size += bb.size()
+                arvfile = bb.owner
+                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+                arvfile.set_segments([Range(new_bb.blockid, 0, bb.size(), size-bb.size())])
+                bb.clear()
+                del self._bufferblocks[bb.blockid]
+            self.commit_bufferblock(new_bb, sync=sync)
 
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
@@ -559,20 +588,28 @@ class _BlockManager(object):
           which case it will wait on an upload queue slot.
 
         """
-
         try:
             # Mark the block as PENDING so to disallow any more appends.
             block.set_state(_BufferBlock.PENDING)
         except StateChangeError as e:
-            if e.state == _BufferBlock.PENDING and sync:
-                block.wait_for_commit.wait()
-                if block.state() == _BufferBlock.ERROR:
-                    raise block.error
-            return
+            if e.state == _BufferBlock.PENDING:
+                if sync:
+                    block.wait_for_commit.wait()
+                else:
+                    return
+            if block.state() == _BufferBlock.COMMITTED:
+                return
+            elif block.state() == _BufferBlock.ERROR:
+                raise block.error
+            else:
+                raise
 
         if sync:
             try:
-                loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                if self.copies is None:
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                else:
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                 block.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 block.set_state(_BufferBlock.ERROR, e)
@@ -617,11 +654,13 @@ class _BlockManager(object):
         are uploaded. Raises KeepWriteError() if any blocks failed to upload.
 
         """
+        self.repack_small_blocks(force=True, sync=True)
+
         with self.lock:
             items = self._bufferblocks.items()
 
         for k,v in items:
-            if v.state() != _BufferBlock.COMMITTED:
+            if v.state() != _BufferBlock.COMMITTED and v.owner:
                 v.owner.flush(sync=False)
 
         with self.lock:
@@ -688,6 +727,7 @@ class ArvadosFile(object):
         """
         self.parent = parent
         self.name = name
+        self._writers = set()
        self._committed = False
         self._segments = []
         self.lock = parent.root_collection().lock
@@ -764,9 +804,13 @@ class ArvadosFile(object):
     def __ne__(self, other):
         return not self.__eq__(other)
 
+    @synchronized
+    def set_segments(self, segs):
+        self._segments = segs
+
     @synchronized
     def set_committed(self):
-        """Set committed flag to False"""
+        """Set committed flag to True"""
         self._committed = True
 
     @synchronized
@@ -774,6 +818,34 @@ class ArvadosFile(object):
         """Get whether this is committed or not."""
         return self._committed
 
+    @synchronized
+    def add_writer(self, writer):
+        """Add an ArvadosFileWriter reference to the list of writers"""
+        if isinstance(writer, ArvadosFileWriter):
+            self._writers.add(writer)
+
+    @synchronized
+    def remove_writer(self, writer):
+        """
+        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
+        and do some block maintenance tasks.
+        """
+        self._writers.remove(writer)
+
+        if self.size() > config.KEEP_BLOCK_SIZE / 2:
+            # File writer closed, not small enough for repacking
+            self.flush()
+        elif self.closed():
+            # All writers closed and size is adequate for repacking
+            self.parent._my_block_manager().repack_small_blocks()
+
+    def closed(self):
+        """
+        Get whether this is closed or not. When the writers list is empty, the file
+        is supposed to be closed.
+        """
+        return len(self._writers) == 0
+
     @must_be_writable
     @synchronized
     def truncate(self, size):
@@ -929,7 +1001,7 @@ class ArvadosFile(object):
                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                 if bb:
                     if bb.state() != _BufferBlock.COMMITTED:
-                        self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=True)
+                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                     to_delete.add(s.locator)
                     s.locator = bb.locator()
             for s in to_delete:
@@ -1055,6 +1127,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def __init__(self, arvadosfile, mode, num_retries=None):
         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
         self.mode = mode
+        self.arvadosfile.add_writer(self)
 
     @_FileLikeObjectBase._before_close
     @retry_method
@@ -1086,5 +1159,5 @@ class ArvadosFileWriter(ArvadosFileReader):
 
     def close(self):
         if not self.closed:
-            self.flush()
+            self.arvadosfile.remove_writer(self)
             super(ArvadosFileWriter, self).close()
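
Illustrative note (not part of the diff): the sketch below shows how the writer tracking and small-block packing introduced above are expected to be exercised through the public SDK API. Only add_writer()/remove_writer(), repack_small_blocks() and the new copies handling come from the diff; the file names, file contents, collection name and the save_new() call are assumptions made for the example, and saving assumes a configured ARVADOS_API_HOST/ARVADOS_API_TOKEN.

    # Sketch: many small files written to one collection. Each writer registers
    # itself via ArvadosFile.add_writer(); each close() calls remove_writer(),
    # and once all writers of small, closed files are gone
    # _BlockManager.repack_small_blocks() can pack their buffer blocks into a
    # single Keep block instead of committing one undersized block per file.
    import arvados.collection

    c = arvados.collection.Collection()  # relies on the SDK's default API/Keep configuration
    for i in range(100):
        with c.open("small-%03d.txt" % i, "w") as f:  # hypothetical file names
            f.write("just a few bytes of data\n")

    # commit_all() runs while saving and now calls
    # repack_small_blocks(force=True, sync=True) first, so any leftover small
    # buffer blocks are combined before upload.
    c.save_new(name="packed small files")  # assumed collection name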