18842: Clean up keep cache set logic a little more
[arvados.git] / sdk / python / arvados / keep.py
index 9618ee5ce9ac5551931093b24695514379e1376f..8dbad2abb18aeb99a39ab896490751651c45c6b3 100644 (file)
@@ -4,6 +4,7 @@
 
 from __future__ import absolute_import
 from __future__ import division
 
 from __future__ import absolute_import
 from __future__ import division
+import copy
 from future import standard_library
 from future.utils import native_str
 standard_library.install_aliases()
 from future import standard_library
 from future.utils import native_str
 standard_library.install_aliases()
@@ -14,6 +15,7 @@ from builtins import object
 import collections
 import datetime
 import hashlib
 import collections
 import datetime
 import hashlib
+import errno
 import io
 import logging
 import math
 import io
 import logging
 import math
@@ -25,8 +27,10 @@ import socket
 import ssl
 import sys
 import threading
 import ssl
 import sys
 import threading
+import resource
 from . import timer
 import urllib.parse
 from . import timer
 import urllib.parse
+import traceback
 
 if sys.version_info >= (3, 0):
     from io import BytesIO
 
 if sys.version_info >= (3, 0):
     from io import BytesIO
@@ -38,6 +42,7 @@ import arvados.config as config
 import arvados.errors
 import arvados.retry as retry
 import arvados.util
 import arvados.errors
 import arvados.retry as retry
 import arvados.util
+import arvados.diskcache
 
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
 
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
@@ -158,7 +163,6 @@ class Keep(object):
                config.get('ARVADOS_API_TOKEN'),
                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
                config.get('ARVADOS_KEEP_PROXY'),
                config.get('ARVADOS_API_TOKEN'),
                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
                config.get('ARVADOS_KEEP_PROXY'),
-               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
                os.environ.get('KEEP_LOCAL_STORE'))
         if (global_client_object is None) or (cls._last_key != key):
             global_client_object = KeepClient()
                os.environ.get('KEEP_LOCAL_STORE'))
         if (global_client_object is None) or (cls._last_key != key):
             global_client_object = KeepClient()
@@ -174,11 +178,50 @@ class Keep(object):
         return Keep.global_client_object().put(data, **kwargs)
 
 class KeepBlockCache(object):
         return Keep.global_client_object().put(data, **kwargs)
 
 class KeepBlockCache(object):
-    # Default RAM cache is 256MiB
-    def __init__(self, cache_max=(256 * 1024 * 1024)):
+    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
         self.cache_max = cache_max
         self._cache = []
         self._cache_lock = threading.Lock()
         self.cache_max = cache_max
         self._cache = []
         self._cache_lock = threading.Lock()
+        self._max_slots = max_slots
+        self._disk_cache = disk_cache
+        self._disk_cache_dir = disk_cache_dir
+
+        if self._disk_cache and self._disk_cache_dir is None:
+            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
+            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
+
+        if self._max_slots == 0:
+            if self._disk_cache:
+                # Default max slots to half of the RLIMIT_NOFILE soft limit
+                # (typically 1024 on Linux, so this will be 512 slots).
+                # NOTE(review): "/" is true division here, so this is a float -- confirm int is not required.
+                self._max_slots = resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 2
+            else:
+                # RAM cache slots
+                self._max_slots = 512
+
+        if self.cache_max == 0:
+            if self._disk_cache:
+                fs = os.statvfs(self._disk_cache_dir)
+                # Calculation of available space incorporates existing cache usage
+                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
+                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
+                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
+                # pick smallest of:
+                # 10% of total disk size
+                # 25% of available space
+                # max_slots * 64 MiB
+                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
+            else:
+                # 256 MiB in RAM
+                self.cache_max = (256 * 1024 * 1024)
+
+        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
+
+        if self._disk_cache:
+            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
+            self.cap_cache()
+
 
     class CacheSlot(object):
         __slots__ = ("locator", "ready", "content")
 
     class CacheSlot(object):
         __slots__ = ("locator", "ready", "content")
