10383: Merge branch 'master' into 10383-arv-put-incremental-upload
[arvados.git] / sdk / python / arvados / arvfile.py
index f2f7df2dce2121b0c0c0e4b562f7c7963f1fc0ab..4cc2591ebb25034d0145de40c11f6638e3973864 100644 (file)
@@ -10,6 +10,8 @@ import copy
 import errno
 import re
 import logging
+import collections
+import uuid
 
 from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
@@ -405,7 +407,7 @@ class _BlockManager(object):
     def __init__(self, keep, copies=None):
         """keep: KeepClient object to use"""
         self._keep = keep
-        self._bufferblocks = {}
+        self._bufferblocks = collections.OrderedDict()
         self._put_queue = None
         self._put_threads = None
         self._prefetch_queue = None
@@ -415,6 +417,8 @@ class _BlockManager(object):
         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
+        self._pending_write_size = 0
+        self.threads_lock = threading.Lock()
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -430,8 +434,11 @@ class _BlockManager(object):
           ArvadosFile that owns this block
 
         """
+        return self._alloc_bufferblock(blockid, starting_capacity, owner)
+
+    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
         if blockid is None:
-            blockid = "bufferblock%i" % len(self._bufferblocks)
+            blockid = "%s" % uuid.uuid4()
         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
         self._bufferblocks[bufferblock.blockid] = bufferblock
         return bufferblock
@@ -477,28 +484,28 @@ class _BlockManager(object):
                 if self._put_queue is not None:
                     self._put_queue.task_done()
 
-    @synchronized
     def start_put_threads(self):
-        if self._put_threads is None:
-            # Start uploader threads.
-
-            # If we don't limit the Queue size, the upload queue can quickly
-            # grow to take up gigabytes of RAM if the writing process is
-            # generating data more quickly than it can be send to the Keep
-            # servers.
-            #
-            # With two upload threads and a queue size of 2, this means up to 4
-            # blocks pending.  If they are full 64 MiB blocks, that means up to
-            # 256 MiB of internal buffering, which is the same size as the
-            # default download block cache in KeepClient.
-            self._put_queue = Queue.Queue(maxsize=2)
-
-            self._put_threads = []
-            for i in xrange(0, self.num_put_threads):
-                thread = threading.Thread(target=self._commit_bufferblock_worker)
-                self._put_threads.append(thread)
-                thread.daemon = True
-                thread.start()
+        with self.threads_lock:
+            if self._put_threads is None:
+                # Start uploader threads.
+
+                # If we don't limit the Queue size, the upload queue can quickly
+                # grow to take up gigabytes of RAM if the writing process is
+                # generating data more quickly than it can be sent to the Keep
+                # servers.
+                #
+                # With two upload threads and a queue size of 2, this means up to 4
+                # blocks pending.  If they are full 64 MiB blocks, that means up to
+                # 256 MiB of internal buffering, which is the same size as the
+                # default download block cache in KeepClient.
+                self._put_queue = Queue.Queue(maxsize=2)
+
+                self._put_threads = []
+                for i in xrange(0, self.num_put_threads):
+                    thread = threading.Thread(target=self._commit_bufferblock_worker)
+                    self._put_threads.append(thread)
+                    thread.daemon = True
+                    thread.start()
 
     def _block_prefetch_worker(self):
         """The background downloader thread."""
@@ -509,7 +516,7 @@ class _BlockManager(object):
                     return
                 self._keep.get(b)
             except Exception:
-                pass
+                _logger.exception("Exception doing block prefetch")
 
     @synchronized
     def start_get_threads(self):
@@ -549,6 +556,43 @@ class _BlockManager(object):
     def __exit__(self, exc_type, exc_value, traceback):
         self.stop_threads()
 
+    @synchronized
+    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
+        """Packs small blocks together before uploading"""
+        self._pending_write_size += closed_file_size
+
+        # Check if there are enough small blocks for filling up one in full
+        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+
+            # Search blocks ready for getting packed together before being committed to Keep.
+            # A WRITABLE block always has an owner.
+            # A WRITABLE block with its owner.closed() implies that its
+            # size is <= KEEP_BLOCK_SIZE/2.
+            small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+
+            if len(small_blocks) <= 1:
+                # Not enough small blocks for repacking
+                return
+
+            # Update the pending write size count with its true value, just in case
+            # some small file was opened, written and closed several times.
+            self._pending_write_size = sum([b.size() for b in small_blocks])
+            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+                return
+
+            new_bb = self._alloc_bufferblock()
+            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+                bb = small_blocks.pop(0)
+                arvfile = bb.owner
+                self._pending_write_size -= bb.size()
+                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+                arvfile.set_segments([Range(new_bb.blockid,
+                                            0,
+                                            bb.size(),
+                                            new_bb.write_pointer - bb.size())])
+                self._delete_bufferblock(bb.blockid)
+            self.commit_bufferblock(new_bb, sync=sync)
+
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
 
@@ -562,7 +606,6 @@ class _BlockManager(object):
           which case it will wait on an upload queue slot.
 
         """
