Merge branch '8784-dir-listings'
[arvados.git] / sdk / python / arvados / arvfile.py
index 2fc9c73afe031543ab7afb67edcd2e7a5c4c7d6f..aa6bdad90bea0551f7043fbdd0889f6dea2ff6a8 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 from __future__ import absolute_import
 from __future__ import division
 from future import standard_library
@@ -557,7 +561,6 @@ class _BlockManager(object):
                 else:
                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
-
             except Exception as e:
                 bufferblock.set_state(_BufferBlock.ERROR, e)
             finally:
@@ -673,9 +676,11 @@ class _BlockManager(object):
             return
 
         new_bb = self._alloc_bufferblock()
+        new_bb.owner = []
         files = []
         while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
             bb = small_blocks.pop(0)
+            new_bb.owner.append(bb.owner)
             self._pending_write_size -= bb.size()
             new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
             files.append((bb, new_bb.write_pointer - bb.size()))
@@ -686,7 +691,7 @@ class _BlockManager(object):
             newsegs = bb.owner.segments()
             for s in newsegs:
                 if s.locator == bb.blockid:
-                    s.locator = new_bb.locator()
+                    s.locator = new_bb.blockid
                     s.segment_offset = new_bb_segment_offset+s.segment_offset
             bb.owner.set_segments(newsegs)
             self._delete_bufferblock(bb.blockid)
@@ -796,7 +801,10 @@ class _BlockManager(object):
 
         for k,v in items:
             if v.state() != _BufferBlock.COMMITTED and v.owner:
-                v.owner.flush(sync=False)
+                # Skip blocks whose owner is a list: if such a block is not yet in
+                # COMMITTED state, it is already being committed asynchronously.
+                if isinstance(v.owner, ArvadosFile):
+                    v.owner.flush(sync=False)
 
         with self.lock:
             if self._put_queue is not None:
@@ -813,7 +821,15 @@ class _BlockManager(object):
             # flush again with sync=True to remove committed bufferblocks from
             # the segments.
             if v.owner:
-                v.owner.flush(sync=True)
+                if isinstance(v.owner, ArvadosFile):
+                    v.owner.flush(sync=True)
+                elif isinstance(v.owner, list) and len(v.owner) > 0:
+                    # This bufferblock is referenced by many files as a result
+                    # of repacking small blocks, so don't delete it while flushing
+                    # its owners; delete it once, after all of them are flushed.
+                    for owner in v.owner:
+                        owner.flush(sync=True)
+                    self.delete_bufferblock(k)
 
     def block_prefetch(self, locator):
         """Initiate a background download of a block.
@@ -1144,7 +1160,10 @@ class ArvadosFile(object):
                     to_delete.add(s.locator)
                     s.locator = bb.locator()
             for s in to_delete:
-               self.parent._my_block_manager().delete_bufferblock(s)
+                # Don't delete the bufferblock if it's owned by many files. It'll be
+                # deleted after all of its owners are flush()ed.
+                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
+                    self.parent._my_block_manager().delete_bufferblock(s)
 
         self.parent.notify(MOD, self.parent, self.name, (self, self))