@@ -202,6 +245,9 @@ class KeepBlockCache(object):
             else:
                 return len(self.content)
 
             else:
                 return len(self.content)
 
+        def evict(self):
+            return True
+
     def cap_cache(self):
         '''Cap the cache size to self.cache_max'''
         with self._cache_lock:
     def cap_cache(self):
         '''Cap the cache size to self.cache_max'''
         with self._cache_lock:
@@ -209,12 +255,27 @@ class KeepBlockCache(object):
             # None (that means there was an error reading the block).
             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
             sm = sum([slot.size() for slot in self._cache])
             # None (that means there was an error reading the block).
             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
             sm = sum([slot.size() for slot in self._cache])
-            while len(self._cache) > 0 and sm > self.cache_max:
+            while len(self._cache) > 0 and (sm > self.cache_max or len(self._cache) > self._max_slots):
                 for i in range(len(self._cache)-1, -1, -1):
                 for i in range(len(self._cache)-1, -1, -1):
+                    # start from the back, find a slot that is a candidate to evict
                     if self._cache[i].ready.is_set():
                     if self._cache[i].ready.is_set():
+                        sz = self._cache[i].size()
+
+                        # If evict returns false it means the
+                        # underlying disk cache couldn't lock the file
+                        # for deletion because another process was using
+                        # it. Don't count it as reducing the amount
+                        # of data in the cache, find something else to
+                        # throw out.
+                        if self._cache[i].evict():
+                            sm -= sz
+
+                        # Either way we forget about it: either the
+                        # other process will delete it, or, if we need
+                        # it again and it is still there, we'll find
+                        # it on disk.
                         del self._cache[i]
                         break
                         del self._cache[i]
                         break
-                sm = sum([slot.size() for slot in self._cache])
 
     def _get(self, locator):
         # Test if the locator is already in the cache
 
     def _get(self, locator):
         # Test if the locator is already in the cache
@@ -226,6 +287,12 @@ class KeepBlockCache(object):
                     del self._cache[i]
                     self._cache.insert(0, n)
                 return n
                     del self._cache[i]
                     self._cache.insert(0, n)
                 return n
+        if self._disk_cache:
+            # see if it exists on disk
+            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
+            if n is not None:
+                self._cache.insert(0, n)
+                return n
         return None
 
     def get(self, locator):
         return None
 
     def get(self, locator):
@@ -241,10 +308,54 @@ class KeepBlockCache(object):
                 return n, False
             else:
                 # Add a new cache slot for the locator
                 return n, False
             else:
                 # Add a new cache slot for the locator
-                n = KeepBlockCache.CacheSlot(locator)
+                if self._disk_cache:
+                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
+                else:
+                    n = KeepBlockCache.CacheSlot(locator)
                 self._cache.insert(0, n)
                 return n, True
 
                 self._cache.insert(0, n)
                 return n, True
 
+    def set(self, slot, blob):
+        try:
+            slot.set(blob)
+            return
+        except OSError as e:
+            if e.errno == errno.ENOMEM:
+                # Reduce max slots to current - 4, cap cache and retry
+                with self._cache_lock:
+                    self._max_slots = max(4, len(self._cache) - 4)
+            elif e.errno == errno.ENOSPC:
+                # Reduce disk max space to current - 256 MiB, cap cache and retry
+                with self._cache_lock:
+                    sm = sum([st.size() for st in self._cache])
+                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
+            elif e.errno == errno.ENODEV:
+                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
+        except Exception as e:
+            pass
+        finally:
+            # Check if we should evict things from the cache.  Either
+            # because we added a new thing or there was an error and
+            # we possibly adjusted the limits down, so we might need
+            # to push something out.
+            self.cap_cache()
+
+        try:
+            # Only gets here if there was an error the first time. The
+            # exception handler adjusts limits downward in some cases
+            # to free up resources, which would make the operation
+            # succeed.
+            slot.set(blob)
+            self.cap_cache()
+        except Exception as e:
+            # It failed again.  Give up.
+            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
+        finally:
+            # Set the notice that we are done with the cache
+            # slot one way or another.
+            slot.ready.set()
+
+
 class Counter(object):
     def __init__(self, v=0):
         self._lk = threading.Lock()
 class Counter(object):
     def __init__(self, v=0):
         self._lk = threading.Lock()
