21223: Add a few more --file-cache RLIMIT_NOFILE tests
[arvados.git] / sdk / python / arvados / keep.py
index bc07851835e2471ee9f1055b689fe6a789ea4d62..4b00f7df8b912d95488d86879e3a29b83c067fec 100644 (file)
@@ -15,6 +15,7 @@ from builtins import object
 import collections
 import datetime
 import hashlib
+import errno
 import io
 import logging
 import math
@@ -26,8 +27,11 @@ import socket
 import ssl
 import sys
 import threading
+import resource
 from . import timer
 import urllib.parse
+import traceback
+import weakref
 
 if sys.version_info >= (3, 0):
     from io import BytesIO
@@ -39,6 +43,8 @@ import arvados.config as config
 import arvados.errors
 import arvados.retry as retry
 import arvados.util
+import arvados.diskcache
+from arvados._pycurlhelper import PyCurlHelper
 
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
@@ -159,7 +165,6 @@ class Keep(object):
                config.get('ARVADOS_API_TOKEN'),
                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
                config.get('ARVADOS_KEEP_PROXY'),
-               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
                os.environ.get('KEEP_LOCAL_STORE'))
         if (global_client_object is None) or (cls._last_key != key):
             global_client_object = KeepClient()
@@ -175,11 +180,63 @@ class Keep(object):
         return Keep.global_client_object().put(data, **kwargs)
 
 class KeepBlockCache(object):
-    # Default RAM cache is 256MiB
-    def __init__(self, cache_max=(256 * 1024 * 1024)):
+    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
         self.cache_max = cache_max
         self._cache = []
         self._cache_lock = threading.Lock()
+        self._max_slots = max_slots
+        self._disk_cache = disk_cache
+        self._disk_cache_dir = disk_cache_dir
+        self._cache_updating = threading.Condition(self._cache_lock)
+
+        if self._disk_cache and self._disk_cache_dir is None:
+            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
+            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
+
+        if self._max_slots == 0:
+            if self._disk_cache:
+                # Each block uses two file descriptors, one used to
+                # open it initially and hold the flock(), and a second
+                # hidden one used by mmap().
+                #
+                # Set max slots to 1/8 of maximum file handles.  This
+                # means we'll use at most 1/4 of total file handles.
+                #
+                # NOFILE typically defaults to 1024 on Linux so this
+                # is 128 slots (256 file handles), which means we can
+                # cache up to 8 GiB of 64 MiB blocks.  This leaves
+                # 768 file handles for sockets and other stuff.
+                #
+                # When we want the ability to have more cache (e.g. in
+                # arv-mount) we'll increase rlimit before calling
+                # this.
+                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
+            else:
+                # RAM cache slots
+                self._max_slots = 512
+
+        if self.cache_max == 0:
+            if self._disk_cache:
+                fs = os.statvfs(self._disk_cache_dir)
+                # Calculation of available space incorporates existing cache usage
+                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
+                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
+                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
+                # pick smallest of:
+                # 10% of total disk size
+                # 25% of available space
+                # max_slots * 64 MiB
+                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
+            else:
+                # 256 MiB in RAM
+                self.cache_max = (256 * 1024 * 1024)
+
+        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
+
+        if self._disk_cache:
+            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
+            self.cap_cache()
+
 
     class CacheSlot(object):
         __slots__ = ("locator", "ready", "content")
@@ -203,19 +260,51 @@ class KeepBlockCache(object):
             else:
                 return len(self.content)
 
+        def evict(self):
+            self.content = None
+            return self.gone()
+
+        def gone(self):
+            return (self.content is None)
+
+    def _resize_cache(self, cache_max, max_slots):
+        # Try to make sure the contents of the cache do not exceed
+        # the supplied maximums.
+
+        # Select all slots except those where ready.is_set() and content is
+        # None (that means there was an error reading the block).
+        self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+        sm = sum([slot.size() for slot in self._cache])
+        while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
+            for i in range(len(self._cache)-1, -1, -1):
+                # start from the back, find a slot that is a candidate to evict
+                if self._cache[i].ready.is_set():
+                    sz = self._cache[i].size()
+
+                    # If evict() returns False, it means the
+                    # underlying disk cache couldn't lock the file
+                    # for deletion because another process was using
+                    # it.  In that case don't count it as reducing the
+                    # amount of data in the cache; find something else
+                    # to throw out.
+                    if self._cache[i].evict():
+                        sm -= sz
+
+                    # check to make sure the underlying data is gone
+                    if self._cache[i].gone():
+                        # Either way we forget about it: either the
+                        # other process will delete it, or, if we need
+                        # it again and it is still there, we'll find
+                        # it on disk.
+                        del self._cache[i]
+                    break
+
+
     def cap_cache(self):
         '''Cap the cache size to self.cache_max'''
-        with self._cache_lock:
-            # Select all slots except those where ready.is_set() and content is
-            # None (that means there was an error reading the block).
-            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
-            sm = sum([slot.size() for slot in self._cache])
-            while len(self._cache) > 0 and sm > self.cache_max:
-                for i in range(len(self._cache)-1, -1, -1):
-                    if self._cache[i].ready.is_set():
-                        del self._cache[i]
-                        break
-                sm = sum([slot.size() for slot in self._cache])
+        with self._cache_updating:
+            self._resize_cache(self.cache_max, self._max_slots)
+            self._cache_updating.notify_all()
 
     def _get(self, locator):
         # Test if the locator is already in the cache
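The evict()/gone() split above matters because a disk slot's backing file can be held open by another process. A toy illustration of that contract (not the real arvados.diskcache implementation, just the flock() behavior the comments describe):

```python
import errno
import fcntl
import os

class ToySlot:
    """Toy stand-in showing the evict()/gone() contract _resize_cache() uses."""
    def __init__(self, path, fd):
        self.path, self.fd = path, fd
        self._gone = False

    def evict(self):
        try:
            # Refuse to delete the backing file while another process
            # holds a lock on it.
            fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError as e:
            if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                return False   # in use elsewhere; evict something else
            raise
        os.unlink(self.path)
        self._gone = True
        return True

    def gone(self):
        return self._gone
```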
@@ -227,6 +316,12 @@ class KeepBlockCache(object):
                     del self._cache[i]
                     self._cache.insert(0, n)
                 return n
+        if self._disk_cache:
+            # see if it exists on disk
+            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
+            if n is not None:
+                self._cache.insert(0, n)
+                return n
         return None
 
     def get(self, locator):
@@ -236,16 +331,66 @@ class KeepBlockCache(object):
     def reserve_cache(self, locator):
         '''Reserve a cache slot for the specified locator,
         or return the existing slot.'''
-        with self._cache_lock:
+        with self._cache_updating:
             n = self._get(locator)
             if n:
                 return n, False
             else:
                 # Add a new cache slot for the locator
-                n = KeepBlockCache.CacheSlot(locator)
+                self._resize_cache(self.cache_max, self._max_slots-1)
+                while len(self._cache) >= self._max_slots:
+                    # If there isn't a slot available, we need to wait
+                    # for another thread to release one of the cache
+                    # slots.  Sleep for up to 200 ms, or until woken
+                    # by another thread.
+                    self._cache_updating.wait(timeout=0.2)
+                    self._resize_cache(self.cache_max, self._max_slots-1)
+
+                if self._disk_cache:
+                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
+                else:
+                    n = KeepBlockCache.CacheSlot(locator)
                 self._cache.insert(0, n)
                 return n, True
 
+    def set(self, slot, blob):
+        try:
+            slot.set(blob)
+            return
+        except OSError as e:
+            if e.errno == errno.ENOMEM:
+                # Reduce max slots to current - 4, cap cache and retry
+                with self._cache_lock:
+                    self._max_slots = max(4, len(self._cache) - 4)
+            elif e.errno == errno.ENOSPC:
+                # Reduce disk max space to current - 256 MiB, cap cache and retry
+                with self._cache_lock:
+                    sm = sum([st.size() for st in self._cache])
+                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
+            elif e.errno == errno.ENODEV:
+                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
+        except Exception as e:
+            pass
+        finally:
+            # Check if we should evict things from the cache.  Either
+            # because we added a new thing or there was an error and
+            # we possibly adjusted the limits down, so we might need
+            # to push something out.
+            self.cap_cache()
+
+        try:
+            # We only get here if there was an error the first time.
+            # The exception handler above adjusts limits downward in
+            # some cases to free up resources, which may allow the
+            # operation to succeed this time.
+            slot.set(blob)
+        except Exception as e:
+            # It failed again.  Give up.
+            slot.set(None)
+            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
+
+        self.cap_cache()
+
 class Counter(object):
     def __init__(self, v=0):
         self._lk = threading.Lock()
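reserve_cache() and the new set() form a small producer/consumer protocol: exactly one caller gets first == True and must eventually call set(); everyone else blocks on the slot. A hedged sketch of a caller, where cached_get and fetch_block are hypothetical names:

```python
def cached_get(block_cache, md5sum, fetch_block):
    # fetch_block is a hypothetical callable returning bytes or raising.
    slot, first = block_cache.reserve_cache(md5sum)
    if not first:
        return slot.get()   # blocks until the fetching thread calls set()
    blob = None
    try:
        blob = fetch_block(md5sum)
    finally:
        # set() retries once after ENOMEM/ENOSPC by shrinking the cache
        # limits, and always ends with cap_cache(), which notifies any
        # threads waiting in reserve_cache().
        block_cache.set(slot, blob)
    return blob
```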
@@ -261,18 +406,10 @@ class Counter(object):
 
 
 class KeepClient(object):
+    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
+    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
 
-    # Default Keep server connection timeout:  2 seconds
-    # Default Keep server read timeout:       256 seconds
-    # Default Keep server bandwidth minimum:  32768 bytes per second
-    # Default Keep proxy connection timeout:  20 seconds
-    # Default Keep proxy read timeout:        256 seconds
-    # Default Keep proxy bandwidth minimum:   32768 bytes per second
-    DEFAULT_TIMEOUT = (2, 256, 32768)
-    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
-
-
-    class KeepService(object):
+    class KeepService(PyCurlHelper):
         """Make requests to a single Keep service, and track results.
 
         A KeepService is intended to last long enough to perform one
         """Make requests to a single Keep service, and track results.
 
         A KeepService is intended to last long enough to perform one
@@ -295,6 +432,7 @@ class KeepClient(object):
                      download_counter=None,
                      headers={},
                      insecure=False):
+            super(KeepClient.KeepService, self).__init__()
             self.root = root
             self._user_agent_pool = user_agent_pool
             self._result = {'error': None}
@@ -332,30 +470,6 @@ class KeepClient(object):
             except:
                 ua.close()
 
-        def _socket_open(self, *args, **kwargs):
-            if len(args) + len(kwargs) == 2:
-                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
-            else:
-                return self._socket_open_pycurl_7_19_3(*args, **kwargs)
-
-        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
-            return self._socket_open_pycurl_7_21_5(
-                purpose=None,
-                address=collections.namedtuple(
-                    'Address', ['family', 'socktype', 'protocol', 'addr'],
-                )(family, socktype, protocol, address))
-
-        def _socket_open_pycurl_7_21_5(self, purpose, address):
-            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
-            s = socket.socket(address.family, address.socktype, address.protocol)
-            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-            # Will throw invalid protocol error on mac. This test prevents that.
-            if hasattr(socket, 'TCP_KEEPIDLE'):
-                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
-            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
-            self._socket = s
-            return s
-
         def get(self, locator, method="GET", timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
@@ -376,10 +490,13 @@ class KeepClient(object):
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                     else:
                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
+                    else:
+                        curl.setopt(pycurl.HTTPGET, True)
                     self._setcurltimeouts(curl, timeout, method=="HEAD")
 
                     try:
@@ -478,6 +595,7 @@ class KeepClient(object):
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                     else:
                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     self._setcurltimeouts(curl, timeout)
@@ -523,43 +641,6 @@ class KeepClient(object):
                 self.upload_counter.add(len(body))
             return True
 
-        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
-            if not timeouts:
-                return
-            elif isinstance(timeouts, tuple):
-                if len(timeouts) == 2:
-                    conn_t, xfer_t = timeouts
-                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
-                else:
-                    conn_t, xfer_t, bandwidth_bps = timeouts
-            else:
-                conn_t, xfer_t = (timeouts, timeouts)
-                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
-            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
-            if not ignore_bandwidth:
-                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
-                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
-
-        def _headerfunction(self, header_line):
-            if isinstance(header_line, bytes):
-                header_line = header_line.decode('iso-8859-1')
-            if ':' in header_line:
-                name, value = header_line.split(':', 1)
-                name = name.strip().lower()
-                value = value.strip()
-            elif self._headers:
-                name = self._lastheadername
-                value = self._headers[name] + ' ' + header_line.strip()
-            elif header_line.startswith('HTTP/'):
-                name = 'x-status-line'
-                value = header_line
-            else:
-                _logger.error("Unexpected header line: %s", header_line)
-                return
-            self._lastheadername = name
-            self._headers[name] = value
-            # Returning None implies all bytes were written
-
 
     class KeepWriterQueue(queue.Queue):
         def __init__(self, copies, classes=[]):
@@ -754,7 +835,7 @@ class KeepClient(object):
     def __init__(self, api_client=None, proxy=None,
                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                  api_token=None, local_store=None, block_cache=None,
-                 num_retries=0, session=None):
+                 num_retries=10, session=None, num_prefetch_threads=None):
         """Initialize a new KeepClient.
 
         Arguments:
         """Initialize a new KeepClient.
 
         Arguments:
@@ -807,7 +888,7 @@ class KeepClient(object):
         :num_retries:
           The default number of times to retry failed requests.
           This will be used as the default num_retries value when get() and
-          put() are called.  Default 0.
+          put() are called.  Default 10.
         """
         self.lock = threading.Lock()
         if proxy is None:
         """
         self.lock = threading.Lock()
         if proxy is None:
@@ -843,6 +924,9 @@ class KeepClient(object):
         self.misses_counter = Counter()
         self._storage_classes_unsupported_warning = False
         self._default_classes = []
+        self.num_prefetch_threads = num_prefetch_threads or 2
+        self._prefetch_queue = None
+        self._prefetch_threads = None
 
         if local_store:
             self.local_store = local_store
@@ -1034,9 +1118,10 @@ class KeepClient(object):
         else:
             return None
 
-    def get_from_cache(self, loc):
+    def get_from_cache(self, loc_s):
         """Fetch a block only if is in the cache, otherwise return None."""
         """Fetch a block only if is in the cache, otherwise return None."""
-        slot = self.block_cache.get(loc)
+        locator = KeepLocator(loc_s)
+        slot = self.block_cache.get(locator.md5sum)
         if slot is not None and slot.ready.is_set():
             return slot.get()
         else:
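get_from_cache() now takes the full locator string and keys the cache on its md5sum. For reference, KeepLocator parsing looks roughly like this (the hash is the md5 of "foo"):

```python
from arvados.keep import KeepLocator

loc = KeepLocator("acbd18db4cc2f85cedef654fccc4a4d8+3")
assert loc.md5sum == "acbd18db4cc2f85cedef654fccc4a4d8"
assert loc.size == 3
```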
@@ -1055,7 +1140,7 @@ class KeepClient(object):
     def get(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="GET", **kwargs)
 
-    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
+    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -1080,28 +1165,52 @@ class KeepClient(object):
 
         self.get_counter.add(1)
 
+        request_id = (request_id or
+                      (hasattr(self, 'api_client') and self.api_client.request_id) or
+                      arvados.util.new_request_id())
+        if headers is None:
+            headers = {}
+        headers['X-Request-Id'] = request_id
+
         slot = None
         blob = None
         try:
             locator = KeepLocator(loc_s)
             if method == "GET":
-                slot, first = self.block_cache.reserve_cache(locator.md5sum)
-                if not first:
-                    self.hits_counter.add(1)
+                while slot is None:
+                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
+                    if first:
+                        # Fresh and empty "first time it is used" slot
+                        break
+                    if prefetch:
+                        # This is a prefetch request to fill in the
+                        # cache; we don't need to wait for the result,
+                        # so if the block is already in flight, return
+                        # immediately.  Clear 'slot' to prevent the
+                        # finally block from calling slot.set().
+                        slot = None
+                        return None
+
                     blob = slot.get()
-                    if blob is None:
-                        raise arvados.errors.KeepReadError(
-                            "failed to read {}".format(loc_s))
-                    return blob
+                    if blob is not None:
+                        self.hits_counter.add(1)
+                        return blob
+
+                    # If blob is None, this means either
+                    #
+                    # (a) another thread was fetching this block and
+                    # failed with an error or
+                    #
+                    # (b) cache thrashing caused the slot to be
+                    # evicted (content set to None) by another thread
+                    # between the call to reserve_cache() and get().
+                    #
+                    # We'll handle these cases by reserving a new slot
+                    # and then doing a full GET request.
+                    slot = None
 
             self.misses_counter.add(1)
 
-            if headers is None:
-                headers = {}
-            headers['X-Request-Id'] = (request_id or
-                                        (hasattr(self, 'api_client') and self.api_client.request_id) or
-                                        arvados.util.new_request_id())
-
             # If the locator has hints specifying a prefix (indicating a
             # remote keepproxy) or the UUID of a local gateway service,
             # read data from the indicated service(s) instead of the usual
@@ -1160,8 +1269,7 @@ class KeepClient(object):
                 return blob
         finally:
             if slot is not None:
-                slot.set(blob)
-                self.block_cache.cap_cache()
+                self.block_cache.set(slot, blob)
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
@@ -1171,14 +1279,14 @@ class KeepClient(object):
                           for key in sorted_roots)
         if not roots_map:
             raise arvados.errors.KeepReadError(
-                "failed to read {}: no Keep services available ({})".format(
-                    loc_s, loop.last_result()))
+                "[{}] failed to read {}: no Keep services available ({})".format(
+                    request_id, loc_s, loop.last_result()))
         elif not_founds == len(sorted_roots):
             raise arvados.errors.NotFoundError(
-                "{} not found".format(loc_s), service_errors)
+                "[{}] {} not found".format(request_id, loc_s), service_errors)
         else:
             raise arvados.errors.KeepReadError(
-                "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
+                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
 
     @retry.retry_method
     def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
@@ -1215,10 +1323,11 @@ class KeepClient(object):
             return loc_s
         locator = KeepLocator(loc_s)
 
+        request_id = (request_id or
+                      (hasattr(self, 'api_client') and self.api_client.request_id) or
+                      arvados.util.new_request_id())
         headers = {
-            'X-Request-Id': (request_id or
-                             (hasattr(self, 'api_client') and self.api_client.request_id) or
-                             arvados.util.new_request_id()),
+            'X-Request-Id': request_id,
             'X-Keep-Desired-Replicas': str(copies),
         }
         roots_map = {}
@@ -1275,15 +1384,60 @@ class KeepClient(object):
             return writer_pool.response()
         if not roots_map:
             raise arvados.errors.KeepWriteError(
-                "failed to write {}: no Keep services available ({})".format(
-                    data_hash, loop.last_result()))
+                "[{}] failed to write {}: no Keep services available ({})".format(
+                    request_id, data_hash, loop.last_result()))
         else:
             service_errors = ((key, roots_map[key].last_result()['error'])
                               for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
-                "failed to write {} after {} (wanted {} copies but wrote {})".format(
-                    data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
+                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
+                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
+
+    def _block_prefetch_worker(self):
+        """The background downloader thread."""
+        while True:
+            try:
+                b = self._prefetch_queue.get()
+                if b is None:
+                    return
+                self.get(b, prefetch=True)
+            except Exception:
+                _logger.exception("Exception doing block prefetch")
+
+    def _start_prefetch_threads(self):
+        if self._prefetch_threads is None:
+            with self.lock:
+                if self._prefetch_threads is not None:
+                    return
+                self._prefetch_queue = queue.Queue()
+                self._prefetch_threads = []
+                for i in range(0, self.num_prefetch_threads):
+                    thread = threading.Thread(target=self._block_prefetch_worker)
+                    self._prefetch_threads.append(thread)
+                    thread.daemon = True
+                    thread.start()
+
+    def block_prefetch(self, locator):
+        """
+        This relies on the fact that KeepClient implements a block cache,
+        so repeated requests for the same block will not result in repeated
+        downloads (unless the block is evicted from the cache.)  This method
+        does not block.
+        """
+
+        self._start_prefetch_threads()
+        self._prefetch_queue.put(locator)
+
+    def stop_prefetch_threads(self):
+        with self.lock:
+            if self._prefetch_threads is not None:
+                for t in self._prefetch_threads:
+                    self._prefetch_queue.put(None)
+                for t in self._prefetch_threads:
+                    t.join()
+            self._prefetch_threads = None
+            self._prefetch_queue = None
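A hedged usage sketch of the new prefetch API; the client construction and the locators iterable are illustrative:

```python
import arvados

kc = arvados.KeepClient(num_prefetch_threads=2)
for loc in locators:        # hypothetical iterable of locator strings
    # Returns immediately; worker threads download in the background,
    # and duplicate requests are absorbed by the block cache.
    kc.block_prefetch(loc)
data = kc.get(locators[0])  # usually a cache hit once the workers finish
kc.stop_prefetch_threads()  # drain the queue and join the workers
```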
 
     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
         """A stub for put().
@@ -1328,6 +1482,3 @@ class KeepClient(object):
             return True
         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
             return True
-
-    def is_cached(self, locator):
-        return self.block_cache.reserve_cache(expect_hash)