Merge branch '10221-cwl-pathmapping' closes #10221
[arvados.git] / sdk / python / arvados / arvfile.py
index c72225b433a07c0f183f6315d3ae8f86a7a7a86b..c394dab810715c2659b6f72f8f5f1e173d711ead 100644 (file)
@@ -11,6 +11,7 @@ import errno
 import re
 import logging
 import collections
+import uuid
 
 from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
@@ -431,8 +432,11 @@ class _BlockManager(object):
           ArvadosFile that owns this block
 
         """
+        return self._alloc_bufferblock(blockid, starting_capacity, owner)
+
+    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
         if blockid is None:
-            blockid = "bufferblock%i" % len(self._bufferblocks)
+            blockid = "%s" % uuid.uuid4()
         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
         self._bufferblocks[bufferblock.blockid] = bufferblock
         return bufferblock
@@ -554,6 +558,9 @@ class _BlockManager(object):
     def repack_small_blocks(self, force=False, sync=False):
         """Packs small blocks together before uploading"""
         # Search blocks ready for getting packed together before being committed to Keep.
+        # A WRITABLE block always has an owner.
+        # A WRITABLE block whose owner.closed() is true implies that its
+        # size is <= KEEP_BLOCK_SIZE/2.
         small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
         if len(small_blocks) <= 1:
             # Not enough small blocks for repacking
@@ -562,8 +569,7 @@ class _BlockManager(object):
         # Check if there are enough small blocks for filling up one in full
         pending_write_size = sum([b.size() for b in small_blocks])
         if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
-            new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
-            self._bufferblocks[new_bb.blockid] = new_bb
+            new_bb = self._alloc_bufferblock()
             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                 bb = small_blocks.pop(0)
                 arvfile = bb.owner
@@ -572,8 +578,7 @@ class _BlockManager(object):
                                             0,
                                             bb.size(),
                                             new_bb.write_pointer - bb.size())])
-                bb.clear()
-                del self._bufferblocks[bb.blockid]
+                self._delete_bufferblock(bb.blockid)
             self.commit_bufferblock(new_bb, sync=sync)
 
     def commit_bufferblock(self, block, sync):
@@ -625,6 +630,9 @@ class _BlockManager(object):
 
     @synchronized
     def delete_bufferblock(self, locator):
+        self._delete_bufferblock(locator)
+
+    def _delete_bufferblock(self, locator):
         bb = self._bufferblocks[locator]
         bb.clear()
         del self._bufferblocks[locator]
@@ -826,14 +834,14 @@ class ArvadosFile(object):
             self._writers.add(writer)
 
     @synchronized
-    def remove_writer(self, writer):
+    def remove_writer(self, writer, flush):
         """
         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
         and do some block maintenance tasks.
         """
         self._writers.remove(writer)
 
-        if self.size() > config.KEEP_BLOCK_SIZE / 2:
+        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
             # File writer closed, not small enough for repacking
             self.flush()
         elif self.closed():
@@ -1158,7 +1166,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def flush(self):
         self.arvadosfile.flush()
 
-    def close(self):
+    def close(self, flush=True):
         if not self.closed:
-            self.arvadosfile.remove_writer(self)
+            self.arvadosfile.remove_writer(self, flush)
             super(ArvadosFileWriter, self).close()