20933: Use [0-9] instead of \d in regex
[arvados.git] / sdk / python / arvados / keep.py
index eac25e9d3f65dc57f1e9e9d1fbd3734152800814..4b00f7df8b912d95488d86879e3a29b83c067fec 100644 (file)
@@ -4,6 +4,7 @@
 
 from __future__ import absolute_import
 from __future__ import division
+import copy
 from future import standard_library
 from future.utils import native_str
 standard_library.install_aliases()
@@ -14,6 +15,7 @@ from builtins import object
 import collections
 import datetime
 import hashlib
+import errno
 import io
 import logging
 import math
@@ -25,8 +27,11 @@ import socket
 import ssl
 import sys
 import threading
+import resource
 from . import timer
 import urllib.parse
+import traceback
+import weakref
 
 if sys.version_info >= (3, 0):
     from io import BytesIO
@@ -38,6 +43,8 @@ import arvados.config as config
 import arvados.errors
 import arvados.retry as retry
 import arvados.util
+import arvados.diskcache
+from arvados._pycurlhelper import PyCurlHelper
 
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
@@ -158,7 +165,6 @@ class Keep(object):
                config.get('ARVADOS_API_TOKEN'),
                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
                config.get('ARVADOS_KEEP_PROXY'),
-               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
                os.environ.get('KEEP_LOCAL_STORE'))
         if (global_client_object is None) or (cls._last_key != key):
             global_client_object = KeepClient()
@@ -174,11 +180,63 @@ class Keep(object):
         return Keep.global_client_object().put(data, **kwargs)
 
 class KeepBlockCache(object):
-    # Default RAM cache is 256MiB
-    def __init__(self, cache_max=(256 * 1024 * 1024)):
+    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
         self.cache_max = cache_max
         self._cache = []
         self._cache_lock = threading.Lock()
+        self._max_slots = max_slots
+        self._disk_cache = disk_cache
+        self._disk_cache_dir = disk_cache_dir
+        self._cache_updating = threading.Condition(self._cache_lock)
+
+        if self._disk_cache and self._disk_cache_dir is None:
+            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
+            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
+
+        if self._max_slots == 0:
+            if self._disk_cache:
+                # Each block uses two file descriptors, one used to
+                # open it initially and hold the flock(), and a second
+                # hidden one used by mmap().
+                #
+                # Set max slots to 1/8 of maximum file handles.  This
+                # means we'll use at most 1/4 of total file handles.
+                #
+                # NOFILE typically defaults to 1024 on Linux so this
+                # is 128 slots (256 file handles), which means we can
+                # cache up to 8 GiB of 64 MiB blocks.  This leaves
+                # 768 file handles for sockets and other stuff.
+                #
+                # When we want the ability to have more cache (e.g. in
+                # arv-mount) we'll increase rlimit before calling
+                # this.
+                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
+            else:
+                # RAM cache slots
+                self._max_slots = 512
+
+        if self.cache_max == 0:
+            if self._disk_cache:
+                fs = os.statvfs(self._disk_cache_dir)
+                # Calculation of available space incorporates existing cache usage
+                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
+                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
+                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
+                # pick smallest of:
+                # 10% of total disk size
+                # 25% of available space
+                # max_slots * 64 MiB
+                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
+            else:
+                # 256 MiB in RAM
+                self.cache_max = (256 * 1024 * 1024)
+
+        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
+
+        if self._disk_cache:
+            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
+            self.cap_cache()
+
 
     class CacheSlot(object):
         __slots__ = ("locator", "ready", "content")
@@ -202,19 +260,51 @@ class KeepBlockCache(object):
             else:
                 return len(self.content)
 
