3198: Apply StreamFileReader tests to ArvadosFileReader
[arvados.git] / sdk / python / arvados / keep.py
index 36ec56ce05db95b4b09ea2afe737a7583473bafb..a087838d261d59d0d43eaa252693cf041f3f4a94 100644 (file)
@@ -186,24 +186,34 @@ class KeepBlockCache(object):
                         break
                 sm = sum([slot.size() for slot in self._cache])
 
+    def _get(self, locator):
+        # Test if the locator is already in the cache
+        for i in xrange(0, len(self._cache)):
+            if self._cache[i].locator == locator:
+                n = self._cache[i]
+                if i != 0:
+                    # move it to the front
+                    del self._cache[i]
+                    self._cache.insert(0, n)
+                return n
+        return None
+
+    def get(self, locator):
+        with self._cache_lock:
+            return self._get(locator)
+
     def reserve_cache(self, locator):
         '''Reserve a cache slot for the specified locator,
         or return the existing slot.'''
         with self._cache_lock:
-            # Test if the locator is already in the cache
-            for i in xrange(0, len(self._cache)):
-                if self._cache[i].locator == locator:
-                    n = self._cache[i]
-                    if i != 0:
-                        # move it to the front
-                        del self._cache[i]
-                        self._cache.insert(0, n)
-                    return n, False
-
-            # Add a new cache slot for the locator
-            n = KeepBlockCache.CacheSlot(locator)
-            self._cache.insert(0, n)
-            return n, True
+            n = self._get(locator)
+            if n:
+                return n, False
+            else:
+                # Add a new cache slot for the locator
+                n = KeepBlockCache.CacheSlot(locator)
+                self._cache.insert(0, n)
+                return n, True
 
 class KeepClient(object):
 
@@ -576,7 +586,7 @@ class KeepClient(object):
             return None
 
     @retry.retry_method
-    def get(self, loc_s, num_retries=None):
+    def get(self, loc_s, num_retries=None, cache_only=False):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -595,12 +605,21 @@ class KeepClient(object):
           to fetch data from every available Keep service, along with any
           that are named in location hints in the locator.  The default value
           is set when the KeepClient is initialized.
+        * cache_only: If true, return the block data only if already present in
+          cache, otherwise return None.
         """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         locator = KeepLocator(loc_s)
         expect_hash = locator.md5sum
 
+        if cache_only:
+            slot = self.block_cache.get(expect_hash)
+            if slot.ready.is_set():
+                return slot.get()
+            else:
+                return None
+
         slot, first = self.block_cache.reserve_cache(expect_hash)
         if not first:
             v = slot.get()
@@ -741,3 +760,6 @@ class KeepClient(object):
             return ''
         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
             return f.read()
+
+    def is_cached(self, locator):
+        return self.block_cache.reserve_cache(expect_hash)