def _block_prefetch_worker(self):
    """Run the main loop of one background prefetch thread.

    Repeatedly takes an item from ``self._prefetch_queue`` and passes it to
    ``self.get(item, prefetch=True)``.  The loop ends, and the thread with
    it, when ``None`` is dequeued.  Any ``Exception`` raised while handling
    an item is logged and the loop carries on with the next item.
    """
    while True:
        try:
            locator = self._prefetch_queue.get()
            if locator is None:
                # None is the signal to exit; leave the loop and return.
                break
            self.get(locator, prefetch=True)
        except Exception:
            # Best effort: a failed prefetch must not kill the worker.
            _logger.exception("Exception doing block prefetch")
+
def _start_prefetch_threads(self):
    """Start the prefetch worker threads if they are not already running.

    Creates ``self._prefetch_queue`` and then starts
    ``self.num_prefetch_threads`` daemon threads, each running
    ``self._block_prefetch_worker``.  The started threads are recorded in
    ``self._prefetch_threads``.  Calling this while the workers already
    exist does nothing.
    """
    # Unlocked fast path: skip the lock once the workers have been created.
    if self._prefetch_threads is not None:
        return
    with self.lock:
        # Check again under the lock: another caller may have created the
        # workers while this one was waiting to acquire it.
        if self._prefetch_threads is not None:
            return
        # The queue must exist before any worker starts, because the worker
        # loop reads self._prefetch_queue straight away.
        self._prefetch_queue = queue.Queue()
        self._prefetch_threads = []
        for _ in range(self.num_prefetch_threads):
            thread = threading.Thread(target=self._block_prefetch_worker)
            thread.daemon = True
            thread.start()
            # Record the thread only after start() succeeded.  If start()
            # raises, the list then holds started threads only, so
            # stop_prefetch_threads() can still join every entry (joining a
            # never-started thread raises RuntimeError).
            self._prefetch_threads.append(thread)
+
def block_prefetch(self, locator):
    """Queue ``locator`` for the background prefetch threads.

    The worker threads are started by this call if they are not running
    yet.  Prefetching relies on the fact that KeepClient implements a block
    cache, so repeated requests for the same block will not result in
    repeated downloads (unless the block is evicted from the cache).  This
    method does not block.
    """
    # Make sure the queue and its workers exist before touching the queue.
    self._start_prefetch_threads()
    pending = self._prefetch_queue
    pending.put(locator)
+
def stop_prefetch_threads(self):
    """Shut down the prefetch worker threads and wait for them to exit.

    While holding ``self.lock``: if workers exist, puts one ``None`` on
    ``self._prefetch_queue`` per worker thread, joins every thread, and
    then resets ``self._prefetch_threads`` and ``self._prefetch_queue`` to
    ``None``.  Safe to call when no workers are running.
    """
    # NOTE(review): the joins run while self.lock is held.  If the worker's
    # self.get() path ever needs self.lock this would block -- confirm.
    with self.lock:
        workers = self._prefetch_threads
        if workers is not None:
            pending = self._prefetch_queue
            # One None per worker, so every worker loop sees one and exits.
            for _ in workers:
                pending.put(None)
            for worker in workers:
                worker.join()
        self._prefetch_threads = self._prefetch_queue = None
+