10315: Added back the BlockManager's put threads lazy start, but with a specific...
authorLucas Di Pentima <lucas@curoverse.com>
Fri, 28 Oct 2016 14:27:22 +0000 (11:27 -0300)
committerLucas Di Pentima <lucas@curoverse.com>
Fri, 28 Oct 2016 14:27:22 +0000 (11:27 -0300)
sdk/python/arvados/arvfile.py

index 5ae53f47eb6de48ceffddbe287d8716b69add4ed..2858a480ead0aa52454952ab4b1936d75817a3cf 100644 (file)
@@ -418,7 +418,7 @@ class _BlockManager(object):
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
         self._pending_write_size = 0
-        self.start_put_threads()
+        self.threads_lock = threading.Lock()
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -484,28 +484,28 @@ class _BlockManager(object):
                 if self._put_queue is not None:
                     self._put_queue.task_done()
 
-    @synchronized
     def start_put_threads(self):
-        if self._put_threads is None:
-            # Start uploader threads.
-
-            # If we don't limit the Queue size, the upload queue can quickly
-            # grow to take up gigabytes of RAM if the writing process is
-            # generating data more quickly than it can be send to the Keep
-            # servers.
-            #
-            # With two upload threads and a queue size of 2, this means up to 4
-            # blocks pending.  If they are full 64 MiB blocks, that means up to
-            # 256 MiB of internal buffering, which is the same size as the
-            # default download block cache in KeepClient.
-            self._put_queue = Queue.Queue(maxsize=2)
-
-            self._put_threads = []
-            for i in xrange(0, self.num_put_threads):
-                thread = threading.Thread(target=self._commit_bufferblock_worker)
-                self._put_threads.append(thread)
-                thread.daemon = True
-                thread.start()
+        with self.threads_lock:
+            if self._put_threads is None:
+                # Start uploader threads.
+
+                # If we don't limit the Queue size, the upload queue can quickly
+                # grow to take up gigabytes of RAM if the writing process is
+                # generating data more quickly than it can be sent to the Keep
+                # servers.
+                #
+                # With two upload threads and a queue size of 2, this means up to 4
+                # blocks pending.  If they are full 64 MiB blocks, that means up to
+                # 256 MiB of internal buffering, which is the same size as the
+                # default download block cache in KeepClient.
+                self._put_queue = Queue.Queue(maxsize=2)
+
+                self._put_threads = []
+                for i in xrange(0, self.num_put_threads):
+                    thread = threading.Thread(target=self._commit_bufferblock_worker)
+                    self._put_threads.append(thread)
+                    thread.daemon = True
+                    thread.start()
 
     def _block_prefetch_worker(self):
         """The background downloader thread."""
@@ -627,6 +627,7 @@ class _BlockManager(object):
                 block.set_state(_BufferBlock.ERROR, e)
                 raise
         else:
+            self.start_put_threads()
             self._put_queue.put(block)
 
     @synchronized