self.buffer_block = None
self.buffer_view = None
@synchronized
def repack_writes(self):
    """Optimize buffer block by repacking segments in file sequence.

    When the client makes random writes, they appear in the buffer block in
    the sequence they were written rather than the sequence they appear in
    the file. This makes for inefficient, fragmented manifests. Attempt
    to optimize by repacking writes in file sequence.

    Only valid on a WRITABLE block; raises AssertionError otherwise.

    """
    if self._state != _BufferBlock.WRITABLE:
        raise AssertionError("Cannot repack non-writable block")

    segs = self.owner.segments()

    # Collect the segments that reference the buffer block.
    bufferblock_segs = [s for s in segs if s.locator == self.blockid]

    # Collect total data referenced by segments (could be smaller than
    # bufferblock size if a portion of the file was written and
    # then overwritten).
    write_total = sum([s.range_size for s in bufferblock_segs])

    if write_total < self.size() or len(bufferblock_segs) > 1:
        # If there's more than one segment referencing this block, it is
        # due to out-of-order writes and will produce a fragmented
        # manifest, so try to optimize by re-packing into a new buffer.
        contents = self.buffer_view[0:self.write_pointer].tobytes()
        new_bb = _BufferBlock(None, write_total, None)
        # Copy each referenced range into the temporary block in file
        # order.  The segment offset must be updated AFTER the append so
        # it records where the data landed in the repacked buffer.
        for t in bufferblock_segs:
            new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
            t.segment_offset = new_bb.size() - t.range_size

        # Adopt the repacked buffer in place: keep this block's identity
        # but take over the temporary block's storage, clear the cached
        # locator (contents changed), and release the temporary block.
        self.buffer_block = new_bb.buffer_block
        self.buffer_view = new_bb.buffer_view
        self.write_pointer = new_bb.write_pointer
        self._locator = None
        new_bb.clear()
        self.owner.set_segments(segs)
+
def __repr__(self):
    """Debug-friendly representation naming this buffer block's id."""
    blockid = self.blockid
    return "<BufferBlock {}>".format(blockid)
def __exit__(self, exc_type, exc_value, traceback):
    # Context-manager exit: delegate all cleanup to stop_threads(),
    # whether or not the with-body raised.
    self.stop_threads()
@synchronized
def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
    """Packs small blocks together before uploading.

    :force:
      pack and commit even if the accumulated pending data is smaller
      than one full Keep block (bypasses both size checks below).

    :sync:
      forwarded to commit_bufferblock(); controls whether the commit of
      the combined block is synchronous.

    :closed_file_size:
      byte count added to the running total of pending small-write data.

    """
    self._pending_write_size += closed_file_size

    # Check if there are enough small blocks for filling up one in full
    if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
        return

    # Search blocks ready for getting packed together before being
    # committed to Keep.
    # A WRITABLE block always has an owner.
    # A WRITABLE block with its owner.closed() implies that its
    # size is <= KEEP_BLOCK_SIZE/2.
    try:
        small_blocks = [b for b in self._bufferblocks.values()
                        if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
    except AttributeError:
        # Writable blocks without owner shouldn't exist.
        raise UnownedBlockError()

    if len(small_blocks) <= 1:
        # Not enough small blocks for repacking
        return

    # Compact each candidate first so overwritten/out-of-order data does
    # not get copied into the combined block.
    for bb in small_blocks:
        bb.repack_writes()

    # Update the pending write size count with its true value, just in case
    # some small file was opened, written and closed several times.
    self._pending_write_size = sum([b.size() for b in small_blocks])

    if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
        return

    new_bb = self._alloc_bufferblock()
    files = []
    # Append small blocks into the combined block until adding the next
    # one would exceed a full Keep block, remembering each source block
    # and the offset where its data starts in the combined block.
    while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
        bb = small_blocks.pop(0)
        self._pending_write_size -= bb.size()
        new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
        files.append((bb, new_bb.write_pointer - bb.size()))

    self.commit_bufferblock(new_bb, sync=sync)

    # Repoint each packed file's segments at the combined block (shifting
    # segment offsets by where that file's data landed), then drop the
    # now-unused small buffer block.
    for bb, new_bb_segment_offset in files:
        newsegs = bb.owner.segments()
        for s in newsegs:
            if s.locator == bb.blockid:
                s.locator = new_bb.locator()
                s.segment_offset = new_bb_segment_offset+s.segment_offset
        bb.owner.set_segments(newsegs)
        self._delete_bufferblock(bb.blockid)
def commit_bufferblock(self, block, sync):
"""Initiate a background upload of a bufferblock.
return ''.join(data)
- def _repack_writes(self, num_retries):
- """Optimize buffer block by repacking segments in file sequence.
-
- When the client makes random writes, they appear in the buffer block in
- the sequence they were written rather than the sequence they appear in
- the file. This makes for inefficient, fragmented manifests. Attempt
- to optimize by repacking writes in file sequence.
-
- """
- segs = self._segments
-
- # Collect the segments that reference the buffer block.
- bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
-
- # Collect total data referenced by segments (could be smaller than
- # bufferblock size if a portion of the file was written and
- # then overwritten).
- write_total = sum([s.range_size for s in bufferblock_segs])
-
- if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
- # If there's more than one segment referencing this block, it is
- # due to out-of-order writes and will produce a fragmented
- # manifest, so try to optimize by re-packing into a new buffer.
- contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
- new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
- for t in bufferblock_segs:
- new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
- t.segment_offset = new_bb.size() - t.range_size
- self._current_bblock.clear()
- self._current_bblock = new_bb
-
@must_be_writable
@synchronized
def writeto(self, offset, data, num_retries):
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
- self._repack_writes(num_retries)
+ self._current_bblock.repack_writes()
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
if self._current_bblock.state() == _BufferBlock.WRITABLE:
- self._repack_writes(num_retries)
+ self._current_bblock.repack_writes()
if self._current_bblock.state() != _BufferBlock.DELETED:
self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
normalize=False, only_committed=False):
buf = ""
filestream = []
- for segment in self.segments():
+ for segment in self._segments:
loc = segment.locator
if self.parent._my_block_manager().is_bufferblock(loc):
if only_committed: