+ @synchronized
+ def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
+ """Packs small blocks together before uploading"""
+
+ self._pending_write_size += closed_file_size
+
+ # Check if there are enough small blocks for filling up one in full
+ if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
+ return
+
+ # Search blocks ready for getting packed together before being
+ # committed to Keep.
+ # A WRITABLE block always has an owner.
+ # A WRITABLE block with its owner.closed() implies that its
+ # size is <= KEEP_BLOCK_SIZE/2.
+ try:
+ small_blocks = [b for b in listvalues(self._bufferblocks)
+ if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+ except AttributeError:
+ # Writable blocks without owner shouldn't exist.
+ raise UnownedBlockError()
+
+ if len(small_blocks) <= 1:
+ # Not enough small blocks for repacking
+ return
+
+ for bb in small_blocks:
+ bb.repack_writes()
+
+ # Update the pending write size count with its true value, just in case
+ # some small file was opened, written and closed several times.
+ self._pending_write_size = sum([b.size() for b in small_blocks])
+
+ if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+ return
+
+ new_bb = self._alloc_bufferblock()
+ new_bb.owner = []
+ files = []
+ while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+ bb = small_blocks.pop(0)
+ new_bb.owner.append(bb.owner)
+ self._pending_write_size -= bb.size()
+ new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+ files.append((bb, new_bb.write_pointer - bb.size()))
+
+ self.commit_bufferblock(new_bb, sync=sync)
+
+ for bb, new_bb_segment_offset in files:
+ newsegs = bb.owner.segments()
+ for s in newsegs:
+ if s.locator == bb.blockid:
+ s.locator = new_bb.blockid
+ s.segment_offset = new_bb_segment_offset+s.segment_offset
+ bb.owner.set_segments(newsegs)
+ self._delete_bufferblock(bb.blockid)