def __init__(self, api_client=None, proxy=None,
timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
api_token=None, local_store=None, block_cache=None,
- num_retries=0, session=None):
+ num_retries=10, session=None, num_prefetch_threads=None):
"""Initialize a new KeepClient.
Arguments:
:num_retries:
The default number of times to retry failed requests.
This will be used as the default num_retries value when get() and
- put() are called. Default 0.
+ put() are called. Default 10.
"""
self.lock = threading.Lock()
if proxy is None:
self.misses_counter = Counter()
self._storage_classes_unsupported_warning = False
self._default_classes = []
+ self.num_prefetch_threads = num_prefetch_threads or 2
+ self._prefetch_queue = None
+ self._prefetch_threads = None
if local_store:
self.local_store = local_store
try:
locator = KeepLocator(loc_s)
if method == "GET":
- slot, first = self.block_cache.reserve_cache(locator.md5sum)
- if not first:
+ while slot is None:
+ slot, first = self.block_cache.reserve_cache(locator.md5sum)
+ if first:
+ # Fresh and empty "first time it is used" slot
+ break
if prefetch:
- # this is request for a prefetch, if it is
- # already in flight, return immediately.
- # clear 'slot' to prevent finally block from
- # calling slot.set()
+ # this is request for a prefetch to fill in
+ # the cache, don't need to wait for the
+ # result, so if it is already in flight return
+ # immediately. Clear 'slot' to prevent
+ # finally block from calling slot.set()
slot = None
return None
- self.hits_counter.add(1)
+
blob = slot.get()
- if blob is None:
- raise arvados.errors.KeepReadError(
- "failed to read {}".format(loc_s))
- return blob
+ if blob is not None:
+ self.hits_counter.add(1)
+ return blob
+
+ # If blob is None, this means either
+ #
+ # (a) another thread was fetching this block and
+ # failed with an error or
+ #
+ # (b) cache thrashing caused the slot to be
+ # evicted (content set to None) by another thread
+ # between the call to reserve_cache() and get().
+ #
+ # We'll handle these cases by reserving a new slot
+ # and then doing a full GET request.
+ slot = None
self.misses_counter.add(1)
"[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
+    def _block_prefetch_worker(self, prefetch_queue=None):
+        """Background downloader thread: fetch queued locators into the cache.
+
+        Runs until it takes a ``None`` sentinel off the queue.  An error
+        while prefetching one locator is logged and does not stop the thread.
+
+        :prefetch_queue:
+          The queue to consume.  It is bound once when the thread starts, so
+          stop_prefetch_threads() can detach the queue from the client
+          without pulling it out from under a running worker.  Defaults to
+          the client's current prefetch queue.
+        """
+        if prefetch_queue is None:
+            prefetch_queue = self._prefetch_queue
+        while True:
+            try:
+                b = prefetch_queue.get()
+                if b is None:
+                    return
+                self.get(b, prefetch=True)
+            except Exception:
+                # Prefetching is best-effort: log and keep serving the queue.
+                _logger.exception("Exception doing block prefetch")
+
+    def _start_prefetch_threads(self):
+        """Start the prefetch worker pool if it is not already running.
+
+        Returns the queue that locators to prefetch should be put on.  The
+        returned reference stays valid even if stop_prefetch_threads() runs
+        concurrently and clears the client's own attributes.
+        """
+        prefetch_queue = self._prefetch_queue
+        if prefetch_queue is not None:
+            # Fast path: the pool is already running, no need for the lock.
+            return prefetch_queue
+        with self.lock:
+            if self._prefetch_queue is not None:
+                # Another thread started the pool while we waited.
+                return self._prefetch_queue
+            prefetch_queue = queue.Queue()
+            threads = []
+            for _ in range(self.num_prefetch_threads):
+                thread = threading.Thread(target=self._block_prefetch_worker,
+                                          args=(prefetch_queue,))
+                thread.daemon = True
+                thread.start()
+                threads.append(thread)
+            self._prefetch_threads = threads
+            # Publish the queue last: a non-None queue is what the unlocked
+            # fast path above treats as "pool is running".
+            self._prefetch_queue = prefetch_queue
+            return prefetch_queue
+
+    def block_prefetch(self, locator):
+        """Queue a block to be fetched into the block cache in the background.
+
+        This relies on the fact that KeepClient implements a block cache,
+        so repeated requests for the same block will not result in repeated
+        downloads (unless the block is evicted from the cache.)  This method
+        does not wait for the download.
+
+        Prefetching is best-effort: a request made while
+        stop_prefetch_threads() is running may be silently dropped.
+        """
+        self._start_prefetch_threads().put(locator)
+
+    def stop_prefetch_threads(self):
+        """Stop the prefetch worker pool, if running, and wait for it to exit.
+
+        Workers finish any locators queued before the shutdown sentinels.
+        The pool is detached from the client while holding self.lock, but the
+        wait happens after the lock is released.  Otherwise every other user
+        of self.lock would stall behind the remaining downloads, and any
+        get() path that takes self.lock would deadlock.
+        """
+        with self.lock:
+            threads = self._prefetch_threads
+            prefetch_queue = self._prefetch_queue
+            self._prefetch_threads = None
+            self._prefetch_queue = None
+        if threads is None:
+            return
+        # One sentinel per worker; the queue is FIFO, so each worker sees its
+        # sentinel only after everything queued earlier has been processed.
+        for _ in threads:
+            prefetch_queue.put(None)
+        for thread in threads:
+            thread.join()
+
def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
"""A stub for put().