9701: Several corrections/enhancements:
author Lucas Di Pentima <lucas@curoverse.com>
Thu, 6 Oct 2016 20:23:09 +0000 (17:23 -0300)
committer Lucas Di Pentima <lucas@curoverse.com>
Thu, 6 Oct 2016 20:23:09 +0000 (17:23 -0300)
* Added a 'sync' parameter to repack_small_blocks() so that block commits can
  be done either synchronously or asynchronously, depending on where it is
  called from (see the sketch below).
* Allow small buffer blocks to be packed together up to a full Keep block.
* Replaced ArvadosFile's _closed flag with a set of its ArvadosFileWriter objects,
  which is used as a reference counter.
* Moved the ArvadosFile flush behaviour from the ArvadosFileWriter.close() method
  to ArvadosFile.remove_writer(), so that the file can decide whether to commit
  its buffer block or repack it with others (see the sketch after the diff).

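The threshold logic in repack_small_blocks() can be summarized with the rough,
self-contained sketch below; SmallBlock and repack() are hypothetical stand-ins
for illustration only, not SDK classes:

    KEEP_BLOCK_SIZE = 64 * 1024 * 1024  # Keep's nominal 64 MiB block size

    class SmallBlock(object):
        """Toy buffer block holding the bytes written by a closed small file."""
        def __init__(self, data):
            self.data = data
        def size(self):
            return len(self.data)

    def repack(small_blocks, force=False):
        """Mirror repack_small_blocks(): return the packed bytes, or None.

        A single small block is left alone, and without force the data is
        only packed once it can fill a whole Keep block.
        """
        if len(small_blocks) <= 1:
            return None
        pending = sum(b.size() for b in small_blocks)
        if not force and pending < KEEP_BLOCK_SIZE:
            return None
        packed = bytearray()
        # Keep appending while the next block still fits in one Keep block.
        while small_blocks and len(packed) + small_blocks[0].size() <= KEEP_BLOCK_SIZE:
            packed.extend(small_blocks.pop(0).data)
        return bytes(packed)
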
sdk/python/arvados/arvfile.py

index ec5e4d5f0473474bf8cc36ad197cfcccb9a82bfa..a043bee75178957301faa5e8a9b12cb38ca57c9d 100644
@@ -549,35 +549,29 @@ class _BlockManager(object):
     def __exit__(self, exc_type, exc_value, traceback):
         self.stop_threads()
 
-    def repack_small_blocks(self, force=False):
+    def repack_small_blocks(self, force=False, sync=False):
         """Packs small blocks together before uploading"""
-        # Candidate bblocks -- This could be sorted in some way to prioritize some
-        # kind of bblocks
+        # Search blocks ready for getting packed together before being committed to Keep
         small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner and b.owner.closed() and b.owner.size() <= (config.KEEP_BLOCK_SIZE / 2)]
-        if len(small_blocks) == 0:
+        if len(small_blocks) <= 1:
+            # Not enough small blocks for repacking
             return
 
-        # Check if there's enough small blocks for combining and uploading
+        # Check if there are enough small blocks for filling up one in full
         pending_write_size = sum([b.size() for b in small_blocks])
-        if force or (pending_write_size > (config.KEEP_BLOCK_SIZE / 2)):
-            if len(small_blocks) == 1:
-                # No small blocks for repacking, leave this one alone
-                # so it's committed before exiting.
-                return
+        if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
             new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
             self._bufferblocks[new_bb.blockid] = new_bb
             size = 0
-            while len(small_blocks) > 0 and size <= (config.KEEP_BLOCK_SIZE / 2):
+            while len(small_blocks) > 0 and (size + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                 bb = small_blocks.pop(0)
                 size += bb.size()
-                new_segs = []
+                arvfile = bb.owner
                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
-                # FIXME: We shoudn't be accessing _segments directly
-                bb.owner._segments = [Range(new_bb.blockid, 0, bb.size(), size-bb.size())]
+                arvfile.set_segments([Range(new_bb.blockid, 0, bb.size(), size-bb.size())])
                 bb.clear()
                 del self._bufferblocks[bb.blockid]
-            # new_bb's size greater half a keep block, let's commit it
-            self.commit_bufferblock(new_bb, sync=True)
+            self.commit_bufferblock(new_bb, sync=sync)
 
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
@@ -659,7 +653,7 @@ class _BlockManager(object):
 
         """
         with self.lock:
-            self.repack_small_blocks(force=True)
+            self.repack_small_blocks(force=True, sync=True)
             items = self._bufferblocks.items()
 
         for k,v in items:
@@ -730,7 +724,7 @@ class ArvadosFile(object):
         """
         self.parent = parent
         self.name = name
-        self._closed = False
+        self._writers = set()
         self._committed = False
         self._segments = []
         self.lock = parent.root_collection().lock
@@ -807,9 +801,13 @@ class ArvadosFile(object):
     def __ne__(self, other):
         return not self.__eq__(other)
 
+    @synchronized
+    def set_segments(self, segs):
+        self._segments = segs
+
     @synchronized
     def set_committed(self):
-        """Set committed flag to False"""
+        """Set committed flag to True"""
         self._committed = True
 
     @synchronized
@@ -818,15 +816,32 @@ class ArvadosFile(object):
         return self._committed
 
     @synchronized
-    def set_closed(self):
-        """Set current block as pending and closed flag to False"""
-        self._closed = True
-        self.parent._my_block_manager().repack_small_blocks()
+    def add_writer(self, writer):
+        """Add an ArvadosFileWriter reference to the list of writers"""
+        if isinstance(writer, ArvadosFileWriter):
+            self._writers.add(writer)
 
     @synchronized
+    def remove_writer(self, writer):
+        """
+        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
+        and do some block maintenance tasks.
+        """
+        self._writers.remove(writer)
+
+        if self.size() > config.KEEP_BLOCK_SIZE / 2:
+            # File writer closed, not small enough for repacking
+            self.flush()
+        elif self.closed():
+            # All writers closed and size is adequate for repacking
+            self.parent._my_block_manager().repack_small_blocks()
+
     def closed(self):
-        """Get whether this is closed or not."""
-        return self._closed
+        """
+        Get whether this is closed or not. When the writers list is empty, the file
+        is supposed to be closed.
+        """
+        return len(self._writers) == 0
 
     @must_be_writable
     @synchronized
@@ -1109,6 +1124,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def __init__(self, arvadosfile, mode, num_retries=None):
         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
         self.mode = mode
+        self.arvadosfile.add_writer(self)
 
     @_FileLikeObjectBase._before_close
     @retry_method
@@ -1138,9 +1154,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def flush(self):
         self.arvadosfile.flush()
 
-    def close(self, flush=True):
+    def close(self):
         if not self.closed:
-            if flush:
-                self.flush()
-            self.arvadosfile.set_closed()
+            self.arvadosfile.remove_writer(self)
             super(ArvadosFileWriter, self).close()
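
The writer reference counting that replaces the _closed flag can be illustrated
with the minimal sketch below; TrackedFile and TrackedWriter are hypothetical
illustrations of the pattern, not SDK classes, and the flush/repack decisions
are reduced to print statements:

    HALF_BLOCK = 32 * 1024 * 1024  # half of Keep's 64 MiB block size

    class TrackedFile(object):
        def __init__(self):
            self._writers = set()
            self.data = bytearray()

        def add_writer(self, writer):
            self._writers.add(writer)

        def closed(self):
            # The file counts as closed once every writer has been closed.
            return len(self._writers) == 0

        def remove_writer(self, writer):
            self._writers.discard(writer)
            if len(self.data) > HALF_BLOCK:
                print("flush: too big to be repacked")
            elif self.closed():
                print("small and fully closed: candidate for repacking")

    class TrackedWriter(object):
        def __init__(self, tracked_file):
            self.file = tracked_file
            tracked_file.add_writer(self)

        def close(self):
            # Mirrors the new ArvadosFileWriter.close(): the file, not the
            # writer, decides whether to flush or repack.
            self.file.remove_writer(self)

    f = TrackedFile()
    w1, w2 = TrackedWriter(f), TrackedWriter(f)
    w1.close()
    print(f.closed())   # False: w2 is still open
    w2.close()
    print(f.closed())   # True: the last writer was closed

Using a set rather than a list keeps writer removal constant-time and makes
adding the same writer twice harmless.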