+        def evict(self):
+            self.content = None
+            return self.gone()
+
+        def gone(self):
+            return (self.content is None)
+
+    def _resize_cache(self, cache_max, max_slots):
+        # Try and make sure the contents of the cache do not exceed
+        # the supplied maximums.
+
+        # Select all slots except those where ready.is_set() and content is
+        # None (that means there was an error reading the block).
+        self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+        sm = sum([slot.size() for slot in self._cache])
+        while len(self._cache) > 0 and (sm > cache_max or len(self._cache) > max_slots):
+            for i in range(len(self._cache)-1, -1, -1):
+                # start from the back, find a slot that is a candidate to evict
+                if self._cache[i].ready.is_set():
+                    sz = self._cache[i].size()
+
+                    # If evict returns false it means the
+                    # underlying disk cache couldn't lock the file
+                    # for deletion because another process was using
+                    # it. Don't count it as reducing the amount
+                    # of data in the cache, find something else to
+                    # throw out.
+                    if self._cache[i].evict():
+                        sm -= sz
+
+                    # check to make sure the underlying data is gone
+                    if self._cache[i].gone():
+                        # either way we forget about it.  either the
+                        # other process will delete it, or if we need
+                        # it again and it is still there, we'll find
+                        # it on disk.
+                        del self._cache[i]
+                    break
+
+
     def cap_cache(self):
         '''Cap the cache size to self.cache_max'''
-        with self._cache_lock:
-            # Select all slots except those where ready.is_set() and content is
-            # None (that means there was an error reading the block).
-            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
-            sm = sum([slot.size() for slot in self._cache])
-            while len(self._cache) > 0 and sm > self.cache_max:
-                for i in range(len(self._cache)-1, -1, -1):
-                    if self._cache[i].ready.is_set():
-                        del self._cache[i]
-                        break
-                sm = sum([slot.size() for slot in self._cache])
+        with self._cache_updating:
+            self._resize_cache(self.cache_max, self._max_slots)
+            self._cache_updating.notify_all()
 
     def _get(self, locator):
         # Test if the locator is already in the cache
@@ -226,6 +316,12 @@ class KeepBlockCache(object):
                     del self._cache[i]
                     self._cache.insert(0, n)
                 return n
+        if self._disk_cache:
+            # see if it exists on disk
+            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
+            if n is not None:
+                self._cache.insert(0, n)
+                return n
         return None
 
     def get(self, locator):
@@ -235,16 +331,66 @@ class KeepBlockCache(object):
     def reserve_cache(self, locator):
         '''Reserve a cache slot for the specified locator,
         or return the existing slot.'''
-        with self._cache_lock:
+        with self._cache_updating:
             n = self._get(locator)
             if n:
                 return n, False
             else:
                 # Add a new cache slot for the locator
-                n = KeepBlockCache.CacheSlot(locator)
+                self._resize_cache(self.cache_max, self._max_slots-1)
+                while len(self._cache) >= self._max_slots:
+                    # If there isn't a slot available, we need to wait
+                    # for something to happen that releases one of the
+                    # cache slots.  Wait up to 200 ms, or until woken
+                    # up by another thread.
+                    self._cache_updating.wait(timeout=0.2)
+                    self._resize_cache(self.cache_max, self._max_slots-1)
+
+                if self._disk_cache:
+                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
+                else:
+                    n = KeepBlockCache.CacheSlot(locator)
                 self._cache.insert(0, n)
                 return n, True
 
+    def set(self, slot, blob):
+        try:
+            slot.set(blob)
+            return
+        except OSError as e:
+            if e.errno == errno.ENOMEM:
+                # Reduce max slots to current - 4, cap cache and retry
+                with self._cache_lock:
+                    self._max_slots = max(4, len(self._cache) - 4)
+            elif e.errno == errno.ENOSPC:
+                # Reduce disk max space to current - 256 MiB, cap cache and retry
+                with self._cache_lock:
+                    sm = sum([st.size() for st in self._cache])
+                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
+            elif e.errno == errno.ENODEV:
+                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
+        except Exception as e:
+            pass
+        finally:
+            # Check if we should evict things from the cache.  Either
+            # because we added a new thing or there was an error and
+            # we possibly adjusted the limits down, so we might need
+            # to push something out.
+            self.cap_cache()
+
+        try:
+            # We only get here if there was an error the first time. The
+            # exception handler adjusts limits downward in some cases
+            # to free up resources, which may allow the retry to
+            # succeed.
+            slot.set(blob)
+        except Exception as e:
+            # It failed again.  Give up.
+            slot.set(None)
+            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
+
+        self.cap_cache()
+
 class Counter(object):
     def __init__(self, v=0):
         self._lk = threading.Lock()
@@ -260,18 +406,10 @@ class Counter(object):
 
 
 class KeepClient(object):
+    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
+    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
 
-    # Default Keep server connection timeout:  2 seconds
-    # Default Keep server read timeout:       256 seconds
-    # Default Keep server bandwidth minimum:  32768 bytes per second
-    # Default Keep proxy connection timeout:  20 seconds
-    # Default Keep proxy read timeout:        256 seconds
-    # Default Keep proxy bandwidth minimum:   32768 bytes per second
-    DEFAULT_TIMEOUT = (2, 256, 32768)
-    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
-
-
-    class KeepService(object):
+    class KeepService(PyCurlHelper):
         """Make requests to a single Keep service, and track results.
 
         A KeepService is intended to last long enough to perform one
@@ -294,6 +432,7 @@ class KeepClient(object):
                      download_counter=None,
                      headers={},
                      insecure=False):
+            super(KeepClient.KeepService, self).__init__()
             self.root = root
             self._user_agent_pool = user_agent_pool
             self._result = {'error': None}
@@ -331,30 +470,6 @@ class KeepClient(object):
             except:
                 ua.close()
 
-        def _socket_open(self, *args, **kwargs):
-            if len(args) + len(kwargs) == 2:
-                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
-            else:
-                return self._socket_open_pycurl_7_19_3(*args, **kwargs)
-
-        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
-            return self._socket_open_pycurl_7_21_5(
-                purpose=None,
-                address=collections.namedtuple(
-                    'Address', ['family', 'socktype', 'protocol', 'addr'],
-                )(family, socktype, protocol, address))
-
-        def _socket_open_pycurl_7_21_5(self, purpose, address):
-            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
-            s = socket.socket(address.family, address.socktype, address.protocol)
-            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-            # Will throw invalid protocol error on mac. This test prevents that.
-            if hasattr(socket, 'TCP_KEEPIDLE'):
-                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
-            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
-            self._socket = s
-            return s
-
         def get(self, locator, method="GET", timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
@@ -375,10 +490,13 @@ class KeepClient(object):
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                     else:
                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
+                    else:
+                        curl.setopt(pycurl.HTTPGET, True)
                     self._setcurltimeouts(curl, timeout, method=="HEAD")
 
                     try:
@@ -446,7 +564,9 @@ class KeepClient(object):
                 return None
             return self._result['body']
 
-        def put(self, hash_s, body, timeout=None):
+        def put(self, hash_s, body, timeout=None, headers={}):
+            put_headers = copy.copy(self.put_headers)
+            put_headers.update(headers)
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             curl = self._get_user_agent()
@@ -470,11 +590,12 @@ class KeepClient(object):
                     curl.setopt(pycurl.INFILESIZE, len(body))
                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
                     curl.setopt(pycurl.HTTPHEADER, [
-                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
+                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                     else:
                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     self._setcurltimeouts(curl, timeout)
@@ -520,43 +641,6 @@ class KeepClient(object):
                 self.upload_counter.add(len(body))
             return True
 
-        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
-            if not timeouts:
-                return
-            elif isinstance(timeouts, tuple):
-                if len(timeouts) == 2:
-                    conn_t, xfer_t = timeouts
-                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
-                else:
-                    conn_t, xfer_t, bandwidth_bps = timeouts
-            else:
-                conn_t, xfer_t = (timeouts, timeouts)
-                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
-            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
-            if not ignore_bandwidth:
-                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
-                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
-
-        def _headerfunction(self, header_line):
-            if isinstance(header_line, bytes):
-                header_line = header_line.decode('iso-8859-1')
-            if ':' in header_line:
-                name, value = header_line.split(':', 1)
-                name = name.strip().lower()
-                value = value.strip()
-            elif self._headers:
-                name = self._lastheadername
-                value = self._headers[name] + ' ' + header_line.strip()
-            elif header_line.startswith('HTTP/'):
-                name = 'x-status-line'
-                value = header_line
-            else:
-                _logger.error("Unexpected header line: %s", header_line)
-                return
-            self._lastheadername = name
-            self._headers[name] = value
-            # Returning None implies all bytes were written
-
 
     class KeepWriterQueue(queue.Queue):
         def __init__(self, copies, classes=[]):
@@ -566,18 +650,23 @@ class KeepClient(object):
             self.successful_copies = 0
             self.confirmed_storage_classes = {}
             self.response = None
-            self.queue_data_lock = threading.Lock()
-            self.pending_tries = copies
+            self.storage_classes_tracking = True
+            self.queue_data_lock = threading.RLock()
+            self.pending_tries = max(copies, len(classes))
             self.pending_tries_notification = threading.Condition()
 
         def write_success(self, response, replicas_nr, classes_confirmed):
             with self.queue_data_lock:
                 self.successful_copies += replicas_nr
-                for st_class, st_copies in classes_confirmed.items():
-                    try:
-                        self.confirmed_storage_classes[st_class] += st_copies
-                    except KeyError:
-                        self.confirmed_storage_classes[st_class] = st_copies
+                if classes_confirmed is None:
+                    self.storage_classes_tracking = False
+                elif self.storage_classes_tracking:
+                    for st_class, st_copies in classes_confirmed.items():
+                        try:
+                            self.confirmed_storage_classes[st_class] += st_copies
+                        except KeyError:
+                            self.confirmed_storage_classes[st_class] = st_copies
+                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
@@ -591,12 +680,22 @@ class KeepClient(object):
             with self.queue_data_lock:
                 return self.wanted_copies - self.successful_copies
 
+        def satisfied_classes(self):
+            with self.queue_data_lock:
+                if not self.storage_classes_tracking:
+                    # Storage class tracking has been disabled; return
+                    # None to signal that to the outer loop.
+                    return None
+            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
+
         def pending_classes(self):
             with self.queue_data_lock:
-                unsatisfied_classes = []
+                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
+                    return []
+                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                 for st_class, st_copies in self.confirmed_storage_classes.items():
-                    if st_class in self.wanted_storage_classes and st_copies < self.wanted_copies:
-                        unsatisfied_classes.append(st_class)
+                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
+                        unsatisfied_classes.remove(st_class)
                 return unsatisfied_classes
 
         def get_next_task(self):
@@ -614,7 +713,7 @@ class KeepClient(object):
                         while True:
                             self.get_nowait()
                             self.task_done()
-                    elif self.pending_tries > 0 or len(self.pending_classes()) > 0:
+                    elif self.pending_tries > 0:
                         service, service_root = self.get_nowait()
                         if service.finished():
                             self.task_done()
@@ -648,7 +747,7 @@ class KeepClient(object):
             self.total_task_nr += 1
 
         def done(self):
-            return self.queue.successful_copies
+            return self.queue.successful_copies, self.queue.satisfied_classes()
 
         def join(self):
             # Start workers
@@ -690,17 +789,23 @@ class KeepClient(object):
                     self.queue.task_done()
 
         def do_task(self, service, service_root):
+            classes = self.queue.pending_classes()
+            headers = {}
+            if len(classes) > 0:
+                classes.sort()
+                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
             success = bool(service.put(self.data_hash,
                                         self.data,
-                                        timeout=self.timeout))
+                                        timeout=self.timeout,
+                                        headers=headers))
             result = service.last_result()
 
             if not success:
-                if result.get('status_code', None):
+                if result.get('status_code'):
                     _logger.debug("Request fail: PUT %s => %s %s",
                                   self.data_hash,
-                                  result['status_code'],
-                                  result['body'])
+                                  result.get('status_code'),
+                                  result.get('body'))
                 raise self.TaskFailed()
 
             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
@@ -722,7 +827,7 @@ class KeepClient(object):
                         classes_confirmed[stored_class] = int(stored_copies)
             except (KeyError, ValueError):
                 # Storage classes confirmed header missing or corrupt
-                classes_confirmed = {}
+                classes_confirmed = None
 
             return result['body'].strip(), replicas_stored, classes_confirmed
 
@@ -730,7 +835,7 @@ class KeepClient(object):
     def __init__(self, api_client=None, proxy=None,
                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                  api_token=None, local_store=None, block_cache=None,
-                 num_retries=0, session=None):
+                 num_retries=10, session=None, num_prefetch_threads=None):
         """Initialize a new KeepClient.
 
         Arguments:
@@ -783,7 +888,7 @@ class KeepClient(object):
         :num_retries:
           The default number of times to retry failed requests.
           This will be used as the default num_retries value when get() and
-          put() are called.  Default 0.
+          put() are called.  Default 10.
         """
         self.lock = threading.Lock()
         if proxy is None:
@@ -817,6 +922,11 @@ class KeepClient(object):
         self.get_counter = Counter()
         self.hits_counter = Counter()
         self.misses_counter = Counter()
+        self._storage_classes_unsupported_warning = False
+        self._default_classes = []
+        self.num_prefetch_threads = num_prefetch_threads or 2
+        self._prefetch_queue = None
+        self._prefetch_threads = None
 
         if local_store:
             self.local_store = local_store
@@ -857,6 +967,12 @@ class KeepClient(object):
                 self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
+                try:
+                    self._default_classes = [
+                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
+                except KeyError:
+                    # We're talking to an old cluster
+                    pass
 
     def current_timeout(self, attempt_number):
         """Return the appropriate timeout to use for this client.
@@ -1002,9 +1118,10 @@ class KeepClient(object):
         else:
             return None
 
-    def get_from_cache(self, loc):
+    def get_from_cache(self, loc_s):
         """Fetch a block only if is in the cache, otherwise return None."""
-        slot = self.block_cache.get(loc)
+        locator = KeepLocator(loc_s)
+        slot = self.block_cache.get(locator.md5sum)
         if slot is not None and slot.ready.is_set():
             return slot.get()
         else:
