X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/9b15259759d5e73ca51f30956527dc774c5a80cf..ef6f7202858cba65e06cc1a32d52ee2305687bc8:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index b78c63e301..c394dab810 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -10,6 +10,8 @@ import copy import errno import re import logging +import collections +import uuid from .errors import KeepWriteError, AssertionError, ArgumentError from .keep import KeepLocator @@ -402,10 +404,10 @@ class _BlockManager(object): DEFAULT_PUT_THREADS = 2 DEFAULT_GET_THREADS = 2 - def __init__(self, keep): + def __init__(self, keep, copies=None): """keep: KeepClient object to use""" self._keep = keep - self._bufferblocks = {} + self._bufferblocks = collections.OrderedDict() self._put_queue = None self._put_threads = None self._prefetch_queue = None @@ -414,6 +416,7 @@ class _BlockManager(object): self.prefetch_enabled = True self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS + self.copies = copies @synchronized def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): @@ -429,8 +432,11 @@ class _BlockManager(object): ArvadosFile that owns this block """ + return self._alloc_bufferblock(blockid, starting_capacity, owner) + + def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): if blockid is None: - blockid = "bufferblock%i" % len(self._bufferblocks) + blockid = "%s" % uuid.uuid4() bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner) self._bufferblocks[bufferblock.blockid] = bufferblock return bufferblock @@ -464,7 +470,10 @@ class _BlockManager(object): if bufferblock is None: return - loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) + if self.copies is None: + loc = 
self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) + else: + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies) bufferblock.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: @@ -545,6 +554,33 @@ class _BlockManager(object): def __exit__(self, exc_type, exc_value, traceback): self.stop_threads() + @synchronized + def repack_small_blocks(self, force=False, sync=False): + """Packs small blocks together before uploading""" + # Search blocks ready for getting packed together before being committed to Keep. + # A WRITABLE block always has an owner. + # A WRITABLE block with its owner.closed() implies that its + # size is <= KEEP_BLOCK_SIZE/2. + small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] + if len(small_blocks) <= 1: + # Not enough small blocks for repacking + return + + # Check if there are enough small blocks for filling up one in full + pending_write_size = sum([b.size() for b in small_blocks]) + if force or (pending_write_size >= config.KEEP_BLOCK_SIZE): + new_bb = self._alloc_bufferblock() + while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: + bb = small_blocks.pop(0) + arvfile = bb.owner + new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) + arvfile.set_segments([Range(new_bb.blockid, + 0, + bb.size(), + new_bb.write_pointer - bb.size())]) + self._delete_bufferblock(bb.blockid) + self.commit_bufferblock(new_bb, sync=sync) + def commit_bufferblock(self, block, sync): """Initiate a background upload of a bufferblock. @@ -558,7 +594,6 @@ class _BlockManager(object): which case it will wait on an upload queue slot. """ - try: # Mark the block as PENDING so to disallow any more appends. 
block.set_state(_BufferBlock.PENDING) @@ -577,7 +612,10 @@ class _BlockManager(object): if sync: try: - loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) + if self.copies is None: + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) + else: + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies) block.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: block.set_state(_BufferBlock.ERROR, e) @@ -592,6 +630,9 @@ class _BlockManager(object): @synchronized def delete_bufferblock(self, locator): + self._delete_bufferblock(locator) + + def _delete_bufferblock(self, locator): bb = self._bufferblocks[locator] bb.clear() del self._bufferblocks[locator] @@ -622,11 +663,13 @@ class _BlockManager(object): are uploaded. Raises KeepWriteError() if any blocks failed to upload. """ + self.repack_small_blocks(force=True, sync=True) + with self.lock: items = self._bufferblocks.items() for k,v in items: - if v.state() != _BufferBlock.COMMITTED: + if v.state() != _BufferBlock.COMMITTED and v.owner: v.owner.flush(sync=False) with self.lock: @@ -693,6 +736,7 @@ class ArvadosFile(object): """ self.parent = parent self.name = name + self._writers = set() self._committed = False self._segments = [] self.lock = parent.root_collection().lock @@ -769,9 +813,13 @@ class ArvadosFile(object): def __ne__(self, other): return not self.__eq__(other) + @synchronized + def set_segments(self, segs): + self._segments = segs + @synchronized def set_committed(self): - """Set committed flag to False""" + """Set committed flag to True""" self._committed = True @synchronized @@ -779,6 +827,34 @@ class ArvadosFile(object): """Get whether this is committed or not.""" return self._committed + @synchronized + def add_writer(self, writer): + """Add an ArvadosFileWriter reference to the list of writers""" + if isinstance(writer, ArvadosFileWriter): + self._writers.add(writer) + + @synchronized + def remove_writer(self, 
writer, flush): + """ + Called from ArvadosFileWriter.close(). Remove a writer reference from the list + and do some block maintenance tasks. + """ + self._writers.remove(writer) + + if flush or self.size() > config.KEEP_BLOCK_SIZE / 2: + # File writer closed, not small enough for repacking + self.flush() + elif self.closed(): + # All writers closed and size is adequate for repacking + self.parent._my_block_manager().repack_small_blocks() + + def closed(self): + """ + Get whether this is closed or not. When the writers list is empty, the file + is supposed to be closed. + """ + return len(self._writers) == 0 + @must_be_writable @synchronized def truncate(self, size): @@ -1060,6 +1136,7 @@ class ArvadosFileWriter(ArvadosFileReader): def __init__(self, arvadosfile, mode, num_retries=None): super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries) self.mode = mode + self.arvadosfile.add_writer(self) @_FileLikeObjectBase._before_close @retry_method @@ -1089,7 +1166,7 @@ class ArvadosFileWriter(ArvadosFileReader): def flush(self): self.arvadosfile.flush() - def close(self): + def close(self, flush=True): if not self.closed: - self.flush() + self.arvadosfile.remove_writer(self, flush) super(ArvadosFileWriter, self).close()