self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
+ self._pending_write_size = 0
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
self.stop_threads()
@synchronized
- def repack_small_blocks(self, force=False, sync=False):
+ def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
"""Packs small blocks together before uploading"""
- # Search blocks ready for getting packed together before being committed to Keep.
- # A WRITABLE block always has an owner.
- # A WRITABLE block with its owner.closed() implies that it's
- # size is <= KEEP_BLOCK_SIZE/2.
- small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
- if len(small_blocks) <= 1:
- # Not enough small blocks for repacking
- return
+ self._pending_write_size += closed_file_size
# Check if there are enough small blocks for filling up one in full
- pending_write_size = sum([b.size() for b in small_blocks])
- if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
+ if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+
+ # Search blocks ready for getting packed together before being committed to Keep.
+ # A WRITABLE block always has an owner.
+ # A WRITABLE block with its owner.closed() implies that its
+ # size is <= KEEP_BLOCK_SIZE/2.
+ small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+
+ if len(small_blocks) <= 1:
+ # Not enough small blocks for repacking
+ return
+
new_bb = self._alloc_bufferblock()
while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
bb = small_blocks.pop(0)
arvfile = bb.owner
+ self._pending_write_size -= bb.size()
new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
arvfile.set_segments([Range(new_bb.blockid,
0,
self.flush()
elif self.closed():
# All writers closed and size is adequate for repacking
- self.parent._my_block_manager().repack_small_blocks()
+ self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
def closed(self):
"""