@@ -1023,7 +1140,7 @@ class KeepClient(object):
     def get(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="GET", **kwargs)
 
-    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
+    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -1048,28 +1165,52 @@ class KeepClient(object):
 
         self.get_counter.add(1)
 
+        request_id = (request_id or
+                      (hasattr(self, 'api_client') and self.api_client.request_id) or
+                      arvados.util.new_request_id())
+        if headers is None:
+            headers = {}
+        headers['X-Request-Id'] = request_id
+
         slot = None
         blob = None
         try:
             locator = KeepLocator(loc_s)
             if method == "GET":
-                slot, first = self.block_cache.reserve_cache(locator.md5sum)
-                if not first:
-                    self.hits_counter.add(1)
+                while slot is None:
+                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
+                    if first:
+                        # Fresh and empty "first time it is used" slot
+                        break
+                    if prefetch:
+                        # This is a request for a prefetch to fill in
+                        # the cache.  We don't need to wait for the
+                        # result, so if it is already in flight return
+                        # immediately.  Clear 'slot' to prevent the
+                        # finally block from calling slot.set()
+                        slot = None
+                        return None
+
                     blob = slot.get()
-                    if blob is None:
-                        raise arvados.errors.KeepReadError(
-                            "failed to read {}".format(loc_s))
-                    return blob
+                    if blob is not None:
+                        self.hits_counter.add(1)
+                        return blob
+
+                    # If blob is None, this means either
+                    #
+                    # (a) another thread was fetching this block and
+                    # failed with an error or
+                    #
+                    # (b) cache thrashing caused the slot to be
+                    # evicted (content set to None) by another thread
+                    # between the call to reserve_cache() and get().
+                    #
+                    # We'll handle these cases by reserving a new slot
+                    # and then doing a full GET request.
+                    slot = None
 
             self.misses_counter.add(1)
 
