From ad7294edfcc59c3e67548328a88c9e689c3ae2cf Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 21 Apr 2017 16:36:21 -0400 Subject: [PATCH] 11507: Move repack_writes() method from ArvadosFile to BufferBlock. --- sdk/python/arvados/arvfile.py | 161 ++++++++++++++++++---------------- 1 file changed, 83 insertions(+), 78 deletions(-) diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 64e0f05280..a2ec76a076 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -389,6 +389,46 @@ class _BufferBlock(object): self.buffer_block = None self.buffer_view = None + @synchronized + def repack_writes(self): + """Optimize buffer block by repacking segments in file sequence. + + When the client makes random writes, they appear in the buffer block in + the sequence they were written rather than the sequence they appear in + the file. This makes for inefficient, fragmented manifests. Attempt + to optimize by repacking writes in file sequence. + + """ + if self._state != _BufferBlock.WRITABLE: + raise AssertionError("Cannot repack non-writable block") + + segs = self.owner.segments() + + # Collect the segments that reference the buffer block. + bufferblock_segs = [s for s in segs if s.locator == self.blockid] + + # Collect total data referenced by segments (could be smaller than + # bufferblock size if a portion of the file was written and + # then overwritten). + write_total = sum([s.range_size for s in bufferblock_segs]) + + if write_total < self.size() or len(bufferblock_segs) > 1: + # If there's more than one segment referencing this block, it is + # due to out-of-order writes and will produce a fragmented + # manifest, so try to optimize by re-packing into a new buffer. + contents = self.buffer_view[0:self.write_pointer].tobytes() + new_bb = _BufferBlock(None, write_total, None) + for t in bufferblock_segs: + new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size]) + t.segment_offset = new_bb.size() - t.range_size + + self.buffer_block = new_bb.buffer_block + self.buffer_view = new_bb.buffer_view + self.write_pointer = new_bb.write_pointer + self._locator = None + new_bb.clear() + self.owner.set_segments(segs) + def __repr__(self): return "" % (self.blockid) @@ -583,63 +623,59 @@ class _BlockManager(object): def __exit__(self, exc_type, exc_value, traceback): self.stop_threads() + @synchronized def repack_small_blocks(self, force=False, sync=False, closed_file_size=0): """Packs small blocks together before uploading""" - with self.lock: - self._pending_write_size += closed_file_size - - # Check if there are enough small blocks for filling up one in full - if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)): - return + self._pending_write_size += closed_file_size - # Search blocks ready for getting packed together before being committed to Keep. - # A WRITABLE block always has an owner. - # A WRITABLE block with its owner.closed() implies that it's - # size is <= KEEP_BLOCK_SIZE/2. - bufferblocks = self._bufferblocks.values() + # Check if there are enough small blocks for filling up one in full + if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)): + return + # Search blocks ready for getting packed together before being committed to Keep. + # A WRITABLE block always has an owner. + # A WRITABLE block with its owner.closed() implies that it's + # size is <= KEEP_BLOCK_SIZE/2. try: - for b in bufferblocks: - if b.state() == _BufferBlock.WRITABLE and b.owner.closed(): - b.owner._repack_writes(0) + small_blocks = [b for b in self._bufferblocks.values() + if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] except AttributeError: # Writable blocks without owner shouldn't exist. raise UnownedBlockError() - with self.lock: - small_blocks = [b for b in self._bufferblocks.values() - if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] + if len(small_blocks) <= 1: + # Not enough small blocks for repacking + return - if len(small_blocks) <= 1: - # Not enough small blocks for repacking - return + for bb in small_blocks: + bb.repack_writes() - # Update the pending write size count with its true value, just in case - # some small file was opened, written and closed several times. - self._pending_write_size = sum([b.size() for b in small_blocks]) - if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force: - return + # Update the pending write size count with its true value, just in case + # some small file was opened, written and closed several times. + self._pending_write_size = sum([b.size() for b in small_blocks]) - new_bb = self._alloc_bufferblock() - files = [] - while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: - bb = small_blocks.pop(0) - self._pending_write_size -= bb.size() - new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) - files.append((bb, new_bb.write_pointer - bb.size())) - - self.commit_bufferblock(new_bb, sync=sync) - - for bb, new_bb_segment_offset in files: - newsegs = [] - for s in bb.owner.segments(): - if s.locator == bb.blockid: - newsegs.append(Range(new_bb.locator(), s.range_start, s.range_size, new_bb_segment_offset+s.segment_offset)) - else: - newsegs.append(s) - bb.owner.set_segments(newsegs) - self._delete_bufferblock(bb.blockid) + if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force: + return + + new_bb = self._alloc_bufferblock() + files = [] + while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: + bb = small_blocks.pop(0) + self._pending_write_size -= bb.size() + new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) + files.append((bb, new_bb.write_pointer - bb.size())) + + self.commit_bufferblock(new_bb, sync=sync) + + for bb, new_bb_segment_offset in files: + newsegs = bb.owner.segments() + for s in newsegs: + if s.locator == bb.blockid: + s.locator = new_bb.locator() + s.segment_offset = new_bb_segment_offset+s.segment_offset + bb.owner.set_segments(newsegs) + self._delete_bufferblock(bb.blockid) def commit_bufferblock(self, block, sync): """Initiate a background upload of a bufferblock. @@ -1021,37 +1057,6 @@ class ArvadosFile(object): return ''.join(data) - def _repack_writes(self, num_retries): - """Optimize buffer block by repacking segments in file sequence. - - When the client makes random writes, they appear in the buffer block in - the sequence they were written rather than the sequence they appear in - the file. This makes for inefficient, fragmented manifests. Attempt - to optimize by repacking writes in file sequence. - - """ - segs = self._segments - - # Collect the segments that reference the buffer block. - bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid] - - # Collect total data referenced by segments (could be smaller than - # bufferblock size if a portion of the file was written and - # then overwritten). - write_total = sum([s.range_size for s in bufferblock_segs]) - - if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1: - # If there's more than one segment referencing this block, it is - # due to out-of-order writes and will produce a fragmented - # manifest, so try to optimize by re-packing into a new buffer. - contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries) - new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self) - for t in bufferblock_segs: - new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size]) - t.segment_offset = new_bb.size() - t.range_size - self._current_bblock.clear() - self._current_bblock = new_bb - @must_be_writable @synchronized def writeto(self, offset, data, num_retries): @@ -1082,7 +1087,7 @@ class ArvadosFile(object): self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: - self._repack_writes(num_retries) + self._current_bblock.repack_writes() if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False) self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) @@ -1109,7 +1114,7 @@ class ArvadosFile(object): if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED: if self._current_bblock.state() == _BufferBlock.WRITABLE: - self._repack_writes(num_retries) + self._current_bblock.repack_writes() if self._current_bblock.state() != _BufferBlock.DELETED: self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync) @@ -1160,7 +1165,7 @@ class ArvadosFile(object): normalize=False, only_committed=False): buf = "" filestream = [] - for segment in self.segments(): + for segment in self._segments: loc = segment.locator if self.parent._my_block_manager().is_bufferblock(loc): if only_committed: -- 2.30.2