Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>
b = self._prefetch_queue.get()
if b is None:
return
b = self._prefetch_queue.get()
if b is None:
return
- self._keep.get(b, cache_slot_get=False)
+ self._keep.get(b, prefetch=True)
except Exception:
_logger.exception("Exception doing block prefetch")
except Exception:
_logger.exception("Exception doing block prefetch")
return
self.start_get_threads()
return
self.start_get_threads()
- # _logger.debug("pushing %s to prefetch", locator)
self._prefetch_queue.put(locator)
self._prefetch_queue.put(locator)
def get(self, loc_s, **kwargs):
return self._get_or_head(loc_s, method="GET", **kwargs)
def get(self, loc_s, **kwargs):
return self._get_or_head(loc_s, method="GET", **kwargs)
- def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, cache_slot_get=True):
+ def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
if method == "GET":
slot, first = self.block_cache.reserve_cache(locator.md5sum)
if not first:
if method == "GET":
slot, first = self.block_cache.reserve_cache(locator.md5sum)
if not first:
- self.hits_counter.add(1)
- if cache_slot_get:
- blob = slot.get()
- if blob is None:
- raise arvados.errors.KeepReadError(
- "failed to read {}".format(loc_s))
- return blob
- else:
- slot = None # prevent finally from calling slot.set()
+ if prefetch:
+ # This is a request for a prefetch; if the block is
+ # already in flight, return immediately.
+ # Clear 'slot' to prevent the finally block from
+ # calling slot.set().
+ slot = None
+ self.hits_counter.add(1)
+ blob = slot.get()
+ if blob is None:
+ raise arvados.errors.KeepReadError(
+ "failed to read {}".format(loc_s))
+ return blob
self.misses_counter.add(1)
self.misses_counter.add(1)
def __init__(self, blocks):
self.blocks = blocks
self.requests = []
def __init__(self, blocks):
self.blocks = blocks
self.requests = []
- def get(self, locator, num_retries=0, cache_slot_get=None):
+ def get(self, locator, num_retries=0, prefetch=False):
self.requests.append(locator)
return self.blocks.get(locator)
def get_from_cache(self, locator):
self.requests.append(locator)
return self.blocks.get(locator)
def get_from_cache(self, locator):
def __init__(self, content, num_retries=0):
self.content = content
def __init__(self, content, num_retries=0):
self.content = content
- def get(self, locator, num_retries=0, cache_slot_get=None):
+ def get(self, locator, num_retries=0, prefetch=False):
return self.content[locator]
def test_stream_reader(self):
return self.content[locator]
def test_stream_reader(self):