X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e21c8f8c0ba6f2012bc4108f8fa27411ab4375d1..3513c7def7eacdeef16c355f1b9be93830dcf946:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index a043bee751..85366d2fdc 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -10,6 +10,7 @@ import copy import errno import re import logging +import collections from .errors import KeepWriteError, AssertionError, ArgumentError from .keep import KeepLocator @@ -405,7 +406,7 @@ class _BlockManager(object): def __init__(self, keep, copies=None): """keep: KeepClient object to use""" self._keep = keep - self._bufferblocks = {} + self._bufferblocks = collections.OrderedDict() self._put_queue = None self._put_threads = None self._prefetch_queue = None @@ -549,10 +550,11 @@ class _BlockManager(object): def __exit__(self, exc_type, exc_value, traceback): self.stop_threads() + @synchronized def repack_small_blocks(self, force=False, sync=False): """Packs small blocks together before uploading""" - # Search blocks ready for getting packed together before being committed to Keep - small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner and b.owner.closed() and b.owner.size() <= (config.KEEP_BLOCK_SIZE / 2)] + # Search blocks ready for getting packed together before being committed to Keep. + small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] if len(small_blocks) <= 1: # Not enough small blocks for repacking return @@ -652,8 +654,9 @@ class _BlockManager(object): are uploaded. Raises KeepWriteError() if any blocks failed to upload. """ + self.repack_small_blocks(force=True, sync=True) + with self.lock: - self.repack_small_blocks(force=True, sync=True) items = self._bufferblocks.items() for k,v in items: