21639: Reenable prefetch, but not on every read()
[arvados.git] / sdk / python / arvados / keep.py
index 0018687ff35a585c33ce07378acb7f05e0b98522..d1be6b931e7b0ea1ae8009076a0c684aedaa3a2b 100644 (file)
@@ -15,6 +15,7 @@ from builtins import object
 import collections
 import datetime
 import hashlib
 import collections
 import datetime
 import hashlib
+import errno
 import io
 import logging
 import math
 import io
 import logging
 import math
@@ -26,8 +27,11 @@ import socket
 import ssl
 import sys
 import threading
 import ssl
 import sys
 import threading
+import resource
 from . import timer
 import urllib.parse
 from . import timer
 import urllib.parse
+import traceback
+import weakref
 
 if sys.version_info >= (3, 0):
     from io import BytesIO
 
 if sys.version_info >= (3, 0):
     from io import BytesIO
@@ -39,6 +43,8 @@ import arvados.config as config
 import arvados.errors
 import arvados.retry as retry
 import arvados.util
 import arvados.errors
 import arvados.retry as retry
 import arvados.util
+import arvados.diskcache
+from arvados._pycurlhelper import PyCurlHelper
 
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
 
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
@@ -159,7 +165,6 @@ class Keep(object):
                config.get('ARVADOS_API_TOKEN'),
                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
                config.get('ARVADOS_KEEP_PROXY'),
                config.get('ARVADOS_API_TOKEN'),
                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
                config.get('ARVADOS_KEEP_PROXY'),
-               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
                os.environ.get('KEEP_LOCAL_STORE'))
         if (global_client_object is None) or (cls._last_key != key):
             global_client_object = KeepClient()
                os.environ.get('KEEP_LOCAL_STORE'))
         if (global_client_object is None) or (cls._last_key != key):
             global_client_object = KeepClient()
@@ -175,11 +180,65 @@ class Keep(object):
         return Keep.global_client_object().put(data, **kwargs)
 
 class KeepBlockCache(object):
         return Keep.global_client_object().put(data, **kwargs)
 
 class KeepBlockCache(object):
-    # Default RAM cache is 256MiB
-    def __init__(self, cache_max=(256 * 1024 * 1024)):
+    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
         self.cache_max = cache_max
         self.cache_max = cache_max
-        self._cache = []
+        self._cache = collections.OrderedDict()
         self._cache_lock = threading.Lock()
         self._cache_lock = threading.Lock()
+        self._max_slots = max_slots
+        self._disk_cache = disk_cache
+        self._disk_cache_dir = disk_cache_dir
+        self._cache_updating = threading.Condition(self._cache_lock)
+
+        if self._disk_cache and self._disk_cache_dir is None:
+            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
+            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
+
+        if self._max_slots == 0:
+            if self._disk_cache:
+                # Each block uses two file descriptors, one used to
+                # open it initially and hold the flock(), and a second
+                # hidden one used by mmap().
+                #
+                # Set max slots to 1/8 of maximum file handles.  This
+                # means we'll use at most 1/4 of total file handles.
+                #
+                # NOFILE typically defaults to 1024 on Linux so this
+                # is 128 slots (256 file handles), which means we can
+                # cache up to 8 GiB of 64 MiB blocks.  This leaves
+                # 768 file handles for sockets and other stuff.
+                #
+                # When we want the ability to have more cache (e.g. in
+                # arv-mount) we'll increase rlimit before calling
+                # this.
+                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
+            else:
+                # RAM cache slots
+                self._max_slots = 512
+
+        if self.cache_max == 0:
+            if self._disk_cache:
+                fs = os.statvfs(self._disk_cache_dir)
+                # Calculation of available space incorporates existing cache usage
+                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
+                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
+                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
+                # pick smallest of:
+                # 10% of total disk size
+                # 25% of available space
+                # max_slots * 64 MiB
+                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
+            else:
+                # 256 MiB in RAM
+                self.cache_max = (256 * 1024 * 1024)
+
+        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
+
+        self.cache_total = 0
+        if self._disk_cache:
+            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
+            for slot in self._cache.values():
+                self.cache_total += slot.size()
+            self.cap_cache()
 
     class CacheSlot(object):
         __slots__ = ("locator", "ready", "content")
 
     class CacheSlot(object):
         __slots__ = ("locator", "ready", "content")
@@ -194,8 +253,11 @@ class KeepBlockCache(object):
             return self.content
 
         def set(self, value):
             return self.content
 
         def set(self, value):
+            if self.content is not None:
+                return False
             self.content = value
             self.ready.set()
             self.content = value
             self.ready.set()
+            return True
 
         def size(self):
             if self.content is None:
 
         def size(self):
             if self.content is None:
@@ -203,29 +265,50 @@ class KeepBlockCache(object):
             else:
                 return len(self.content)
 
             else:
                 return len(self.content)
 
+        def evict(self):
+            self.content = None
+
+
+    def _resize_cache(self, cache_max, max_slots):
+        # Try and make sure the contents of the cache do not exceed
+        # the supplied maximums.
+
+        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
+            return
+
+        _evict_candidates = collections.deque(self._cache.values())
+        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
+            slot = _evict_candidates.popleft()
+            if not slot.ready.is_set():
+                continue
+
+            sz = slot.size()
+            slot.evict()
+            self.cache_total -= sz
+            del self._cache[slot.locator]
+
+
     def cap_cache(self):
         '''Cap the cache size to self.cache_max'''
     def cap_cache(self):
         '''Cap the cache size to self.cache_max'''
-        with self._cache_lock:
-            # Select all slots except those where ready.is_set() and content is
-            # None (that means there was an error reading the block).
-            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
-            sm = sum([slot.size() for slot in self._cache])
-            while len(self._cache) > 0 and sm > self.cache_max:
-                for i in range(len(self._cache)-1, -1, -1):
-                    if self._cache[i].ready.is_set():
-                        del self._cache[i]
-                        break
-                sm = sum([slot.size() for slot in self._cache])
+        with self._cache_updating:
+            self._resize_cache(self.cache_max, self._max_slots)
+            self._cache_updating.notify_all()
 
     def _get(self, locator):
         # Test if the locator is already in the cache
 
     def _get(self, locator):
         # Test if the locator is already in the cache
-        for i in range(0, len(self._cache)):
-            if self._cache[i].locator == locator:
-                n = self._cache[i]
-                if i != 0:
-                    # move it to the front
-                    del self._cache[i]
-                    self._cache.insert(0, n)
+        if locator in self._cache:
+            n = self._cache[locator]
+            if n.ready.is_set() and n.content is None:
+                del self._cache[n.locator]
+                return None
+            self._cache.move_to_end(locator)
+            return n
+        if self._disk_cache:
+            # see if it exists on disk
+            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
+            if n is not None:
+                self._cache[n.locator] = n
+                self.cache_total += n.size()
                 return n
         return None
 
                 return n
         return None
 
@@ -236,16 +319,68 @@ class KeepBlockCache(object):
     def reserve_cache(self, locator):
         '''Reserve a cache slot for the specified locator,
         or return the existing slot.'''
     def reserve_cache(self, locator):
         '''Reserve a cache slot for the specified locator,
         or return the existing slot.'''
-        with self._cache_lock:
+        with self._cache_updating:
             n = self._get(locator)
             if n:
                 return n, False
             else:
                 # Add a new cache slot for the locator
             n = self._get(locator)
             if n:
                 return n, False
             else:
                 # Add a new cache slot for the locator
-                n = KeepBlockCache.CacheSlot(locator)
-                self._cache.insert(0, n)
+                self._resize_cache(self.cache_max, self._max_slots-1)
+                while len(self._cache) >= self._max_slots:
+                    # If there isn't a slot available, need to wait
+                    # for something to happen that releases one of the
+                    # cache slots.  Idle for 200 ms or woken up by
+                    # another thread
+                    self._cache_updating.wait(timeout=0.2)
+                    self._resize_cache(self.cache_max, self._max_slots-1)
+
+                if self._disk_cache:
+                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
+                else:
+                    n = KeepBlockCache.CacheSlot(locator)
+                self._cache[n.locator] = n
                 return n, True
 
                 return n, True
 
+    def set(self, slot, blob):
+        try:
+            if slot.set(blob):
+                self.cache_total += slot.size()
+            return
+        except OSError as e:
+            if e.errno == errno.ENOMEM:
+                # Reduce max slots to current - 4, cap cache and retry
+                with self._cache_lock:
+                    self._max_slots = max(4, len(self._cache) - 4)
+            elif e.errno == errno.ENOSPC:
+                # Reduce disk max space to current - 256 MiB, cap cache and retry
+                with self._cache_lock:
+                    sm = sum(st.size() for st in self._cache.values())
+                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
+            elif e.errno == errno.ENODEV:
+                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
+        except Exception as e:
+            pass
+        finally:
+            # Check if we should evict things from the cache.  Either
+            # because we added a new thing or there was an error and
+            # we possibly adjusted the limits down, so we might need
+            # to push something out.
+            self.cap_cache()
+
+        try:
+            # Only gets here if there was an error the first time. The
+            # exception handler adjusts limits downward in some cases
+            # to free up resources, which would make the operation
+            # succeed.
+            if slot.set(blob):
+                self.cache_total += slot.size()
+        except Exception as e:
+            # It failed again.  Give up.
+            slot.set(None)
+            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
+
+        self.cap_cache()
+
 class Counter(object):
     def __init__(self, v=0):
         self._lk = threading.Lock()
 class Counter(object):
     def __init__(self, v=0):
         self._lk = threading.Lock()
@@ -261,18 +396,10 @@ class Counter(object):
 
 
 class KeepClient(object):
 
 
 class KeepClient(object):
+    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
+    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
 
 
-    # Default Keep server connection timeout:  2 seconds
-    # Default Keep server read timeout:       256 seconds
-    # Default Keep server bandwidth minimum:  32768 bytes per second
-    # Default Keep proxy connection timeout:  20 seconds
-    # Default Keep proxy read timeout:        256 seconds
-    # Default Keep proxy bandwidth minimum:   32768 bytes per second
-    DEFAULT_TIMEOUT = (2, 256, 32768)
-    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
-
-
-    class KeepService(object):
+    class KeepService(PyCurlHelper):
         """Make requests to a single Keep service, and track results.
 
         A KeepService is intended to last long enough to perform one
         """Make requests to a single Keep service, and track results.
 
         A KeepService is intended to last long enough to perform one
@@ -295,6 +422,7 @@ class KeepClient(object):
                      download_counter=None,
                      headers={},
                      insecure=False):
                      download_counter=None,
                      headers={},
                      insecure=False):
+            super(KeepClient.KeepService, self).__init__()
             self.root = root
             self._user_agent_pool = user_agent_pool
             self._result = {'error': None}
             self.root = root
             self._user_agent_pool = user_agent_pool
             self._result = {'error': None}
@@ -332,30 +460,6 @@ class KeepClient(object):
             except:
                 ua.close()
 
             except:
                 ua.close()
 
-        def _socket_open(self, *args, **kwargs):
-            if len(args) + len(kwargs) == 2:
-                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
-            else:
-                return self._socket_open_pycurl_7_19_3(*args, **kwargs)
-
-        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
-            return self._socket_open_pycurl_7_21_5(
-                purpose=None,
-                address=collections.namedtuple(
-                    'Address', ['family', 'socktype', 'protocol', 'addr'],
-                )(family, socktype, protocol, address))
-
-        def _socket_open_pycurl_7_21_5(self, purpose, address):
-            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
-            s = socket.socket(address.family, address.socktype, address.protocol)
-            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-            # Will throw invalid protocol error on mac. This test prevents that.
-            if hasattr(socket, 'TCP_KEEPIDLE'):
-                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
-            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
-            self._socket = s
-            return s
-
         def get(self, locator, method="GET", timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
         def get(self, locator, method="GET", timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
@@ -376,10 +480,13 @@ class KeepClient(object):
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                     else:
                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
                     else:
                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
+                    else:
+                        curl.setopt(pycurl.HTTPGET, True)
                     self._setcurltimeouts(curl, timeout, method=="HEAD")
 
                     try:
                     self._setcurltimeouts(curl, timeout, method=="HEAD")
 
                     try:
@@ -478,6 +585,7 @@ class KeepClient(object):
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                     else:
                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     self._setcurltimeouts(curl, timeout)
                     else:
                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     self._setcurltimeouts(curl, timeout)
@@ -523,43 +631,6 @@ class KeepClient(object):
                 self.upload_counter.add(len(body))
             return True
 
                 self.upload_counter.add(len(body))
             return True
 
-        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
-            if not timeouts:
-                return
-            elif isinstance(timeouts, tuple):
-                if len(timeouts) == 2:
-                    conn_t, xfer_t = timeouts
-                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
-                else:
-                    conn_t, xfer_t, bandwidth_bps = timeouts
-            else:
-                conn_t, xfer_t = (timeouts, timeouts)
-                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
-            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
-            if not ignore_bandwidth:
-                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
-                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
-
-        def _headerfunction(self, header_line):
-            if isinstance(header_line, bytes):
-                header_line = header_line.decode('iso-8859-1')
-            if ':' in header_line:
-                name, value = header_line.split(':', 1)
-                name = name.strip().lower()
-                value = value.strip()
-            elif self._headers:
-                name = self._lastheadername
-                value = self._headers[name] + ' ' + header_line.strip()
-            elif header_line.startswith('HTTP/'):
-                name = 'x-status-line'
-                value = header_line
-            else:
-                _logger.error("Unexpected header line: %s", header_line)
-                return
-            self._lastheadername = name
-            self._headers[name] = value
-            # Returning None implies all bytes were written
-
 
     class KeepWriterQueue(queue.Queue):
         def __init__(self, copies, classes=[]):
 
     class KeepWriterQueue(queue.Queue):
         def __init__(self, copies, classes=[]):
@@ -754,7 +825,7 @@ class KeepClient(object):
     def __init__(self, api_client=None, proxy=None,
                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                  api_token=None, local_store=None, block_cache=None,
     def __init__(self, api_client=None, proxy=None,
                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                  api_token=None, local_store=None, block_cache=None,
-                 num_retries=0, session=None):
+                 num_retries=10, session=None, num_prefetch_threads=None):
         """Initialize a new KeepClient.
 
         Arguments:
         """Initialize a new KeepClient.
 
         Arguments:
@@ -807,7 +878,7 @@ class KeepClient(object):
         :num_retries:
           The default number of times to retry failed requests.
           This will be used as the default num_retries value when get() and
         :num_retries:
           The default number of times to retry failed requests.
           This will be used as the default num_retries value when get() and
-          put() are called.  Default 0.
+          put() are called.  Default 10.
         """
         self.lock = threading.Lock()
         if proxy is None:
         """
         self.lock = threading.Lock()
         if proxy is None:
@@ -843,6 +914,12 @@ class KeepClient(object):
         self.misses_counter = Counter()
         self._storage_classes_unsupported_warning = False
         self._default_classes = []
         self.misses_counter = Counter()
         self._storage_classes_unsupported_warning = False
         self._default_classes = []
+        if num_prefetch_threads is not None:
+            self.num_prefetch_threads = num_prefetch_threads
+        else:
+            self.num_prefetch_threads = 2
+        self._prefetch_queue = None
+        self._prefetch_threads = None
 
         if local_store:
             self.local_store = local_store
 
         if local_store:
             self.local_store = local_store
@@ -1034,9 +1111,10 @@ class KeepClient(object):
         else:
             return None
 
         else:
             return None
 
-    def get_from_cache(self, loc):
+    def get_from_cache(self, loc_s):
         """Fetch a block only if is in the cache, otherwise return None."""
         """Fetch a block only if is in the cache, otherwise return None."""
-        slot = self.block_cache.get(loc)
+        locator = KeepLocator(loc_s)
+        slot = self.block_cache.get(locator.md5sum)
         if slot is not None and slot.ready.is_set():
             return slot.get()
         else:
         if slot is not None and slot.ready.is_set():
             return slot.get()
         else:
@@ -1055,7 +1133,7 @@ class KeepClient(object):
     def get(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="GET", **kwargs)
 
     def get(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="GET", **kwargs)
 
-    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
+    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -1092,14 +1170,39 @@ class KeepClient(object):
         try:
             locator = KeepLocator(loc_s)
             if method == "GET":
         try:
             locator = KeepLocator(loc_s)
             if method == "GET":
-                slot, first = self.block_cache.reserve_cache(locator.md5sum)
-                if not first:
-                    self.hits_counter.add(1)
+                while slot is None:
+                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
+                    if first:
+                        # Fresh and empty "first time it is used" slot
+                        break
+                    if prefetch:
+                        # this is request for a prefetch to fill in
+                        # the cache, don't need to wait for the
+                        # result, so if it is already in flight return
+                        # immediately.  Clear 'slot' to prevent
+                        # finally block from calling slot.set()
+                        if slot.ready.is_set():
+                            slot.get()
+                        slot = None
+                        return None
+
                     blob = slot.get()
                     blob = slot.get()
-                    if blob is None:
-                        raise arvados.errors.KeepReadError(
-                            "failed to read {}".format(loc_s))
-                    return blob
+                    if blob is not None:
+                        self.hits_counter.add(1)
+                        return blob
+
+                    # If blob is None, this means either
+                    #
+                    # (a) another thread was fetching this block and
+                    # failed with an error or
+                    #
+                    # (b) cache thrashing caused the slot to be
+                    # evicted (content set to None) by another thread
+                    # between the call to reserve_cache() and get().
+                    #
+                    # We'll handle these cases by reserving a new slot
+                    # and then doing a full GET request.
+                    slot = None
 
             self.misses_counter.add(1)
 
 
             self.misses_counter.add(1)
 
@@ -1161,8 +1264,7 @@ class KeepClient(object):
                 return blob
         finally:
             if slot is not None:
                 return blob
         finally:
             if slot is not None:
-                slot.set(blob)
-                self.block_cache.cap_cache()
+                self.block_cache.set(slot, blob)
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
@@ -1287,6 +1389,54 @@ class KeepClient(object):
                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
 
                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
 
+    def _block_prefetch_worker(self):
+        """The background downloader thread."""
+        while True:
+            try:
+                b = self._prefetch_queue.get()
+                if b is None:
+                    return
+                self.get(b, prefetch=True)
+            except Exception:
+                _logger.exception("Exception doing block prefetch")
+
+    def _start_prefetch_threads(self):
+        if self._prefetch_threads is None:
+            with self.lock:
+                if self._prefetch_threads is not None:
+                    return
+                self._prefetch_queue = queue.Queue()
+                self._prefetch_threads = []
+                for i in range(0, self.num_prefetch_threads):
+                    thread = threading.Thread(target=self._block_prefetch_worker)
+                    self._prefetch_threads.append(thread)
+                    thread.daemon = True
+                    thread.start()
+
+    def block_prefetch(self, locator):
+        """
+        This relies on the fact that KeepClient implements a block cache,
+        so repeated requests for the same block will not result in repeated
+        downloads (unless the block is evicted from the cache.)  This method
+        does not block.
+        """
+
+        if self.block_cache.get(locator) is not None:
+            return
+
+        self._start_prefetch_threads()
+        self._prefetch_queue.put(locator)
+
+    def stop_prefetch_threads(self):
+        with self.lock:
+            if self._prefetch_threads is not None:
+                for t in self._prefetch_threads:
+                    self._prefetch_queue.put(None)
+                for t in self._prefetch_threads:
+                    t.join()
+            self._prefetch_threads = None
+            self._prefetch_queue = None
+
     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
         """A stub for put().
 
     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
         """A stub for put().
 
@@ -1330,6 +1480,3 @@ class KeepClient(object):
             return True
         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
             return True
             return True
         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
             return True
-
-    def is_cached(self, locator):
-        return self.block_cache.reserve_cache(expect_hash)