self._pending_write_size += closed_file_size
# Check if there are enough small blocks for filling up one in full
- if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+ if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
+ return
- # Search blocks ready for getting packed together before being committed to Keep.
- # A WRITABLE block always has an owner.
- # A WRITABLE block with its owner.closed() implies that it's
- # size is <= KEEP_BLOCK_SIZE/2.
- try:
- small_blocks = [b for b in listvalues(self._bufferblocks) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
- except AttributeError:
- # Writable blocks without owner shouldn't exist.
- raise UnownedBlockError()
+ # Search blocks ready for getting packed together before being
+ # committed to Keep.
+ # A WRITABLE block always has an owner.
+ # A WRITABLE block with its owner.closed() implies that its
+ # size is <= KEEP_BLOCK_SIZE/2.
+ try:
+ small_blocks = [b for b in listvalues(self._bufferblocks)
+ if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+ except AttributeError:
+ # Writable blocks without an owner shouldn't exist.
+ raise UnownedBlockError()
+
- if len(small_blocks) <= 1:
- # Not enough small blocks for repacking
- return
+ if len(small_blocks) <= 1:
+ # Not enough small blocks for repacking
+ return
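+
+ # Repack each candidate block first so stale, overwritten data is
+ # dropped and each buffer holds its file's bytes in sequence.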
+ for bb in small_blocks:
+ bb.repack_writes()
- # Update the pending write size count with its true value, just in case
- # some small file was opened, written and closed several times.
- self._pending_write_size = sum([b.size() for b in small_blocks])
- if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
- return
+ # Update the pending write size count with its true value, just in case
+ # some small file was opened, written and closed several times.
+ self._pending_write_size = sum([b.size() for b in small_blocks])
- new_bb = self._alloc_bufferblock()
- while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
- bb = small_blocks.pop(0)
- arvfile = bb.owner
- self._pending_write_size -= bb.size()
- new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
- arvfile.set_segments([Range(new_bb.blockid,
- 0,
- bb.size(),
- new_bb.write_pointer - bb.size())])
- self._delete_bufferblock(bb.blockid)
- self.commit_bufferblock(new_bb, sync=sync)
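+
+ # Repacking may have shrunk the candidates below the block size
+ # threshold; if so, wait for more data unless a flush was forced.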
+ if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+ return
+
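+ # Pack whole small blocks into a single new bufferblock, recording
+ # each source block and its offset within new_bb so the owner files'
+ # segments can be updated after the block is committed.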
+ new_bb = self._alloc_bufferblock()
+ files = []
+ while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+ bb = small_blocks.pop(0)
+ self._pending_write_size -= bb.size()
+ new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+ files.append((bb, new_bb.write_pointer - bb.size()))
+
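+ # Upload the packed block to Keep (synchronously when sync is True).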
+ self.commit_bufferblock(new_bb, sync=sync)
+
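+ # Repoint each owner file's segments at the packed block: same data,
+ # new locator, with segment offsets shifted by where that file's
+ # bytes landed inside new_bb.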
+ for bb, new_bb_segment_offset in files:
+ newsegs = bb.owner.segments()
+ for s in newsegs:
+ if s.locator == bb.blockid:
+ s.locator = new_bb.locator()
+ s.segment_offset = new_bb_segment_offset + s.segment_offset
+ bb.owner.set_segments(newsegs)
+ self._delete_bufferblock(bb.blockid)
def commit_bufferblock(self, block, sync):
"""Initiate a background upload of a bufferblock.
self.parent._my_block_manager().block_prefetch(lr.locator)
locs.add(lr.locator)
- return ''.join(data)
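+ # The collected segments are bytes objects, so join them with a
+ # bytes separator for Python 3 compatibility.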
+ return b''.join(data)
- def _repack_writes(self, num_retries):
- """Optimize buffer block by repacking segments in file sequence.
-
- When the client makes random writes, they appear in the buffer block in
- the sequence they were written rather than the sequence they appear in
- the file. This makes for inefficient, fragmented manifests. Attempt
- to optimize by repacking writes in file sequence.
-
- """
- segs = self._segments
-
- # Collect the segments that reference the buffer block.
- bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
-
- # Collect total data referenced by segments (could be smaller than
- # bufferblock size if a portion of the file was written and
- # then overwritten).
- write_total = sum([s.range_size for s in bufferblock_segs])
-
- if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
- # If there's more than one segment referencing this block, it is
- # due to out-of-order writes and will produce a fragmented
- # manifest, so try to optimize by re-packing into a new buffer.
- contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
- new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
- for t in bufferblock_segs:
- new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
- t.segment_offset = new_bb.size() - t.range_size
-
- self._current_bblock = new_bb
-
@must_be_writable
@synchronized
def writeto(self, offset, data, num_retries):