X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bb39fb01d3147c6009ee35920ae0637201b11dd2..5c260a4bfcce9f967dc1518bd52aaaa6d6335c60:/sdk/python/arvados/keep.py

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index a2c8fd2494..4b00f7df8b 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -835,7 +835,7 @@ class KeepClient(object):
     def __init__(self, api_client=None, proxy=None,
                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                  api_token=None, local_store=None, block_cache=None,
-                 num_retries=10, session=None):
+                 num_retries=10, session=None, num_prefetch_threads=None):
         """Initialize a new KeepClient.
 
         Arguments:
@@ -924,6 +924,9 @@ class KeepClient(object):
         self.misses_counter = Counter()
         self._storage_classes_unsupported_warning = False
         self._default_classes = []
+        self.num_prefetch_threads = num_prefetch_threads or 2
+        self._prefetch_queue = None
+        self._prefetch_threads = None
 
         if local_store:
             self.local_store = local_store
@@ -1391,6 +1394,78 @@ class KeepClient(object):
                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
 
+    def _block_prefetch_worker(self):
+        """Run the download loop of one background prefetch thread.
+
+        Takes locators off self._prefetch_queue and fetches each one with
+        self.get(locator, prefetch=True).  A None entry is the shutdown
+        sentinel: the thread returns when it receives one.
+        """
+        while True:
+            try:
+                b = self._prefetch_queue.get()
+                if b is None:
+                    return
+                self.get(b, prefetch=True)
+            except Exception:
+                # Log the failure and keep serving the queue.
+                _logger.exception("Exception doing block prefetch")
+
+    def _start_prefetch_threads(self):
+        """Create the prefetch queue and worker threads if not yet running.
+
+        Starts self.num_prefetch_threads daemon threads, each running
+        _block_prefetch_worker().
+        """
+        if self._prefetch_threads is None:
+            with self.lock:
+                # Re-check now that we hold the lock: another caller may
+                # have started the threads after the unlocked check above.
+                if self._prefetch_threads is not None:
+                    return
+                self._prefetch_queue = queue.Queue()
+                self._prefetch_threads = []
+                for _ in range(self.num_prefetch_threads):
+                    thread = threading.Thread(target=self._block_prefetch_worker)
+                    self._prefetch_threads.append(thread)
+                    thread.daemon = True
+                    thread.start()
+
+    def block_prefetch(self, locator):
+        """Queue a block for download by the background prefetch threads.
+
+        This relies on the fact that KeepClient implements a block cache,
+        so repeated requests for the same block will not result in repeated
+        downloads (unless the block is evicted from the cache).  This method
+        does not block.
+        """
+        self._start_prefetch_threads()
+        self._prefetch_queue.put(locator)
+
+    def stop_prefetch_threads(self):
+        """Shut down the prefetch threads, if any were started.
+
+        Queues one None sentinel per worker, waits for every worker to
+        exit, then clears the thread list and queue so that the next
+        block_prefetch() call starts fresh workers.
+        """
+        with self.lock:
+            threads = self._prefetch_threads
+            if threads is None:
+                return
+            for _ in threads:
+                self._prefetch_queue.put(None)
+        # Join outside self.lock.  The workers run self.get(); if that ever
+        # needs self.lock (its body is not part of this change), joining
+        # while holding the lock would deadlock.
+        for t in threads:
+            t.join()
+        with self.lock:
+            # Skip the reset if another caller already replaced the workers.
+            if self._prefetch_threads is threads:
+                self._prefetch_threads = None
+                self._prefetch_queue = None
+
     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
         """A stub for put().
 