def __init__(self, api_client=None, proxy=None,
timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
api_token=None, local_store=None, block_cache=None,
- num_retries=10, session=None):
+ num_retries=10, session=None, num_prefetch_threads=None):
"""Initialize a new KeepClient.
Arguments:
self.misses_counter = Counter()
self._storage_classes_unsupported_warning = False
self._default_classes = []
+ self.num_prefetch_threads = num_prefetch_threads or 2
+ self._prefetch_queue = None
+ self._prefetch_threads = None
if local_store:
self.local_store = local_store
"[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
+ def _block_prefetch_worker(self):
+     """Run the loop executed by each background prefetch thread.
+
+     Repeatedly takes one item from ``self._prefetch_queue``.  A ``None``
+     item ends the loop; any other item is passed to
+     ``self.get(item, prefetch=True)``.  An exception raised while
+     handling an item is logged and the loop carries on with the next one.
+     """
+     while True:
+         try:
+             locator = self._prefetch_queue.get()
+             if locator is None:
+                 # Shutdown sentinel: leave the loop so the thread ends.
+                 break
+             self.get(locator, prefetch=True)
+         except Exception:
+             # One failed item must not terminate the worker.
+             _logger.exception("Exception doing block prefetch")
+
+ def _start_prefetch_threads(self):
+     """Create the prefetch queue and start the worker threads if needed.
+
+     Does nothing when ``self._prefetch_threads`` is already set.
+     Otherwise it creates a new ``queue.Queue`` and starts
+     ``self.num_prefetch_threads`` daemon threads, each running
+     ``_block_prefetch_worker``.
+     """
+     # Unlocked check first so the already-started case skips the lock.
+     if self._prefetch_threads is not None:
+         return
+     with self.lock:
+         # Check again now that the lock is held: another caller may have
+         # started the threads in the meantime.
+         if self._prefetch_threads is not None:
+             return
+         self._prefetch_queue = queue.Queue()
+         self._prefetch_threads = []
+         for _ in range(self.num_prefetch_threads):
+             worker = threading.Thread(target=self._block_prefetch_worker)
+             worker.daemon = True
+             self._prefetch_threads.append(worker)
+             worker.start()
+
+ def block_prefetch(self, locator):
+     """Queue a block to be fetched by the background prefetch threads.
+
+     Makes sure the prefetch threads are started, then puts ``locator``
+     on the prefetch queue and returns without waiting for the download.
+
+     This relies on the fact that KeepClient implements a block cache,
+     so repeated requests for the same block will not result in repeated
+     downloads (unless the block is evicted from the cache).
+     """
+     self._start_prefetch_threads()
+     pending = self._prefetch_queue
+     pending.put(locator)
+
+ def stop_prefetch_threads(self):
+     """Tell the prefetch threads to exit and wait until they have.
+
+     Does nothing when no prefetch threads exist.  Otherwise it puts one
+     ``None`` on the queue per thread, joins every thread, and resets
+     ``self._prefetch_threads`` and ``self._prefetch_queue`` to ``None``.
+     """
+     # NOTE(review): the joins below run while self.lock is held; confirm
+     # that nothing a worker calls needs to take this same lock.
+     with self.lock:
+         if self._prefetch_threads is None:
+             return
+         # One sentinel per thread, since each worker exits on the first
+         # None it receives.
+         for _ in self._prefetch_threads:
+             self._prefetch_queue.put(None)
+         for worker in self._prefetch_threads:
+             worker.join()
+         self._prefetch_threads = None
+         self._prefetch_queue = None
+
def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
"""A stub for put().