X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7497b21c937bb6e8451f16047945b7cfc9081a53..3513c7def7eacdeef16c355f1b9be93830dcf946:/sdk/python/arvados/arvfile.py

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index f2f7df2dce..85366d2fdc 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -10,6 +10,7 @@ import copy
 import errno
 import re
 import logging
+import collections
 
 from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
@@ -405,7 +406,7 @@ class _BlockManager(object):
     def __init__(self, keep, copies=None):
         """keep: KeepClient object to use"""
         self._keep = keep
-        self._bufferblocks = {}
+        self._bufferblocks = collections.OrderedDict()
         self._put_queue = None
         self._put_threads = None
         self._prefetch_queue = None
@@ -549,6 +550,31 @@ class _BlockManager(object):
     def __exit__(self, exc_type, exc_value, traceback):
         self.stop_threads()
 
+    @synchronized
+    def repack_small_blocks(self, force=False, sync=False):
+        """Packs small blocks together before uploading"""
+        # Search blocks ready for getting packed together before being committed to Keep.
+        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+        if len(small_blocks) <= 1:
+            # Not enough small blocks for repacking
+            return
+
+        # Check if there are enough small blocks for filling up one in full
+        pending_write_size = sum([b.size() for b in small_blocks])
+        if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
+            new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
+            self._bufferblocks[new_bb.blockid] = new_bb
+            size = 0
+            while len(small_blocks) > 0 and (size + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+                bb = small_blocks.pop(0)
+                size += bb.size()
+                arvfile = bb.owner
+                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+                arvfile.set_segments([Range(new_bb.blockid, 0, bb.size(), size-bb.size())])
+                bb.clear()
+                del self._bufferblocks[bb.blockid]
+            self.commit_bufferblock(new_bb, sync=sync)
+
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
 
@@ -562,7 +588,6 @@ class _BlockManager(object):
           which case it will wait on an upload queue slot.
 
         """
-
         try:
             # Mark the block as PENDING so to disallow any more appends.
             block.set_state(_BufferBlock.PENDING)
@@ -629,11 +654,13 @@ class _BlockManager(object):
         are uploaded. Raises KeepWriteError() if any blocks failed to upload.
 
         """
+        self.repack_small_blocks(force=True, sync=True)
+
         with self.lock:
             items = self._bufferblocks.items()
 
         for k,v in items:
-            if v.state() != _BufferBlock.COMMITTED:
+            if v.state() != _BufferBlock.COMMITTED and v.owner:
                 v.owner.flush(sync=False)
 
         with self.lock:
@@ -700,6 +727,7 @@ class ArvadosFile(object):
         """
         self.parent = parent
         self.name = name
+        self._writers = set()
         self._committed = False
         self._segments = []
         self.lock = parent.root_collection().lock
@@ -776,9 +804,13 @@ class ArvadosFile(object):
     def __ne__(self, other):
         return not self.__eq__(other)
 
+    @synchronized
+    def set_segments(self, segs):
+        self._segments = segs
+
     @synchronized
     def set_committed(self):
-        """Set committed flag to False"""
+        """Set committed flag to True"""
         self._committed = True
 
     @synchronized
@@ -786,6 +818,34 @@ class ArvadosFile(object):
         """Get whether this is committed or not."""
         return self._committed
 
+    @synchronized
+    def add_writer(self, writer):
+        """Add an ArvadosFileWriter reference to the list of writers"""
+        if isinstance(writer, ArvadosFileWriter):
+            self._writers.add(writer)
+
+    @synchronized
+    def remove_writer(self, writer):
+        """
+        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
+        and do some block maintenance tasks.
+        """
+        self._writers.remove(writer)
+
+        if self.size() > config.KEEP_BLOCK_SIZE / 2:
+            # File writer closed, not small enough for repacking
+            self.flush()
+        elif self.closed():
+            # All writers closed and size is adequate for repacking
+            self.parent._my_block_manager().repack_small_blocks()
+
+    def closed(self):
+        """
+        Get whether this is closed or not. When the writers list is empty, the file
+        is supposed to be closed.
+        """
+        return len(self._writers) == 0
+
     @must_be_writable
     @synchronized
     def truncate(self, size):
@@ -1067,6 +1127,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def __init__(self, arvadosfile, mode, num_retries=None):
         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
         self.mode = mode
+        self.arvadosfile.add_writer(self)
 
     @_FileLikeObjectBase._before_close
     @retry_method
@@ -1098,5 +1159,5 @@ class ArvadosFileWriter(ArvadosFileReader):
 
     def close(self):
         if not self.closed:
-            self.flush()
+            self.arvadosfile.remove_writer(self)
             super(ArvadosFileWriter, self).close()