b = self._prefetch_queue.get()
if b is None:
return
- self._keep.get(b, cache_slot_get=False)
+ self._keep.get(b, prefetch=True)
except Exception:
_logger.exception("Exception doing block prefetch")
return
self.start_get_threads()
- # _logger.debug("pushing %s to prefetch", locator)
self._prefetch_queue.put(locator)
def get(self, loc_s, **kwargs):
return self._get_or_head(loc_s, method="GET", **kwargs)
- def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, cache_slot_get=True):
+ def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
if method == "GET":
slot, first = self.block_cache.reserve_cache(locator.md5sum)
if not first:
- self.hits_counter.add(1)
- if cache_slot_get:
- blob = slot.get()
- if blob is None:
- raise arvados.errors.KeepReadError(
- "failed to read {}".format(loc_s))
- return blob
- else:
- slot = None # prevent finally from calling slot.set()
+ if prefetch:
+ # This is a request for a prefetch; if the block is
+ # already in flight, return immediately.
+ # Clear 'slot' to prevent the finally block from
+ # calling slot.set().
+ slot = None
return None
+ self.hits_counter.add(1)
+ blob = slot.get()
+ if blob is None:
+ raise arvados.errors.KeepReadError(
+ "failed to read {}".format(loc_s))
+ return blob
self.misses_counter.add(1)
def __init__(self, blocks):
self.blocks = blocks
self.requests = []
- def get(self, locator, num_retries=0, cache_slot_get=None):
+ def get(self, locator, num_retries=0, prefetch=False):
self.requests.append(locator)
return self.blocks.get(locator)
def get_from_cache(self, locator):
def __init__(self, content, num_retries=0):
self.content = content
- def get(self, locator, num_retries=0, cache_slot_get=None):
+ def get(self, locator, num_retries=0, prefetch=False):
return self.content[locator]
def test_stream_reader(self):