X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c70fbca19235ebff6ab560ced462496cea7d2f72..2d216d80f51d74c5a9a9d5167fa26a43bbb03d1f:/sdk/python/arvados/arvfile.py diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 9ba43490e7..aa6bdad90b 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -1,3 +1,7 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + from __future__ import absolute_import from __future__ import division from future import standard_library @@ -557,7 +561,6 @@ class _BlockManager(object): else: loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies) bufferblock.set_state(_BufferBlock.COMMITTED, loc) - except Exception as e: bufferblock.set_state(_BufferBlock.ERROR, e) finally: @@ -673,20 +676,22 @@ class _BlockManager(object): return new_bb = self._alloc_bufferblock() + new_bb.owner = [] files = [] while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: bb = small_blocks.pop(0) + new_bb.owner.append(bb.owner) self._pending_write_size -= bb.size() new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) files.append((bb, new_bb.write_pointer - bb.size())) - self.commit_bufferblock(new_bb, sync=True) + self.commit_bufferblock(new_bb, sync=sync) for bb, new_bb_segment_offset in files: newsegs = bb.owner.segments() for s in newsegs: if s.locator == bb.blockid: - s.locator = new_bb.locator() + s.locator = new_bb.blockid s.segment_offset = new_bb_segment_offset+s.segment_offset bb.owner.set_segments(newsegs) self._delete_bufferblock(bb.blockid) @@ -796,7 +801,10 @@ class _BlockManager(object): for k,v in items: if v.state() != _BufferBlock.COMMITTED and v.owner: - v.owner.flush(sync=False) + # Ignore blocks with a list of owners, as if they're not in COMMITTED + # state, they're already being committed asynchronously. + if isinstance(v.owner, ArvadosFile): + v.owner.flush(sync=False) with self.lock: if self._put_queue is not None: @@ -813,7 +821,15 @@ class _BlockManager(object): # flush again with sync=True to remove committed bufferblocks from # the segments. if v.owner: - v.owner.flush(sync=True) + if isinstance(v.owner, ArvadosFile): + v.owner.flush(sync=True) + elif isinstance(v.owner, list) and len(v.owner) > 0: + # This bufferblock is referenced by many files as a result + # of repacking small blocks, so don't delete it when flushing + # its owners, just do it after flushing them all. + for owner in v.owner: + owner.flush(sync=True) + self.delete_bufferblock(k) def block_prefetch(self, locator): """Initiate a background download of a block. @@ -1144,7 +1160,10 @@ class ArvadosFile(object): to_delete.add(s.locator) s.locator = bb.locator() for s in to_delete: - self.parent._my_block_manager().delete_bufferblock(s) + # Don't delete the bufferblock if it's owned by many files. It'll be + # deleted after all of its owners are flush()ed. + if self.parent._my_block_manager().get_bufferblock(s).owner is self: + self.parent._my_block_manager().delete_bufferblock(s) self.parent.notify(MOD, self.parent, self.name, (self, self))