-            if headers is None:
-                headers = {}
-            headers['X-Request-Id'] = (request_id or
-                                        (hasattr(self, 'api_client') and self.api_client.request_id) or
-                                        arvados.util.new_request_id())
-
             # If the locator has hints specifying a prefix (indicating a
             # remote keepproxy) or the UUID of a local gateway service,
             # read data from the indicated service(s) instead of the usual
@@ -1128,8 +1269,7 @@ class KeepClient(object):
                 return blob
         finally:
             if slot is not None:
-                slot.set(blob)
-                self.block_cache.cap_cache()
+                self.block_cache.set(slot, blob)
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
@@ -1139,17 +1279,17 @@ class KeepClient(object):
                           for key in sorted_roots)
         if not roots_map:
             raise arvados.errors.KeepReadError(
-                "failed to read {}: no Keep services available ({})".format(
-                    loc_s, loop.last_result()))
+                "[{}] failed to read {}: no Keep services available ({})".format(
+                    request_id, loc_s, loop.last_result()))
         elif not_founds == len(sorted_roots):
             raise arvados.errors.NotFoundError(
-                "{} not found".format(loc_s), service_errors)
+                "[{}] {} not found".format(request_id, loc_s), service_errors)
         else:
             raise arvados.errors.KeepReadError(
-                "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
+                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
 
     @retry.retry_method
-    def put(self, data, copies=2, num_retries=None, request_id=None, classes=[]):
+    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
@@ -1170,6 +1310,8 @@ class KeepClient(object):
           be written.
         """
 
+        classes = classes or self._default_classes
+
         if not isinstance(data, bytes):
             data = data.encode()
 
@@ -1181,18 +1323,18 @@ class KeepClient(object):
             return loc_s
         locator = KeepLocator(loc_s)
 
+        request_id = (request_id or
+                      (hasattr(self, 'api_client') and self.api_client.request_id) or
+                      arvados.util.new_request_id())
         headers = {
-            'X-Request-Id': (request_id or
-                             (hasattr(self, 'api_client') and self.api_client.request_id) or
-                             arvados.util.new_request_id()),
+            'X-Request-Id': request_id,
             'X-Keep-Desired-Replicas': str(copies),
         }
-        if len(classes) > 0:
-            headers['X-Keep-Storage-Classes'] = ', '.join(classes)
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
-        done = 0
+        done_copies = 0
+        done_classes = []
         for tries_left in loop:
             try:
                 sorted_roots = self.map_new_services(
@@ -1204,36 +1346,100 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
+            pending_classes = []
+            if done_classes is not None:
+                pending_classes = list(set(classes) - set(done_classes))
             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
-                                                        copies=copies - done,
+                                                        copies=copies - done_copies,
                                                         max_service_replicas=self.max_replicas_per_service,
                                                         timeout=self.current_timeout(num_retries - tries_left),
-                                                        classes=classes)
+                                                        classes=pending_classes)
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
                 if ks.finished():
                     continue
                 writer_pool.add_task(ks, service_root)
             writer_pool.join()
-            done += writer_pool.done()
-            loop.save_result((done >= copies, writer_pool.total_task_nr))
+            pool_copies, pool_classes = writer_pool.done()
+            done_copies += pool_copies
+            if (done_classes is not None) and (pool_classes is not None):
+                done_classes += pool_classes
+                loop.save_result(
+                    (done_copies >= copies and set(done_classes) == set(classes),
+                    writer_pool.total_task_nr))
+            else:
+                # Old keepstore contacted without storage classes support:
+                # success is determined only by successful copies.
+                #
+                # Disable storage classes tracking from this point forward.
+                if not self._storage_classes_unsupported_warning:
+                    self._storage_classes_unsupported_warning = True
+                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
+                done_classes = None
+                loop.save_result(
+                    (done_copies >= copies, writer_pool.total_task_nr))
 
         if loop.success():
             return writer_pool.response()
         if not roots_map:
             raise arvados.errors.KeepWriteError(
-                "failed to write {}: no Keep services available ({})".format(
-                    data_hash, loop.last_result()))
+                "[{}] failed to write {}: no Keep services available ({})".format(
+                    request_id, data_hash, loop.last_result()))
         else:
             service_errors = ((key, roots_map[key].last_result()['error'])
                               for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
-                "failed to write {} after {} (wanted {} copies but wrote {})".format(
-                    data_hash, loop.attempts_str(), copies, writer_pool.done()), service_errors, label="service")
+                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
+                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
 
-    def local_store_put(self, data, copies=1, num_retries=None):
+    def _block_prefetch_worker(self):
+        """The background downloader thread.
+
+        Takes locators off self._prefetch_queue one at a time and fetches
+        each one with self.get(..., prefetch=True); the value returned by
+        get() is not used here.  Returns when it takes None off the queue
+        (stop_prefetch_threads() puts one None on the queue per worker).
+        """
+        while True:
+            try:
+                b = self._prefetch_queue.get()
+                if b is None:
+                    # None is the shutdown signal: end this thread.
+                    return
+                self.get(b, prefetch=True)
+            except Exception:
+                # Log and keep looping, so that one failed fetch does not
+                # end the worker thread.
+                _logger.exception("Exception doing block prefetch")
+
+    def _start_prefetch_threads(self):
+        """Create the prefetch queue and worker threads if not yet started.
+
+        Does nothing when self._prefetch_threads is already set.  Otherwise
+        creates self._prefetch_queue and starts self.num_prefetch_threads
+        daemon threads, each running _block_prefetch_worker().
+        """
+        if self._prefetch_threads is None:
+            with self.lock:
+                # Check again now that the lock is held; presumably this
+                # guards against another thread having started the workers
+                # between the first check and acquiring the lock.
+                if self._prefetch_threads is not None:
+                    return
+                self._prefetch_queue = queue.Queue()
+                self._prefetch_threads = []
+                for i in range(0, self.num_prefetch_threads):
+                    thread = threading.Thread(target=self._block_prefetch_worker)
+                    self._prefetch_threads.append(thread)
+                    # The daemon flag has to be set before start().
+                    thread.daemon = True
+                    thread.start()
+
+    def block_prefetch(self, locator):
+        """Queue a block to be downloaded by the background prefetch threads.
+
+        This relies on the fact that KeepClient implements a block cache,
+        so repeated requests for the same block will not result in repeated
+        downloads (unless the block is evicted from the cache.)  This method
+        does not block.
+
+        The prefetch threads are started by the first call, via
+        _start_prefetch_threads().
+        """
+
+        self._start_prefetch_threads()
+        self._prefetch_queue.put(locator)
+
+    def stop_prefetch_threads(self):
+        """Shut down the prefetch worker threads and wait for them to exit.
+
+        Holds self.lock for the whole operation, including the joins.
+        Afterwards self._prefetch_threads and self._prefetch_queue are both
+        None, which is the state _start_prefetch_threads() checks for
+        before creating workers.
+        """
+        with self.lock:
+            if self._prefetch_threads is not None:
+                # One None per worker: each worker returns after taking one.
+                for t in self._prefetch_threads:
+                    self._prefetch_queue.put(None)
+                for t in self._prefetch_threads:
+                    t.join()
+            self._prefetch_threads = None
+            self._prefetch_queue = None
+
+    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
         """A stub for put().
 
         This method is used in place of the real put() method when
@@ -1276,6 +1482,3 @@ class KeepClient(object):
             return True
         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
             return True
-
-    def is_cached(self, locator):
-        return self.block_cache.reserve_cache(expect_hash)