if v.state() != _BufferBlock.COMMITTED and v.owner:
# Ignore blocks with a list of owners: if they're not in COMMITTED
# state, they're already being committed asynchronously.
- if not isinstance(v.owner, list):
+ if isinstance(v.owner, ArvadosFile):
v.owner.flush(sync=False)
with self.lock:
# of repacking small blocks, so don't delete it when flushing
# its owners, just do it after flushing them all.
for owner in v.owner:
- owner.flush(sync=True, delete_bufferblock=False)
+ owner.flush(sync=True)
self.delete_bufferblock(k)
def block_prefetch(self, locator):
return len(data)
@synchronized
- def flush(self, sync=True, num_retries=0, delete_bufferblock=True):
+ def flush(self, sync=True, num_retries=0):
"""Flush the current bufferblock to Keep.
:sync:
self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
to_delete.add(s.locator)
s.locator = bb.locator()
- if delete_bufferblock:
- for s in to_delete:
+ for s in to_delete:
+ # Don't delete the bufferblock if it's owned by many files. It'll be
+ # deleted after all of its owners are flush()ed.
+ if self.parent._my_block_manager().get_bufferblock(s).owner is self:
self.parent._my_block_manager().delete_bufferblock(s)
self.parent.notify(MOD, self.parent, self.name, (self, self))
mockkeep = mock.MagicMock()
with arvados.arvfile._BlockManager(mockkeep) as blockmanager:
bufferblock = blockmanager.alloc_bufferblock()
- bufferblock.owner = mock.MagicMock()
+ bufferblock.owner = mock.MagicMock(spec=arvados.arvfile.ArvadosFile)
def flush(sync=None):
blockmanager.commit_bufferblock(bufferblock, sync)
bufferblock.owner.flush.side_effect = flush
mockkeep.put.side_effect = arvados.errors.KeepWriteError("fail")
with arvados.arvfile._BlockManager(mockkeep) as blockmanager:
bufferblock = blockmanager.alloc_bufferblock()
- bufferblock.owner = mock.MagicMock()
+ bufferblock.owner = mock.MagicMock(spec=arvados.arvfile.ArvadosFile)
def flush(sync=None):
blockmanager.commit_bufferblock(bufferblock, sync)
bufferblock.owner.flush.side_effect = flush