@@ -375,9 +486,12 @@ class KeepClient(object):
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
+                    else:
+                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
-                    self._setcurltimeouts(curl, timeout)
+                    self._setcurltimeouts(curl, timeout, method=="HEAD")
 
                     try:
                         curl.perform()
 
                     try:
                         curl.perform()
@@ -444,7 +558,9 @@ class KeepClient(object):
                 return None
             return self._result['body']
 
                 return None
             return self._result['body']
 
-        def put(self, hash_s, body, timeout=None):
+        def put(self, hash_s, body, timeout=None, headers={}):
+            put_headers = copy.copy(self.put_headers)
+            put_headers.update(headers)
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             curl = self._get_user_agent()
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             curl = self._get_user_agent()
@@ -468,11 +584,14 @@ class KeepClient(object):
                     curl.setopt(pycurl.INFILESIZE, len(body))
                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
                     curl.setopt(pycurl.HTTPHEADER, [
                     curl.setopt(pycurl.INFILESIZE, len(body))
                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
                     curl.setopt(pycurl.HTTPHEADER, [
-                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
+                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
+                    else:
+                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     self._setcurltimeouts(curl, timeout)
                     try:
                         curl.perform()
                     self._setcurltimeouts(curl, timeout)
                     try:
                         curl.perform()
@@ -516,7 +635,7 @@ class KeepClient(object):
                 self.upload_counter.add(len(body))
             return True
 
                 self.upload_counter.add(len(body))
             return True
 
-        def _setcurltimeouts(self, curl, timeouts):
+        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
             if not timeouts:
                 return
             elif isinstance(timeouts, tuple):
             if not timeouts:
                 return
             elif isinstance(timeouts, tuple):
@@ -529,8 +648,9 @@ class KeepClient(object):
                 conn_t, xfer_t = (timeouts, timeouts)
                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
                 conn_t, xfer_t = (timeouts, timeouts)
                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
-            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
-            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
+            if not ignore_bandwidth:
+                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
             if isinstance(header_line, bytes):
 
         def _headerfunction(self, header_line):
             if isinstance(header_line, bytes):
@@ -554,18 +674,30 @@ class KeepClient(object):
 
 
     class KeepWriterQueue(queue.Queue):
 
 
     class KeepWriterQueue(queue.Queue):
-        def __init__(self, copies):
+        def __init__(self, copies, classes=[]):
             queue.Queue.__init__(self) # Old-style superclass
             self.wanted_copies = copies
             queue.Queue.__init__(self) # Old-style superclass
             self.wanted_copies = copies
+            self.wanted_storage_classes = classes
             self.successful_copies = 0
             self.successful_copies = 0
+            self.confirmed_storage_classes = {}
             self.response = None
             self.response = None
-            self.successful_copies_lock = threading.Lock()
-            self.pending_tries = copies
+            self.storage_classes_tracking = True
+            self.queue_data_lock = threading.RLock()
+            self.pending_tries = max(copies, len(classes))
             self.pending_tries_notification = threading.Condition()
 
             self.pending_tries_notification = threading.Condition()
 
-        def write_success(self, response, replicas_nr):
-            with self.successful_copies_lock:
+        def write_success(self, response, replicas_nr, classes_confirmed):
+            with self.queue_data_lock:
                 self.successful_copies += replicas_nr
                 self.successful_copies += replicas_nr
+                if classes_confirmed is None:
+                    self.storage_classes_tracking = False
+                elif self.storage_classes_tracking:
+                    for st_class, st_copies in classes_confirmed.items():
+                        try:
+                            self.confirmed_storage_classes[st_class] += st_copies
+                        except KeyError:
+                            self.confirmed_storage_classes[st_class] = st_copies
+                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
@@ -576,13 +708,31 @@ class KeepClient(object):
                 self.pending_tries_notification.notify()
 
         def pending_copies(self):
                 self.pending_tries_notification.notify()
 
         def pending_copies(self):
-            with self.successful_copies_lock:
+            with self.queue_data_lock:
                 return self.wanted_copies - self.successful_copies
 
                 return self.wanted_copies - self.successful_copies
 
+        def satisfied_classes(self):
+            with self.queue_data_lock:
+                if not self.storage_classes_tracking:
+                    # Storage class tracking is disabled: return None
+                    # to signal that to the caller.
+                    return None
+            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
+
+        def pending_classes(self):
+            with self.queue_data_lock:
+                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
+                    return []
+                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
+                for st_class, st_copies in self.confirmed_storage_classes.items():
+                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
+                        unsatisfied_classes.remove(st_class)
+                return unsatisfied_classes
+
         def get_next_task(self):
             with self.pending_tries_notification:
                 while True:
         def get_next_task(self):
             with self.pending_tries_notification:
                 while True:
-                    if self.pending_copies() < 1:
+                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                         # This notify_all() is unnecessary --
                         # write_success() already called notify_all()
                         # when pending<1 became true, so it's not
                         # This notify_all() is unnecessary --
                         # write_success() already called notify_all()
                         # when pending<1 became true, so it's not
@@ -609,16 +759,15 @@ class KeepClient(object):
 
 
     class KeepWriterThreadPool(object):
 
 
     class KeepWriterThreadPool(object):
-        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
+        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
             self.total_task_nr = 0
             self.total_task_nr = 0
-            self.wanted_copies = copies
             if (not max_service_replicas) or (max_service_replicas >= copies):
                 num_threads = 1
             else:
                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
             _logger.debug("Pool max threads is %d", num_threads)
             self.workers = []
             if (not max_service_replicas) or (max_service_replicas >= copies):
                 num_threads = 1
             else:
                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
             _logger.debug("Pool max threads is %d", num_threads)
             self.workers = []
-            self.queue = KeepClient.KeepWriterQueue(copies)
+            self.queue = KeepClient.KeepWriterQueue(copies, classes)
             # Create workers
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
             # Create workers
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
@@ -629,7 +778,7 @@ class KeepClient(object):
             self.total_task_nr += 1
 
         def done(self):
             self.total_task_nr += 1
 
         def done(self):
-            return self.queue.successful_copies
+            return self.queue.successful_copies, self.queue.satisfied_classes()
 
         def join(self):
             # Start workers
 
         def join(self):
             # Start workers
@@ -643,7 +792,7 @@ class KeepClient(object):
 
 
     class KeepWriterThread(threading.Thread):
 
 
     class KeepWriterThread(threading.Thread):
-        TaskFailed = RuntimeError()
+        class TaskFailed(RuntimeError): pass
 
         def __init__(self, queue, data, data_hash, timeout=None):
             super(KeepClient.KeepWriterThread, self).__init__()
 
         def __init__(self, queue, data, data_hash, timeout=None):
             super(KeepClient.KeepWriterThread, self).__init__()
@@ -660,29 +809,35 @@ class KeepClient(object):
                 except queue.Empty:
                     return
                 try:
                 except queue.Empty:
                     return
                 try:
-                    locator, copies = self.do_task(service, service_root)
+                    locator, copies, classes = self.do_task(service, service_root)
                 except Exception as e:
                 except Exception as e:
-                    if e is not self.TaskFailed:
+                    if not isinstance(e, self.TaskFailed):
                         _logger.exception("Exception in KeepWriterThread")
                     self.queue.write_fail(service)
                 else:
                         _logger.exception("Exception in KeepWriterThread")
                     self.queue.write_fail(service)
                 else:
-                    self.queue.write_success(locator, copies)
+                    self.queue.write_success(locator, copies, classes)
                 finally:
                     self.queue.task_done()
 
         def do_task(self, service, service_root):
                 finally:
                     self.queue.task_done()
 
         def do_task(self, service, service_root):
+            classes = self.queue.pending_classes()
+            headers = {}
+            if len(classes) > 0:
+                classes.sort()
+                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
             success = bool(service.put(self.data_hash,
                                         self.data,
             success = bool(service.put(self.data_hash,
                                         self.data,
-                                        timeout=self.timeout))
+                                        timeout=self.timeout,
+                                        headers=headers))
             result = service.last_result()
 
             if not success:
             result = service.last_result()
 
             if not success:
-                if result.get('status_code', None):
+                if result.get('status_code'):
                     _logger.debug("Request fail: PUT %s => %s %s",
                                   self.data_hash,
                     _logger.debug("Request fail: PUT %s => %s %s",
                                   self.data_hash,
-                                  result['status_code'],
-                                  result['body'])
-                raise self.TaskFailed
+                                  result.get('status_code'),
+                                  result.get('body'))
+                raise self.TaskFailed()
 
             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                           str(threading.current_thread()),
 
             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                           str(threading.current_thread()),
@@ -694,7 +849,18 @@ class KeepClient(object):
             except (KeyError, ValueError):
                 replicas_stored = 1
 
             except (KeyError, ValueError):
                 replicas_stored = 1
 
-            return result['body'].strip(), replicas_stored
+            classes_confirmed = {}
+            try:
+                scch = result['headers']['x-keep-storage-classes-confirmed']
+                for confirmation in scch.replace(' ', '').split(','):
+                    if '=' in confirmation:
+                        stored_class, stored_copies = confirmation.split('=')[:2]
+                        classes_confirmed[stored_class] = int(stored_copies)
+            except (KeyError, ValueError):
+                # Storage classes confirmed header missing or corrupt
+                classes_confirmed = None
+
+            return result['body'].strip(), replicas_stored, classes_confirmed
 
 
     def __init__(self, api_client=None, proxy=None,
 
 
     def __init__(self, api_client=None, proxy=None,
@@ -787,9 +953,12 @@ class KeepClient(object):
         self.get_counter = Counter()
         self.hits_counter = Counter()
         self.misses_counter = Counter()
         self.get_counter = Counter()
         self.hits_counter = Counter()
         self.misses_counter = Counter()
+        self._storage_classes_unsupported_warning = False
+        self._default_classes = []
 
         if local_store:
             self.local_store = local_store
 
         if local_store:
             self.local_store = local_store
+            self.head = self.local_store_head
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
@@ -826,6 +995,12 @@ class KeepClient(object):
                 self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
                 self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
+                try:
+                    self._default_classes = [
+                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
+                except KeyError:
+                    # We're talking to an old cluster
+                    pass
 
     def current_timeout(self, attempt_number):
         """Return the appropriate timeout to use for this client.
 
     def current_timeout(self, attempt_number):
         """Return the appropriate timeout to use for this client.
@@ -971,9 +1146,10 @@ class KeepClient(object):
         else:
             return None
 
         else:
             return None
 
-    def get_from_cache(self, loc):
+    def get_from_cache(self, loc_s):
         """Fetch a block only if is in the cache, otherwise return None."""
         """Fetch a block only if is in the cache, otherwise return None."""
-        slot = self.block_cache.get(loc)
+        locator = KeepLocator(loc_s)
+        slot = self.block_cache.get(locator.md5sum)
         if slot is not None and slot.ready.is_set():
             return slot.get()
         else:
         if slot is not None and slot.ready.is_set():
             return slot.get()
         else:
@@ -992,7 +1168,7 @@ class KeepClient(object):
     def get(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="GET", **kwargs)
 
     def get(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="GET", **kwargs)
 
-    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
+    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -1017,6 +1193,13 @@ class KeepClient(object):
 
         self.get_counter.add(1)
 
 
         self.get_counter.add(1)
 
+        request_id = (request_id or
+                      (hasattr(self, 'api_client') and self.api_client.request_id) or
+                      arvados.util.new_request_id())
+        if headers is None:
+            headers = {}
+        headers['X-Request-Id'] = request_id
+
         slot = None
         blob = None
         try:
         slot = None
         blob = None
         try:
@@ -1024,6 +1207,13 @@ class KeepClient(object):
             if method == "GET":
                 slot, first = self.block_cache.reserve_cache(locator.md5sum)
                 if not first:
             if method == "GET":
                 slot, first = self.block_cache.reserve_cache(locator.md5sum)
                 if not first:
+                    if prefetch:
+                        # This is a prefetch request; if the block is
+                        # already in flight, return immediately.
+                        # Clear 'slot' so the finally block does not
+                        # try to set it.
+                        slot = None
+                        return None
                     self.hits_counter.add(1)
                     blob = slot.get()
                     if blob is None:
                     self.hits_counter.add(1)
                     blob = slot.get()
                     if blob is None:
@@ -1033,12 +1223,6 @@ class KeepClient(object):
 
             self.misses_counter.add(1)
 
 
             self.misses_counter.add(1)
 
-            if headers is None:
-                headers = {}
-            headers['X-Request-Id'] = (request_id or
-                                        (hasattr(self, 'api_client') and self.api_client.request_id) or
-                                        arvados.util.new_request_id())
-
             # If the locator has hints specifying a prefix (indicating a
             # remote keepproxy) or the UUID of a local gateway service,
             # read data from the indicated service(s) instead of the usual
             # If the locator has hints specifying a prefix (indicating a
             # remote keepproxy) or the UUID of a local gateway service,
             # read data from the indicated service(s) instead of the usual
@@ -1094,14 +1278,10 @@ class KeepClient(object):
 
             # Always cache the result, then return it if we succeeded.
             if loop.success():
 
             # Always cache the result, then return it if we succeeded.
             if loop.success():
-                if method == "HEAD":
-                    return blob or True
-                else:
-                    return blob
+                return blob
         finally:
             if slot is not None:
         finally:
             if slot is not None:
-                slot.set(blob)
-                self.block_cache.cap_cache()
+                self.block_cache.set(slot, blob)
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
@@ -1111,17 +1291,17 @@ class KeepClient(object):
                           for key in sorted_roots)
         if not roots_map:
             raise arvados.errors.KeepReadError(
                           for key in sorted_roots)
         if not roots_map:
             raise arvados.errors.KeepReadError(
-                "failed to read {}: no Keep services available ({})".format(
-                    loc_s, loop.last_result()))
+                "[{}] failed to read {}: no Keep services available ({})".format(
+                    request_id, loc_s, loop.last_result()))
         elif not_founds == len(sorted_roots):
             raise arvados.errors.NotFoundError(
         elif not_founds == len(sorted_roots):
             raise arvados.errors.NotFoundError(
-                "{} not found".format(loc_s), service_errors)
+                "[{}] {} not found".format(request_id, loc_s), service_errors)
         else:
             raise arvados.errors.KeepReadError(
         else:
             raise arvados.errors.KeepReadError(
-                "failed to read {}".format(loc_s), service_errors, label="service")
+                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
 
     @retry.retry_method
 
     @retry.retry_method
-    def put(self, data, copies=2, num_retries=None, request_id=None):
+    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
@@ -1138,8 +1318,12 @@ class KeepClient(object):
           *each* Keep server if it returns temporary failures, with
           exponential backoff.  The default value is set when the
           KeepClient is initialized.
           *each* Keep server if it returns temporary failures, with
           exponential backoff.  The default value is set when the
           KeepClient is initialized.
+        * classes: An optional list of storage class names where copies should
+          be written.
         """
 
         """
 
+        classes = classes or self._default_classes
+
         if not isinstance(data, bytes):
             data = data.encode()
 
         if not isinstance(data, bytes):
             data = data.encode()
 
@@ -1151,16 +1335,18 @@ class KeepClient(object):
             return loc_s
         locator = KeepLocator(loc_s)
 
             return loc_s
         locator = KeepLocator(loc_s)
 
+        request_id = (request_id or
+                      (hasattr(self, 'api_client') and self.api_client.request_id) or
+                      arvados.util.new_request_id())
         headers = {
         headers = {
-            'X-Request-Id': (request_id or
-                             (hasattr(self, 'api_client') and self.api_client.request_id) or
-                             arvados.util.new_request_id()),
+            'X-Request-Id': request_id,
             'X-Keep-Desired-Replicas': str(copies),
         }
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
             'X-Keep-Desired-Replicas': str(copies),
         }
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
-        done = 0
+        done_copies = 0
+        done_classes = []
         for tries_left in loop:
             try:
                 sorted_roots = self.map_new_services(
         for tries_left in loop:
             try:
                 sorted_roots = self.map_new_services(
@@ -1172,35 +1358,55 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
                 loop.save_result(error)
                 continue
 
+            pending_classes = []
+            if done_classes is not None:
+                pending_classes = list(set(classes) - set(done_classes))
             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
-                                                        copies=copies - done,
+                                                        copies=copies - done_copies,
                                                         max_service_replicas=self.max_replicas_per_service,
                                                         max_service_replicas=self.max_replicas_per_service,
-                                                        timeout=self.current_timeout(num_retries - tries_left))
+                                                        timeout=self.current_timeout(num_retries - tries_left),
+                                                        classes=pending_classes)
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
                 if ks.finished():
                     continue
                 writer_pool.add_task(ks, service_root)
             writer_pool.join()
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
                 if ks.finished():
                     continue
                 writer_pool.add_task(ks, service_root)
             writer_pool.join()
-            done += writer_pool.done()
-            loop.save_result((done >= copies, writer_pool.total_task_nr))
+            pool_copies, pool_classes = writer_pool.done()
+            done_copies += pool_copies
+            if (done_classes is not None) and (pool_classes is not None):
+                done_classes += pool_classes
+                loop.save_result(
+                    (done_copies >= copies and set(done_classes) == set(classes),
+                    writer_pool.total_task_nr))
+            else:
+                # Old keepstore contacted without storage classes support:
+                # success is determined only by successful copies.
+                #
+                # Disable storage classes tracking from this point forward.
+                if not self._storage_classes_unsupported_warning:
+                    self._storage_classes_unsupported_warning = True
+                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
+                done_classes = None
+                loop.save_result(
+                    (done_copies >= copies, writer_pool.total_task_nr))
 
         if loop.success():
             return writer_pool.response()
         if not roots_map:
             raise arvados.errors.KeepWriteError(
 
         if loop.success():
             return writer_pool.response()
         if not roots_map:
             raise arvados.errors.KeepWriteError(
-                "failed to write {}: no Keep services available ({})".format(
-                    data_hash, loop.last_result()))
+                "[{}] failed to write {}: no Keep services available ({})".format(
+                    request_id, data_hash, loop.last_result()))
         else:
             service_errors = ((key, roots_map[key].last_result()['error'])
                               for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
         else:
             service_errors = ((key, roots_map[key].last_result()['error'])
                               for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
-                "failed to write {} (wanted {} copies but wrote {})".format(
-                    data_hash, copies, writer_pool.done()), service_errors, label="service")
+                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
+                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
 
 
-    def local_store_put(self, data, copies=1, num_retries=None):
+    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
         """A stub for put().
 
         This method is used in place of the real put() method when
         """A stub for put().
 
         This method is used in place of the real put() method when
@@ -1232,5 +1438,14 @@ class KeepClient(object):
         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
             return f.read()
 
         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
             return f.read()
 
-    def is_cached(self, locator):
-        return self.block_cache.reserve_cache(expect_hash)
+    def local_store_head(self, loc_s, num_retries=None):
+        """Companion to local_store_put()."""
+        try:
+            locator = KeepLocator(loc_s)
+        except ValueError:
+            raise arvados.errors.NotFoundError(
+                "Invalid data locator: '%s'" % loc_s)
+        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
+            return True
+        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
+            return True