10315: Enhanced performance when writing lots of small files without flushing by...
authorLucas Di Pentima <lucas@curoverse.com>
Tue, 25 Oct 2016 22:52:23 +0000 (19:52 -0300)
committerLucas Di Pentima <lucas@curoverse.com>
Tue, 25 Oct 2016 22:52:23 +0000 (19:52 -0300)
sdk/python/arvados/arvfile.py

index c394dab810715c2659b6f72f8f5f1e173d711ead..aac25956305db02996a9d7a54757195def56ef92 100644 (file)
@@ -417,6 +417,7 @@ class _BlockManager(object):
         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
+        self._pending_write_size = 0
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -555,24 +556,28 @@ class _BlockManager(object):
         self.stop_threads()
 
     @synchronized
-    def repack_small_blocks(self, force=False, sync=False):
+    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
         """Packs small blocks together before uploading"""
-        # Search blocks ready for getting packed together before being committed to Keep.
-        # A WRITABLE block always has an owner.
-        # A WRITABLE block with its owner.closed() implies that it's
-        # size is <= KEEP_BLOCK_SIZE/2.
-        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
-        if len(small_blocks) <= 1:
-            # Not enough small blocks for repacking
-            return
+        self._pending_write_size += closed_file_size
 
         # Check if there are enough small blocks for filling up one in full
-        pending_write_size = sum([b.size() for b in small_blocks])
-        if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
+        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+
+            # Search blocks ready for getting packed together before being committed to Keep.
+            # A WRITABLE block always has an owner.
+            # A WRITABLE block with its owner.closed() implies that its
+            # size is <= KEEP_BLOCK_SIZE/2.
+            small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+
+            if len(small_blocks) <= 1:
+                # Not enough small blocks for repacking
+                return
+
             new_bb = self._alloc_bufferblock()
             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                 bb = small_blocks.pop(0)
                 arvfile = bb.owner
+                self._pending_write_size -= bb.size()
                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                 arvfile.set_segments([Range(new_bb.blockid,
                                             0,
@@ -846,7 +851,7 @@ class ArvadosFile(object):
             self.flush()
         elif self.closed():
             # All writers closed and size is adequate for repacking
-            self.parent._my_block_manager().repack_small_blocks()
+            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
 
     def closed(self):
         """