import errno
import re
import logging
+import collections
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
def __init__(self, keep, copies=None):
"""keep: KeepClient object to use"""
self._keep = keep
- self._bufferblocks = {}
+ self._bufferblocks = collections.OrderedDict()
self._put_queue = None
self._put_threads = None
self._prefetch_queue = None
def __exit__(self, exc_type, exc_value, traceback):
self.stop_threads()
+ @synchronized
def repack_small_blocks(self, force=False, sync=False):
"""Packs small blocks together before uploading"""
- # Search blocks ready for getting packed together before being committed to Keep
- small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner and b.owner.closed() and b.owner.size() <= (config.KEEP_BLOCK_SIZE / 2)]
+ # Search blocks ready for getting packed together before being committed to Keep.
+ small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
if len(small_blocks) <= 1:
# Not enough small blocks for repacking
return
are uploaded. Raises KeepWriteError() if any blocks failed to upload.
"""
+ self.repack_small_blocks(force=True, sync=True)
+
with self.lock:
- self.repack_small_blocks(force=True, sync=True)
items = self._bufferblocks.items()
for k,v in items: