Merge branch '10221-cwl-pathmapping' closes #10221
[arvados.git] / sdk / python / arvados / arvfile.py
index 3d5e9215eff4ecd80f82c94b7f78677f1e211c67..c394dab810715c2659b6f72f8f5f1e173d711ead 100644 (file)
@@ -10,6 +10,8 @@ import copy
 import errno
 import re
 import logging
+import collections
+import uuid
 
 from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
@@ -405,7 +407,7 @@ class _BlockManager(object):
     def __init__(self, keep, copies=None):
         """keep: KeepClient object to use"""
         self._keep = keep
-        self._bufferblocks = {}
+        self._bufferblocks = collections.OrderedDict()
         self._put_queue = None
         self._put_threads = None
         self._prefetch_queue = None
@@ -430,8 +432,11 @@ class _BlockManager(object):
           ArvadosFile that owns this block
 
         """
+        return self._alloc_bufferblock(blockid, starting_capacity, owner)
+
+    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
         if blockid is None:
-            blockid = "bufferblock%i" % len(self._bufferblocks)
+            blockid = "%s" % uuid.uuid4()
         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
         self._bufferblocks[bufferblock.blockid] = bufferblock
         return bufferblock
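
A note on the id change above: deriving the block id from len(self._bufferblocks) can repeat an id once entries are deleted from the dict, while a uuid4-based id stays unique regardless of deletions. A minimal standalone sketch of the collision the old scheme allows, using a plain dict and throwaway values rather than the real _BufferBlock objects:

import uuid

blocks = {}

def alloc_counter_id():
    # old scheme: id derived from the current dict size
    blockid = "bufferblock%i" % len(blocks)
    blocks[blockid] = object()
    return blockid

a = alloc_counter_id()   # "bufferblock0"
b = alloc_counter_id()   # "bufferblock1"
del blocks[a]            # dict shrinks back to one entry
c = alloc_counter_id()   # "bufferblock1" again -- silently clobbers b's entry
assert b == c and len(blocks) == 1

def alloc_uuid_id():
    # new scheme: globally unique id, independent of dict contents
    blockid = "%s" % uuid.uuid4()
    blocks[blockid] = object()
    return blockid

assert alloc_uuid_id() != alloc_uuid_id()
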
@@ -553,6 +558,9 @@ class _BlockManager(object):
     def repack_small_blocks(self, force=False, sync=False):
         """Packs small blocks together before uploading"""
         # Search for blocks that are ready to be packed together before being committed to Keep.
+        # A WRITABLE block always has an owner.
+        # If a WRITABLE block's owner is closed(), the block's size is
+        # <= KEEP_BLOCK_SIZE/2.
         small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
         if len(small_blocks) <= 1:
             # Not enough small blocks for repacking
@@ -561,17 +569,16 @@ class _BlockManager(object):
         # Check if there are enough small blocks for filling up one in full
         pending_write_size = sum([b.size() for b in small_blocks])
         if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
-            new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
-            self._bufferblocks[new_bb.blockid] = new_bb
-            size = 0
-            while len(small_blocks) > 0 and (size + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+            new_bb = self._alloc_bufferblock()
+            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                 bb = small_blocks.pop(0)
-                size += bb.size()
                 arvfile = bb.owner
                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
-                arvfile.set_segments([Range(new_bb.blockid, 0, bb.size(), size-bb.size())])
-                bb.clear()
-                del self._bufferblocks[bb.blockid]
+                arvfile.set_segments([Range(new_bb.blockid,
+                                            0,
+                                            bb.size(),
+                                            new_bb.write_pointer - bb.size())])
+                self._delete_bufferblock(bb.blockid)
             self.commit_bufferblock(new_bb, sync=sync)
 
     def commit_bufferblock(self, block, sync):
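
The loop above appends each small buffer to a freshly allocated block and repoints the owning file's single segment at (new block id, offset, length), where the offset is the new block's write pointer before the append. A minimal standalone sketch of that bookkeeping, using plain tuples instead of the real _BufferBlock and Range types and assuming the usual 64 MiB Keep block size:

KEEP_BLOCK_SIZE = 2**26          # 64 MiB, the usual Keep block size

def repack(small_buffers, limit=KEEP_BLOCK_SIZE):
    packed = bytearray()
    segments = []                # one (owner, offset, length) per input file
    for owner, buf in small_buffers:
        if len(packed) + len(buf) > limit:
            break
        offset = len(packed)     # plays the role of new_bb.write_pointer
        packed.extend(buf)
        segments.append((owner, offset, len(buf)))
    return bytes(packed), segments

data, segs = repack([("a.txt", b"foo"), ("b.txt", b"quux")])
assert data == b"fooquux"
assert segs == [("a.txt", 0, 3), ("b.txt", 3, 4)]
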
@@ -623,6 +630,9 @@ class _BlockManager(object):
 
     @synchronized
     def delete_bufferblock(self, locator):
+        self._delete_bufferblock(locator)
+
+    def _delete_bufferblock(self, locator):
         bb = self._bufferblocks[locator]
         bb.clear()
         del self._bufferblocks[locator]
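
Splitting delete_bufferblock() into a @synchronized wrapper plus an unlocked _delete_bufferblock() lets repack_small_blocks(), which already holds the manager lock, drop packed-away blocks without re-acquiring it. A minimal sketch of that locked/unlocked split, using a plain threading.Lock rather than the real @synchronized decorator:

import threading

class Manager(object):
    def __init__(self):
        self.lock = threading.Lock()     # non-reentrant, like the manager lock
        self.blocks = {}

    def delete_block(self, blockid):
        with self.lock:                  # public entry point takes the lock
            self._delete_block(blockid)

    def _delete_block(self, blockid):
        # caller must already hold self.lock
        del self.blocks[blockid]

    def repack(self):
        with self.lock:
            for blockid in list(self.blocks):
                self._delete_block(blockid)   # safe: lock is already held
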
@@ -824,14 +834,14 @@ class ArvadosFile(object):
             self._writers.add(writer)
 
     @synchronized
-    def remove_writer(self, writer):
+    def remove_writer(self, writer, flush):
         """
         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
         and do some block maintenance tasks.
         """
         self._writers.remove(writer)
 
-        if self.size() > config.KEEP_BLOCK_SIZE / 2:
+        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
             # File writer closed, not small enough for repacking
             self.flush()
         elif self.closed():
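
remove_writer() now flushes immediately either when the caller asks for it or when the file is too large to be worth repacking; otherwise the closed file's buffer stays WRITABLE so repack_small_blocks() can pick it up. A minimal sketch of that decision, with the threshold written out (Keep blocks are 64 MiB, so the cutoff is 32 MiB) and the repacking branch reduced to a string:

KEEP_BLOCK_SIZE = 2**26              # 64 MiB

def on_writer_closed(file_size, flush):
    if flush or file_size > KEEP_BLOCK_SIZE / 2:
        return "flush now"
    return "leave buffered for repack_small_blocks()"

assert on_writer_closed(48 * 2**20, flush=False) == "flush now"
assert on_writer_closed(1024, flush=True) == "flush now"
assert on_writer_closed(1024, flush=False) == "leave buffered for repack_small_blocks()"
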
@@ -1156,7 +1166,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def flush(self):
         self.arvadosfile.flush()
 
-    def close(self):
+    def close(self, flush=True):
         if not self.closed:
-            self.arvadosfile.remove_writer(self)
+            self.arvadosfile.remove_writer(self, flush)
             super(ArvadosFileWriter, self).close()
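
From the caller's side, close() keeps its previous behaviour by default (flush=True), while close(flush=False) opts a file into small-block packing. A hedged usage sketch, assuming the arvados.collection.Collection open()/manifest_text() API; the file names are made up:

import arvados.collection

coll = arvados.collection.Collection()
for i in range(1000):
    writer = coll.open("small-%04d.txt" % i, "w")
    writer.write("just a few bytes\n")
    writer.close(flush=False)        # defer flushing so blocks can be packed
print(coll.manifest_text())          # small files end up sharing packed blocks
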