+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
from __future__ import absolute_import
from __future__ import division
from future import standard_library
else:
loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
-
except Exception as e:
bufferblock.set_state(_BufferBlock.ERROR, e)
finally:
return
new_bb = self._alloc_bufferblock()
+ new_bb.owner = []
files = []
while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
bb = small_blocks.pop(0)
+ new_bb.owner.append(bb.owner)
self._pending_write_size -= bb.size()
new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
files.append((bb, new_bb.write_pointer - bb.size()))
- self.commit_bufferblock(new_bb, sync=True)
+ self.commit_bufferblock(new_bb, sync=sync)
for bb, new_bb_segment_offset in files:
newsegs = bb.owner.segments()
for s in newsegs:
if s.locator == bb.blockid:
- s.locator = new_bb.locator()
+ s.locator = new_bb.blockid
s.segment_offset = new_bb_segment_offset+s.segment_offset
bb.owner.set_segments(newsegs)
self._delete_bufferblock(bb.blockid)
for k,v in items:
if v.state() != _BufferBlock.COMMITTED and v.owner:
- v.owner.flush(sync=False)
+ # Skip blocks whose owner is a list: if such a block is not yet in the
+ # COMMITTED state, it is already being committed asynchronously.
+ if isinstance(v.owner, ArvadosFile):
+ v.owner.flush(sync=False)
with self.lock:
if self._put_queue is not None:
# flush again with sync=True to remove committed bufferblocks from
# the segments.
if v.owner:
- v.owner.flush(sync=True)
+ if isinstance(v.owner, ArvadosFile):
+ v.owner.flush(sync=True)
+ elif isinstance(v.owner, list) and len(v.owner) > 0:
+ # This bufferblock is referenced by many files as a result
+ # of repacking small blocks, so don't delete it while flushing
+ # its owners; delete it once all of them have been flushed.
+ for owner in v.owner:
+ owner.flush(sync=True)
+ self.delete_bufferblock(k)
def block_prefetch(self, locator):
"""Initiate a background download of a block.
to_delete.add(s.locator)
s.locator = bb.locator()
for s in to_delete:
- self.parent._my_block_manager().delete_bufferblock(s)
+ # Don't delete the bufferblock if it's owned by many files. It'll be
+ # deleted after all of its owners are flush()ed.
+ if self.parent._my_block_manager().get_bufferblock(s).owner is self:
+ self.parent._my_block_manager().delete_bufferblock(s)
self.parent.notify(MOD, self.parent, self.name, (self, self))