-
         try:
             # Mark the block as PENDING so to disallow any more appends.
             block.set_state(_BufferBlock.PENDING)
@@ -599,6 +642,9 @@ class _BlockManager(object):
 
     @synchronized
     def delete_bufferblock(self, locator):
+        self._delete_bufferblock(locator)
+
+    def _delete_bufferblock(self, locator):
         bb = self._bufferblocks[locator]
         bb.clear()
         del self._bufferblocks[locator]
@@ -629,11 +675,13 @@ class _BlockManager(object):
         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
 
         """
+        self.repack_small_blocks(force=True, sync=True)
+
         with self.lock:
             items = self._bufferblocks.items()
 
         for k,v in items:
-            if v.state() != _BufferBlock.COMMITTED:
+            if v.state() != _BufferBlock.COMMITTED and v.owner:
                 v.owner.flush(sync=False)
 
         with self.lock:
@@ -700,6 +748,7 @@ class ArvadosFile(object):
         """
         self.parent = parent
         self.name = name
+        self._writers = set()
         self._committed = False
         self._segments = []
         self.lock = parent.root_collection().lock
@@ -710,6 +759,14 @@ class ArvadosFile(object):
     def writable(self):
         return self.parent.writable()
 
+    @synchronized
+    def permission_expired(self, as_of_dt=None):
+        """Returns True if any of the segments' locators has expired"""
+        for r in self._segments:
+            if KeepLocator(r.locator).permission_expired(as_of_dt):
+                return True
+        return False
+
     @synchronized
     def segments(self):
         return copy.copy(self._segments)
@@ -776,9 +833,13 @@ class ArvadosFile(object):
     def __ne__(self, other):
         return not self.__eq__(other)
 
+    @synchronized
+    def set_segments(self, segs):
+        self._segments = segs
+
     @synchronized
     def set_committed(self):
-        """Set committed flag to False"""
+        """Set committed flag to True"""
         self._committed = True
 
     @synchronized
@@ -786,6 +847,34 @@ class ArvadosFile(object):
         """Get whether this is committed or not."""
         return self._committed
 
+    @synchronized
+    def add_writer(self, writer):
+        """Add an ArvadosFileWriter reference to the list of writers"""
+        if isinstance(writer, ArvadosFileWriter):
+            self._writers.add(writer)
+
+    @synchronized
+    def remove_writer(self, writer, flush):
+        """
+        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
+        and do some block maintenance tasks.
+        """
+        self._writers.remove(writer)
+
+        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
+            # File writer closed, not small enough for repacking
+            self.flush()
+        elif self.closed():
+            # All writers closed and size is adequate for repacking
+            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
+
+    def closed(self):
+        """
+        Get whether this is closed or not. When the set of writers is empty, the
+        file is considered closed.
+        """
+        return len(self._writers) == 0
+
     @must_be_writable
     @synchronized
     def truncate(self, size):
@@ -1067,6 +1156,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def __init__(self, arvadosfile, mode, num_retries=None):
         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
         self.mode = mode
+        self.arvadosfile.add_writer(self)
 
     @_FileLikeObjectBase._before_close
     @retry_method
@@ -1096,7 +1186,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def flush(self):
         self.arvadosfile.flush()
 
-    def close(self):
+    def close(self, flush=True):
         if not self.closed:
-            self.flush()
+            self.arvadosfile.remove_writer(self, flush)
             super(ArvadosFileWriter, self).close()