Merge branch '20637-prefetch-threads' refs #20637
[arvados.git] / sdk / python / arvados / keep.py
index a2c8fd24948b9dfcd8198037700ac2ba3d55da47..4b00f7df8b912d95488d86879e3a29b83c067fec 100644 (file)
@@ -835,7 +835,7 @@ class KeepClient(object):
     def __init__(self, api_client=None, proxy=None,
                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                  api_token=None, local_store=None, block_cache=None,
-                 num_retries=10, session=None):
+                 num_retries=10, session=None, num_prefetch_threads=None):
         """Initialize a new KeepClient.
 
         Arguments:
@@ -924,6 +924,9 @@ class KeepClient(object):
         self.misses_counter = Counter()
         self._storage_classes_unsupported_warning = False
         self._default_classes = []
+        self.num_prefetch_threads = num_prefetch_threads or 2
+        self._prefetch_queue = None
+        self._prefetch_threads = None
 
         if local_store:
             self.local_store = local_store
@@ -1391,6 +1394,51 @@ class KeepClient(object):
                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
 
+    def _block_prefetch_worker(self):
+        """The background downloader thread."""
+        while True:
+            try:
+                b = self._prefetch_queue.get()
+                if b is None:
+                    return
+                self.get(b, prefetch=True)
+            except Exception:
+                _logger.exception("Exception doing block prefetch")
+
+    def _start_prefetch_threads(self):
+        if self._prefetch_threads is None:
+            with self.lock:
+                if self._prefetch_threads is not None:
+                    return
+                self._prefetch_queue = queue.Queue()
+                self._prefetch_threads = []
+                for i in range(0, self.num_prefetch_threads):
+                    thread = threading.Thread(target=self._block_prefetch_worker)
+                    self._prefetch_threads.append(thread)
+                    thread.daemon = True
+                    thread.start()
+
+    def block_prefetch(self, locator):
+        """
+        This relies on the fact that KeepClient implements a block cache,
+        so repeated requests for the same block will not result in repeated
+        downloads (unless the block is evicted from the cache.)  This method
+        does not block.
+        """
+
+        self._start_prefetch_threads()
+        self._prefetch_queue.put(locator)
+
+    def stop_prefetch_threads(self):
+        with self.lock:
+            if self._prefetch_threads is not None:
+                for t in self._prefetch_threads:
+                    self._prefetch_queue.put(None)
+                for t in self._prefetch_threads:
+                    t.join()
+            self._prefetch_threads = None
+            self._prefetch_queue = None
+
     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
         """A stub for put().