18941: Rename cache_slot_get option to 'prefetch' for clarity
authorPeter Amstutz <peter.amstutz@curii.com>
Wed, 30 Mar 2022 03:09:46 +0000 (23:09 -0400)
committerPeter Amstutz <peter.amstutz@curii.com>
Wed, 30 Mar 2022 03:09:46 +0000 (23:09 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/python/arvados/arvfile.py
sdk/python/arvados/keep.py
sdk/python/tests/test_arvfile.py
sdk/python/tests/test_collections.py

index a13575b715922f306c14ce4bb6a82fbe766fae8f..2ce0e46b30bd67ad948f832183ab091865c2ea53 100644 (file)
@@ -593,7 +593,7 @@ class _BlockManager(object):
                 b = self._prefetch_queue.get()
                 if b is None:
                     return
-                self._keep.get(b, cache_slot_get=False)
+                self._keep.get(b, prefetch=True)
             except Exception:
                 _logger.exception("Exception doing block prefetch")
 
@@ -846,7 +846,6 @@ class _BlockManager(object):
                 return
 
         self.start_get_threads()
-        # _logger.debug("pushing %s to prefetch", locator)
         self._prefetch_queue.put(locator)
 
 
index 53776017dbad07cfc1e73f96f0f0d978626a4f98..7c05cc0a6a2c72ca818686b6eea5c6f0a4874d3d 100644 (file)
@@ -1058,7 +1058,7 @@ class KeepClient(object):
     def get(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="GET", **kwargs)
 
-    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, cache_slot_get=True):
+    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -1097,16 +1097,19 @@ class KeepClient(object):
             if method == "GET":
                 slot, first = self.block_cache.reserve_cache(locator.md5sum)
                 if not first:
-                    self.hits_counter.add(1)
-                    if cache_slot_get:
-                        blob = slot.get()
-                        if blob is None:
-                            raise arvados.errors.KeepReadError(
-                                "failed to read {}".format(loc_s))
-                        return blob
-                    else:
-                        slot = None  # prevent finally from calling slot.set()
+                    if prefetch:
+                        # this is request for a prefetch, if it is
+                        # already in flight, return immediately.
+                        # clear 'slot' to prevent finally block from
+                        # calling slot.set()
+                        slot = None
                         return None
+                    self.hits_counter.add(1)
+                    blob = slot.get()
+                    if blob is None:
+                        raise arvados.errors.KeepReadError(
+                            "failed to read {}".format(loc_s))
+                    return blob
 
             self.misses_counter.add(1)
 
index fce48479a250cc9213d03732cec815509d117ab4..b45a592ecd0fbd1b1d4722bd63f6e1e0b25514dd 100644 (file)
@@ -27,7 +27,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
         def __init__(self, blocks):
             self.blocks = blocks
             self.requests = []
-        def get(self, locator, num_retries=0, cache_slot_get=None):
+        def get(self, locator, num_retries=0, prefetch=False):
             self.requests.append(locator)
             return self.blocks.get(locator)
         def get_from_cache(self, locator):
index e2d644b8676ba3a57acdccb148be1fd998826b40..5cf4993b2f3804d22209ae16db41fc7bc505efd8 100644 (file)
@@ -320,7 +320,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
         def __init__(self, content, num_retries=0):
             self.content = content
 
-        def get(self, locator, num_retries=0, cache_slot_get=None):
+        def get(self, locator, num_retries=0, prefetch=False):
             return self.content[locator]
 
     def test_stream_reader(self):