X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b04638275cff9b393e1bc04136d44f361b999cf8..f2f8340b18430738a9527f05e707dd8f03508cc0:/sdk/python/arvados/arvfile.py?ds=sidebyside diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index aad3ce12a5..a2ec76a076 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -16,7 +16,7 @@ import uuid from .errors import KeepWriteError, AssertionError, ArgumentError from .keep import KeepLocator from ._normalize_stream import normalize_stream -from ._ranges import locators_and_ranges, replace_range, Range +from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange from .retry import retry_method MOD = "mod" @@ -289,6 +289,7 @@ class _BufferBlock(object): PENDING = 1 COMMITTED = 2 ERROR = 3 + DELETED = 4 def __init__(self, blockid, starting_capacity, owner): """ @@ -383,10 +384,54 @@ class _BufferBlock(object): @synchronized def clear(self): + self._state = _BufferBlock.DELETED self.owner = None self.buffer_block = None self.buffer_view = None + @synchronized + def repack_writes(self): + """Optimize buffer block by repacking segments in file sequence. + + When the client makes random writes, they appear in the buffer block in + the sequence they were written rather than the sequence they appear in + the file. This makes for inefficient, fragmented manifests. Attempt + to optimize by repacking writes in file sequence. + + """ + if self._state != _BufferBlock.WRITABLE: + raise AssertionError("Cannot repack non-writable block") + + segs = self.owner.segments() + + # Collect the segments that reference the buffer block. + bufferblock_segs = [s for s in segs if s.locator == self.blockid] + + # Collect total data referenced by segments (could be smaller than + # bufferblock size if a portion of the file was written and + # then overwritten). + write_total = sum([s.range_size for s in bufferblock_segs]) + + if write_total < self.size() or len(bufferblock_segs) > 1: + # If there's more than one segment referencing this block, it is + # due to out-of-order writes and will produce a fragmented + # manifest, so try to optimize by re-packing into a new buffer. + contents = self.buffer_view[0:self.write_pointer].tobytes() + new_bb = _BufferBlock(None, write_total, None) + for t in bufferblock_segs: + new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size]) + t.segment_offset = new_bb.size() - t.range_size + + self.buffer_block = new_bb.buffer_block + self.buffer_view = new_bb.buffer_view + self.write_pointer = new_bb.write_pointer + self._locator = None + new_bb.clear() + self.owner.set_segments(segs) + + def __repr__(self): + return "" % (self.blockid) + class NoopLock(object): def __enter__(self): @@ -460,7 +505,7 @@ class _BlockManager(object): def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): if blockid is None: - blockid = "%s" % uuid.uuid4() + blockid = str(uuid.uuid4()) bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner) self._bufferblocks[bufferblock.blockid] = bufferblock return bufferblock @@ -476,7 +521,7 @@ class _BlockManager(object): ArvadosFile that owns the new block """ - new_blockid = "bufferblock%i" % len(self._bufferblocks) + new_blockid = str(uuid.uuid4()) bufferblock = block.clone(new_blockid, owner) self._bufferblocks[bufferblock.blockid] = bufferblock return bufferblock @@ -581,43 +626,56 @@ class _BlockManager(object): @synchronized def repack_small_blocks(self, force=False, sync=False, closed_file_size=0): """Packs small blocks together before uploading""" + self._pending_write_size += closed_file_size # Check if there are enough small blocks for filling up one in full - if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE): + if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)): + return - # Search blocks ready for getting packed together before being committed to Keep. - # A WRITABLE block always has an owner. - # A WRITABLE block with its owner.closed() implies that it's - # size is <= KEEP_BLOCK_SIZE/2. - try: - small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] - except AttributeError: - # Writable blocks without owner shouldn't exist. - raise UnownedBlockError() + # Search blocks ready for getting packed together before being committed to Keep. + # A WRITABLE block always has an owner. + # A WRITABLE block with its owner.closed() implies that it's + # size is <= KEEP_BLOCK_SIZE/2. + try: + small_blocks = [b for b in self._bufferblocks.values() + if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] + except AttributeError: + # Writable blocks without owner shouldn't exist. + raise UnownedBlockError() + + if len(small_blocks) <= 1: + # Not enough small blocks for repacking + return - if len(small_blocks) <= 1: - # Not enough small blocks for repacking - return + for bb in small_blocks: + bb.repack_writes() - # Update the pending write size count with its true value, just in case - # some small file was opened, written and closed several times. - self._pending_write_size = sum([b.size() for b in small_blocks]) - if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force: - return + # Update the pending write size count with its true value, just in case + # some small file was opened, written and closed several times. + self._pending_write_size = sum([b.size() for b in small_blocks]) - new_bb = self._alloc_bufferblock() - while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: - bb = small_blocks.pop(0) - arvfile = bb.owner - self._pending_write_size -= bb.size() - new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) - arvfile.set_segments([Range(new_bb.blockid, - 0, - bb.size(), - new_bb.write_pointer - bb.size())]) - self._delete_bufferblock(bb.blockid) - self.commit_bufferblock(new_bb, sync=sync) + if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force: + return + + new_bb = self._alloc_bufferblock() + files = [] + while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: + bb = small_blocks.pop(0) + self._pending_write_size -= bb.size() + new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) + files.append((bb, new_bb.write_pointer - bb.size())) + + self.commit_bufferblock(new_bb, sync=sync) + + for bb, new_bb_segment_offset in files: + newsegs = bb.owner.segments() + for s in newsegs: + if s.locator == bb.blockid: + s.locator = new_bb.locator() + s.segment_offset = new_bb_segment_offset+s.segment_offset + bb.owner.set_segments(newsegs) + self._delete_bufferblock(bb.blockid) def commit_bufferblock(self, block, sync): """Initiate a background upload of a bufferblock. @@ -999,37 +1057,6 @@ class ArvadosFile(object): return ''.join(data) - def _repack_writes(self, num_retries): - """Optimize buffer block by repacking segments in file sequence. - - When the client makes random writes, they appear in the buffer block in - the sequence they were written rather than the sequence they appear in - the file. This makes for inefficient, fragmented manifests. Attempt - to optimize by repacking writes in file sequence. - - """ - segs = self._segments - - # Collect the segments that reference the buffer block. - bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid] - - # Collect total data referenced by segments (could be smaller than - # bufferblock size if a portion of the file was written and - # then overwritten). - write_total = sum([s.range_size for s in bufferblock_segs]) - - if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1: - # If there's more than one segment referencing this block, it is - # due to out-of-order writes and will produce a fragmented - # manifest, so try to optimize by re-packing into a new buffer. - contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries) - new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self) - for t in bufferblock_segs: - new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size]) - t.segment_offset = new_bb.size() - t.range_size - - self._current_bblock = new_bb - @must_be_writable @synchronized def writeto(self, offset, data, num_retries): @@ -1060,7 +1087,7 @@ class ArvadosFile(object): self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: - self._repack_writes(num_retries) + self._current_bblock.repack_writes() if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False) self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) @@ -1087,8 +1114,9 @@ class ArvadosFile(object): if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED: if self._current_bblock.state() == _BufferBlock.WRITABLE: - self._repack_writes(num_retries) - self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync) + self._current_bblock.repack_writes() + if self._current_bblock.state() != _BufferBlock.DELETED: + self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync) if sync: to_delete = set() @@ -1137,17 +1165,17 @@ class ArvadosFile(object): normalize=False, only_committed=False): buf = "" filestream = [] - for segment in self.segments: + for segment in self._segments: loc = segment.locator if self.parent._my_block_manager().is_bufferblock(loc): if only_committed: continue - loc = self._bufferblocks[loc].calculate_locator() + loc = self.parent._my_block_manager().get_bufferblock(loc).locator() if portable_locators: loc = KeepLocator(loc).stripped() - filestream.append(LocatorAndRange(loc, locator_block_size(loc), + filestream.append(LocatorAndRange(loc, KeepLocator(loc).size, segment.segment_offset, segment.range_size)) - buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream})) + buf += ' '.join(normalize_stream(stream_name, {self.name: filestream})) buf += "\n" return buf