18941: Separate get() behavior for prefetch
author Peter Amstutz <peter.amstutz@curii.com>
Tue, 29 Mar 2022 20:52:47 +0000 (16:52 -0400)
committer Peter Amstutz <peter.amstutz@curii.com>
Tue, 29 Mar 2022 20:52:47 +0000 (16:52 -0400)
Add a cache_slot_get flag to get(). When it is False, get() returns None
immediately if a cache slot already exists for the block.

In the standard behavior, if multiple readers try to get() the same
block, the first one will start downloading the block, and all the
others will wait and return the block content from the cache slot
when complete.

With the new optional behavior, if multiple readers try to get() the same
block, the first one will start downloading the block, and all the
others will immediately return None.

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/python/arvados/arvfile.py
sdk/python/arvados/keep.py

index fbf593d02619810b1647f78520217ac6a9b4bef7..a13575b715922f306c14ce4bb6a82fbe766fae8f 100644 (file)
@@ -593,10 +593,7 @@ class _BlockManager(object):
                 b = self._prefetch_queue.get()
                 if b is None:
                     return
-                if self._keep.has_cache_slot(b):
-                    continue
-                _logger.debug("prefetching %s", b)
-                self._keep.get(b)
+                self._keep.get(b, cache_slot_get=False)
             except Exception:
                 _logger.exception("Exception doing block prefetch")
 
@@ -844,9 +841,6 @@ class _BlockManager(object):
         if not self.prefetch_enabled:
             return
 
-        if self._keep.has_cache_slot(locator):
-            return
-
         with self.lock:
             if locator in self._bufferblocks:
                 return
index 94104586deb46a4c24c05c41e683a50b28c69d1d..43d71f17e4cb160238b729d740390f3f7a5bdde4 100644 (file)
@@ -1045,10 +1045,6 @@ class KeepClient(object):
         else:
             return None
 
-    def has_cache_slot(self, loc_s):
-        locator = KeepLocator(loc_s)
-        return self.block_cache.get(locator.md5sum) is not None
-
     def refresh_signature(self, loc):
         """Ask Keep to get the remote block and return its local signature"""
         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
@@ -1062,7 +1058,7 @@ class KeepClient(object):
     def get(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="GET", **kwargs)
 
-    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
+    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, cache_slot_get=True):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -1102,11 +1098,14 @@ class KeepClient(object):
                 slot, first = self.block_cache.reserve_cache(locator.md5sum)
                 if not first:
                     self.hits_counter.add(1)
-                    blob = slot.get()
-                    if blob is None:
-                        raise arvados.errors.KeepReadError(
-                            "failed to read {}".format(loc_s))
-                    return blob
+                    if cache_slot_get:
+                        blob = slot.get()
+                        if blob is None:
+                            raise arvados.errors.KeepReadError(
+                                "failed to read {}".format(loc_s))
+                        return blob
+                    else:
+                        return None
 
             self.misses_counter.add(1)