Merge branch '5836-remote-api-server-errors-made-obvious' closes #5836
[arvados.git] / sdk / python / arvados / keep.py
index 561d34cd4090cb66c9d1bdf20222cbb925e1a38f..5caa5721dd77eb7790a103f18b0bdbf55d2e7e1d 100644 (file)
@@ -1,34 +1,39 @@
+import bz2
+import datetime
+import fcntl
+import functools
 import gflags
-import httplib
-import httplib2
+import hashlib
+import json
 import logging
 import os
 import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
+import pycurl
+import Queue
 import re
-import hashlib
+import socket
+import ssl
 import string
-import bz2
-import zlib
-import fcntl
-import time
+import StringIO
+import subprocess
+import sys
 import threading
+import time
 import timer
-import datetime
-import ssl
-
-_logger = logging.getLogger('arvados.keep')
-global_client_object = None
+import types
+import UserDict
+import zlib
 
 import arvados
 import arvados.config as config
 import arvados.errors
+import arvados.retry as retry
 import arvados.util
 
+_logger = logging.getLogger('arvados.keep')
+global_client_object = None
+
+
 class KeepLocator(object):
     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
@@ -45,7 +50,7 @@ class KeepLocator(object):
             self.size = None
         for hint in pieces:
             if self.HINT_RE.match(hint) is None:
-                raise ValueError("unrecognized hint data {}".format(hint))
+                raise ValueError("invalid hint format: {}".format(hint))
             elif hint.startswith('A'):
                 self.parse_permission_hint(hint)
             else:
@@ -57,6 +62,12 @@ class KeepLocator(object):
                              self.permission_hint()] + self.hints
             if s is not None)
 
+    def stripped(self):
+        if self.size is not None:
+            return "%s+%i" % (self.md5sum, self.size)
+        else:
+            return self.md5sum
+
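A quick illustration of the new stripped() method (the +K@qr1hi hint is made up):

    loc = KeepLocator('acbd18db4cc2f85cedef654fccc4a4d8+3+K@qr1hi')
    loc.stripped()   # 'acbd18db4cc2f85cedef654fccc4a4d8+3'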
     def _make_hex_prop(name, length):
         # Build and return a new property with the given name that
         # must be a hex string of the given length.
@@ -141,8 +152,85 @@ class Keep(object):
     def put(data, **kwargs):
         return Keep.global_client_object().put(data, **kwargs)
 
+class KeepBlockCache(object):
+    # Default RAM cache is 256MiB
+    def __init__(self, cache_max=(256 * 1024 * 1024)):
+        self.cache_max = cache_max
+        self._cache = []
+        self._cache_lock = threading.Lock()
+
+    class CacheSlot(object):
+        def __init__(self, locator):
+            self.locator = locator
+            self.ready = threading.Event()
+            self.content = None
+
+        def get(self):
+            self.ready.wait()
+            return self.content
+
+        def set(self, value):
+            self.content = value
+            self.ready.set()
+
+        def size(self):
+            if self.content is None:
+                return 0
+            else:
+                return len(self.content)
+
+    def cap_cache(self):
+        '''Cap the cache size to self.cache_max'''
+        with self._cache_lock:
+            # Select all slots except those where ready.is_set() and content is
+            # None (that means there was an error reading the block).
+            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+            sm = sum([slot.size() for slot in self._cache])
+            while len(self._cache) > 0 and sm > self.cache_max:
+                for i in xrange(len(self._cache)-1, -1, -1):
+                    if self._cache[i].ready.is_set():
+                        del self._cache[i]
+                        break
+                sm = sum([slot.size() for slot in self._cache])
+
+    def _get(self, locator):
+        # Test if the locator is already in the cache
+        for i in xrange(0, len(self._cache)):
+            if self._cache[i].locator == locator:
+                n = self._cache[i]
+                if i != 0:
+                    # move it to the front
+                    del self._cache[i]
+                    self._cache.insert(0, n)
+                return n
+        return None
+
+    def get(self, locator):
+        with self._cache_lock:
+            return self._get(locator)
+
+    def reserve_cache(self, locator):
+        '''Reserve a cache slot for the specified locator,
+        or return the existing slot.'''
+        with self._cache_lock:
+            n = self._get(locator)
+            if n:
+                return n, False
+            else:
+                # Add a new cache slot for the locator
+                n = KeepBlockCache.CacheSlot(locator)
+                self._cache.insert(0, n)
+                return n, True
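The CacheSlot/reserve_cache() pair lets concurrent readers of one block coordinate: the first caller gets first=True and is responsible for filling the slot, while everyone else blocks in slot.get() until set() fires the Event. A minimal sketch of the intended calling pattern (fetch_block is hypothetical):

    cache = KeepBlockCache()
    slot, first = cache.reserve_cache('acbd18db4cc2f85cedef654fccc4a4d8')
    if first:
        try:
            content = fetch_block()   # hypothetical network fetch
        except Exception:
            content = None            # cap_cache() evicts failed slots
        slot.set(content)             # wakes any waiting readers
        cache.cap_cache()
    else:
        content = slot.get()          # blocks until the fetcher calls set()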
 
 class KeepClient(object):
+
+    # Default Keep server connection timeout:  2 seconds
+    # Default Keep server read timeout:      300 seconds
+    # Default Keep proxy connection timeout:  20 seconds
+    # Default Keep proxy read timeout:       300 seconds
+    DEFAULT_TIMEOUT = (2, 300)
+    DEFAULT_PROXY_TIMEOUT = (20, 300)
+
     class ThreadLimiter(object):
         """
         Limit the number of threads running at a given time to
@@ -200,15 +288,219 @@ class KeepClient(object):
                 return self._done
 
 
+    class KeepService(object):
+        """Make requests to a single Keep service, and track results.
+
+        A KeepService is intended to last long enough to perform one
+        transaction (GET or PUT) against one Keep service. This can
+        involve calling either get() or put() multiple times in order
+        to retry after transient failures. However, calling both get()
+        and put() on a single instance -- or using the same instance
+        to access two different Keep services -- will not produce
+        sensible behavior.
+        """
+
+        HTTP_ERRORS = (
+            socket.error,
+            ssl.SSLError,
+            arvados.errors.HttpError,
+        )
+
+        def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
+            self.root = root
+            self._user_agent_pool = user_agent_pool
+            self._result = {'error': None}
+            self._usable = True
+            self._session = None
+            self.get_headers = {'Accept': 'application/octet-stream'}
+            self.get_headers.update(headers)
+            self.put_headers = headers
+
+        def usable(self):
+            """Is it worth attempting a request?"""
+            return self._usable
+
+        def finished(self):
+            """Did the request succeed or encounter permanent failure?"""
+            return self._result['error'] == False or not self._usable
+
+        def last_result(self):
+            return self._result
+
+        def _get_user_agent(self):
+            try:
+                return self._user_agent_pool.get(False)
+            except Queue.Empty:
+                return pycurl.Curl()
+
+        def _put_user_agent(self, ua):
+            try:
+                ua.reset()
+                self._user_agent_pool.put(ua, False)
+            except:
+                ua.close()
+
+        def _socket_open(self, family, socktype, protocol, address):
+            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
+            s = socket.socket(family, socktype, protocol)
+            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
+            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+            return s
+
+        def get(self, locator, timeout=None):
+            # locator is a KeepLocator object.
+            url = self.root + str(locator)
+            _logger.debug("Request: GET %s", url)
+            curl = self._get_user_agent()
+            try:
+                with timer.Timer() as t:
+                    self._headers = {}
+                    response_body = StringIO.StringIO()
+                    curl.setopt(pycurl.NOSIGNAL, 1)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.URL, url.encode('utf-8'))
+                    curl.setopt(pycurl.HTTPHEADER, [
+                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    self._setcurltimeouts(curl, timeout)
+                    try:
+                        curl.perform()
+                    except Exception as e:
+                        raise arvados.errors.HttpError(0, str(e))
+                    self._result = {
+                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+                        'body': response_body.getvalue(),
+                        'headers': self._headers,
+                        'error': False,
+                    }
+                ok = retry.check_http_response_success(self._result['status_code'])
+                if not ok:
+                    self._result['error'] = arvados.errors.HttpError(
+                        self._result['status_code'],
+                        self._headers.get('x-status-line', 'Error'))
+            except self.HTTP_ERRORS as e:
+                self._result = {
+                    'error': e,
+                }
+                ok = False
+            self._usable = ok != False  # still usable if ok is True or None
+            if self._result.get('status_code', None):
+                # The client worked well enough to get an HTTP status
+                # code, so presumably any problems are just on the
+                # server side and it's OK to reuse the client.
+                self._put_user_agent(curl)
+            else:
+                # Don't return this client to the pool, in case it's
+                # broken.
+                curl.close()
+            if not ok:
+                _logger.debug("Request fail: GET %s => %s: %s",
+                              url, type(self._result['error']), str(self._result['error']))
+                return None
+            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
+                         self._result['status_code'],
+                         len(self._result['body']),
+                         t.msecs,
+                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
+            if resp_md5 != locator.md5sum:
+                _logger.warning("Checksum fail: md5(%s) = %s",
+                                url, resp_md5)
+                self._result['error'] = arvados.errors.HttpError(
+                    0, 'Checksum fail')
+                return None
+            return self._result['body']
+
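A sketch of how a caller is expected to drive one GET transaction (the root URL and block are made up):

    svc = KeepClient.KeepService('http://keep0.qr1hi.example:25107/')
    blob = svc.get(KeepLocator('acbd18db4cc2f85cedef654fccc4a4d8+3'), timeout=(2, 300))
    if blob is None and not svc.usable():
        pass  # permanent failure; drop this service from the probe list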
+        def put(self, hash_s, body, timeout=None):
+            url = self.root + hash_s
+            _logger.debug("Request: PUT %s", url)
+            curl = self._get_user_agent()
+            try:
+                self._headers = {}
+                response_body = StringIO.StringIO()
+                curl.setopt(pycurl.NOSIGNAL, 1)
+                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                curl.setopt(pycurl.URL, url.encode('utf-8'))
+                curl.setopt(pycurl.POSTFIELDS, body)
+                curl.setopt(pycurl.CUSTOMREQUEST, 'PUT')
+                curl.setopt(pycurl.HTTPHEADER, [
+                    '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                self._setcurltimeouts(curl, timeout)
+                try:
+                    curl.perform()
+                except Exception as e:
+                    raise arvados.errors.HttpError(0, str(e))
+                self._result = {
+                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+                    'body': response_body.getvalue(),
+                    'headers': self._headers,
+                    'error': False,
+                }
+                ok = retry.check_http_response_success(self._result['status_code'])
+                if not ok:
+                    self._result['error'] = arvados.errors.HttpError(
+                        self._result['status_code'],
+                        self._headers.get('x-status-line', 'Error'))
+            except self.HTTP_ERRORS as e:
+                self._result = {
+                    'error': e,
+                }
+                ok = False
+            self._usable = ok != False # still usable if ok is True or None
+            if self._result.get('status_code', None):
+                # Client is functional. See comment in get().
+                self._put_user_agent(curl)
+            else:
+                curl.close()
+            if not ok:
+                _logger.debug("Request fail: PUT %s => %s: %s",
+                              url, type(self._result['error']), str(self._result['error']))
+                return False
+            return True
+
+        def _setcurltimeouts(self, curl, timeouts):
+            if not timeouts:
+                return
+            elif isinstance(timeouts, tuple):
+                conn_t, xfer_t = timeouts
+            else:
+                conn_t, xfer_t = (timeouts, timeouts)
+            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
+            curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+
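Both timeout forms accepted above end up as the same two pycurl options, in milliseconds. Illustrative values, assuming svc is a KeepService and curl a pycurl.Curl handle:

    svc._setcurltimeouts(curl, (2, 300))   # CONNECTTIMEOUT_MS=2000, TIMEOUT_MS=300000
    svc._setcurltimeouts(curl, 60)         # both timeouts become 60000 ms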
+        def _headerfunction(self, header_line):
+            header_line = header_line.decode('iso-8859-1')
+            if ':' in header_line:
+                name, value = header_line.split(':', 1)
+                name = name.strip().lower()
+                value = value.strip()
+            elif self._headers:
+                name = self._lastheadername
+                value = self._headers[name] + ' ' + header_line.strip()
+            elif header_line.startswith('HTTP/'):
+                name = 'x-status-line'
+                value = header_line
+            else:
+                _logger.error("Unexpected header line: %s", header_line)
+                return
+            self._lastheadername = name
+            self._headers[name] = value
+            # Returning None implies all bytes were written
+
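pycurl invokes the HEADERFUNCTION callback once per raw header line, so the code above must itself recognize three cases: the status line, ordinary "Name: value" headers, and RFC 2616 continuation lines that begin with whitespace. A standalone sketch of the folding behavior (svc is a KeepService; note the status line keeps its trailing CRLF):

    svc._headers = {}
    for line in ['HTTP/1.1 200 OK\r\n',
                 'X-Keep-Replicas-Stored: 2\r\n',
                 'Content-Type: text/plain;\r\n',
                 ' charset=utf-8\r\n']:
        svc._headerfunction(line)
    # svc._headers['x-keep-replicas-stored'] == '2'
    # svc._headers['content-type'] == 'text/plain; charset=utf-8'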
+
     class KeepWriterThread(threading.Thread):
         """
         Write a blob of data to the given Keep server. On success, call
         save_response() of the given ThreadLimiter to save the returned
         locator.
         """
-        def __init__(self, api_token, **kwargs):
+        def __init__(self, keep_service, **kwargs):
             super(KeepClient.KeepWriterThread, self).__init__()
-            self._api_token = api_token
+            self.service = keep_service
             self.args = kwargs
             self._success = False
 
@@ -224,114 +516,131 @@ class KeepClient(object):
                 self.run_with_limiter(limiter)
 
         def run_with_limiter(self, limiter):
-            _logger.debug("KeepWriterThread %s proceeding %s %s",
+            if self.service.finished():
+                return
+            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                           str(threading.current_thread()),
                           self.args['data_hash'],
+                          len(self.args['data']),
                           self.args['service_root'])
-            h = httplib2.Http(timeout=self.args.get('timeout', None))
-            url = self.args['service_root'] + self.args['data_hash']
-            headers = {'Authorization': "OAuth2 %s" % (self._api_token,)}
-
-            if self.args['using_proxy']:
-                # We're using a proxy, so tell the proxy how many copies we
-                # want it to store
-                headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
-
-            try:
-                _logger.debug("Uploading to {}".format(url))
-                resp, content = h.request(url.encode('utf-8'), 'PUT',
-                                          headers=headers,
-                                          body=self.args['data'])
-                if (resp['status'] == '401' and
-                    re.match(r'Timestamp verification failed', content)):
-                    body = KeepClient.sign_for_old_server(
-                        self.args['data_hash'],
-                        self.args['data'])
-                    h = httplib2.Http(timeout=self.args.get('timeout', None))
-                    resp, content = h.request(url.encode('utf-8'), 'PUT',
-                                              headers=headers,
-                                              body=body)
-                if re.match(r'^2\d\d$', resp['status']):
-                    self._success = True
-                    _logger.debug("KeepWriterThread %s succeeded %s %s",
-                                  str(threading.current_thread()),
-                                  self.args['data_hash'],
-                                  self.args['service_root'])
+            self._success = bool(self.service.put(
+                self.args['data_hash'],
+                self.args['data'],
+                timeout=self.args.get('timeout', None)))
+            result = self.service.last_result()
+            if self._success:
+                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
+                              str(threading.current_thread()),
+                              self.args['data_hash'],
+                              len(self.args['data']),
+                              self.args['service_root'])
+                # Tick the 'done' counter for the number of replicas
+                # reported stored by the server, for the case that
+                # we're talking to a proxy or other backend that
+                # stores multiple copies for us.
+                try:
+                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
+                except (KeyError, ValueError):
                     replicas_stored = 1
-                    if 'x-keep-replicas-stored' in resp:
-                        # Tick the 'done' counter for the number of replica
-                        # reported stored by the server, for the case that
-                        # we're talking to a proxy or other backend that
-                        # stores to multiple copies for us.
-                        try:
-                            replicas_stored = int(resp['x-keep-replicas-stored'])
-                        except ValueError:
-                            pass
-                    limiter.save_response(content.strip(), replicas_stored)
-                else:
-                    _logger.debug("Request fail: PUT %s => %s %s",
-                                    url, resp['status'], content)
-            except (httplib2.HttpLib2Error,
-                    httplib.HTTPException,
-                    ssl.SSLError) as e:
-                # When using https, timeouts look like ssl.SSLError from here.
-                # "SSLError: The write operation timed out"
-                _logger.debug("Request fail: PUT %s => %s: %s",
-                                url, type(e), str(e))
-
-
-    def __init__(self, api_client=None, proxy=None, timeout=60,
-                 api_token=None, local_store=None):
+                limiter.save_response(result['body'].strip(), replicas_stored)
+            elif result.get('status_code', None):
+                _logger.debug("Request fail: PUT %s => %s %s",
+                              self.args['data_hash'],
+                              result['status_code'],
+                              result['body'])
+
+
+    def __init__(self, api_client=None, proxy=None,
+                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
+                 api_token=None, local_store=None, block_cache=None,
+                 num_retries=0, session=None):
         """Initialize a new KeepClient.
 
         Arguments:
-        * api_client: The API client to use to find Keep services.  If not
+        :api_client:
+          The API client to use to find Keep services.  If not
           provided, KeepClient will build one from available Arvados
           configuration.
-        * proxy: If specified, this KeepClient will send requests to this
-          Keep proxy.  Otherwise, KeepClient will fall back to the setting
-          of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
-          ensure KeepClient does not use a proxy, pass in an empty string.
-        * timeout: The timeout for all HTTP requests, in seconds.  Default
-          60.
-        * api_token: If you're not using an API client, but only talking
+
+        :proxy:
+          If specified, this KeepClient will send requests to this Keep
+          proxy.  Otherwise, KeepClient will fall back to the setting of the
+          ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
+          KeepClient does not use a proxy, pass in an empty string.
+
+        :timeout:
+          The initial timeout (in seconds) for HTTP requests to Keep
+          non-proxy servers.  A tuple of two floats is interpreted as
+          (connection_timeout, read_timeout): see
+          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
+          Because timeouts are often a result of transient server load, the
+          actual connection timeout will be increased by a factor of two on
+          each retry.
+          Default: (2, 300).
+
+        :proxy_timeout:
+          The initial timeout (in seconds) for HTTP requests to
+          Keep proxies. A tuple of two floats is interpreted as
+          (connection_timeout, read_timeout). The behavior described
+          above for adjusting connection timeouts on retry also applies.
+          Default: (20, 300).
+
+        :api_token:
+          If you're not using an API client, but only talking
           directly to a Keep proxy, this parameter specifies an API token
           to authenticate Keep requests.  It is an error to specify both
           api_client and api_token.  If you specify neither, KeepClient
           will use one available from the Arvados configuration.
-        * local_store: If specified, this KeepClient will bypass Keep
+
+        :local_store:
+          If specified, this KeepClient will bypass Keep
           services, and save data to the named directory.  If unspecified,
           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
           environment variable.  If you want to ensure KeepClient does not
           use local storage, pass in an empty string.  This is primarily
           intended to mock a server for testing.
+
+        :num_retries:
+          The default number of times to retry failed requests.
+          This will be used as the default num_retries value when get() and
+          put() are called.  Default 0.
         """
         self.lock = threading.Lock()
         if proxy is None:
             proxy = config.get('ARVADOS_KEEP_PROXY')
         if api_token is None:
-            api_token = config.get('ARVADOS_API_TOKEN')
+            if api_client is None:
+                api_token = config.get('ARVADOS_API_TOKEN')
+            else:
+                api_token = api_client.api_token
         elif api_client is not None:
             raise ValueError(
                 "can't build KeepClient with both API client and token")
         if local_store is None:
             local_store = os.environ.get('KEEP_LOCAL_STORE')
 
+        self.block_cache = block_cache if block_cache else KeepBlockCache()
+        self.timeout = timeout
+        self.proxy_timeout = proxy_timeout
+        self._user_agent_pool = Queue.LifoQueue()
+
         if local_store:
             self.local_store = local_store
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
-            self.timeout = timeout
-            self.cache_max = 256 * 1024 * 1024  # Cache is 256MiB
-            self._cache = []
-            self._cache_lock = threading.Lock()
+            self.num_retries = num_retries
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
                 self.api_token = api_token
-                self.service_roots = [proxy]
+                self._gateway_services = {}
+                self._keep_services = [{
+                    'uuid': 'proxy',
+                    '_service_root': proxy,
+                    }]
                 self.using_proxy = True
+                self._static_services_list = True
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -339,242 +648,336 @@ class KeepClient(object):
                     api_client = arvados.api('v1')
                 self.api_client = api_client
                 self.api_token = api_client.api_token
-                self.service_roots = None
+                self._gateway_services = {}
+                self._keep_services = None
                 self.using_proxy = None
+                self._static_services_list = False
 
-    def shuffled_service_roots(self, hash):
-        if self.service_roots is None:
-            with self.lock:
-                try:
-                    keep_services = self.api_client.keep_services().accessible()
-                except Exception:  # API server predates Keep services.
-                    keep_services = self.api_client.keep_disks().list()
-
-                keep_services = keep_services.execute().get('items')
-                if not keep_services:
-                    raise arvados.errors.NoKeepServersError()
-
-                self.using_proxy = (keep_services[0].get('service_type') ==
-                                    'proxy')
-
-                roots = (("http%s://%s:%d/" %
-                          ('s' if f['service_ssl_flag'] else '',
-                           f['service_host'],
-                           f['service_port']))
-                         for f in keep_services)
-                self.service_roots = sorted(set(roots))
-                _logger.debug(str(self.service_roots))
-
-        # Build an ordering with which to query the Keep servers based on the
-        # contents of the hash.
-        # "hash" is a hex-encoded number at least 8 digits
-        # (32 bits) long
-
-        # seed used to calculate the next keep server from 'pool'
-        # to be added to 'pseq'
-        seed = hash
-
-        # Keep servers still to be added to the ordering
-        pool = self.service_roots[:]
-
-        # output probe sequence
-        pseq = []
-
-        # iterate while there are servers left to be assigned
-        while len(pool) > 0:
-            if len(seed) < 8:
-                # ran out of digits in the seed
-                if len(pseq) < len(hash) / 4:
-                    # the number of servers added to the probe sequence is less
-                    # than the number of 4-digit slices in 'hash' so refill the
-                    # seed with the last 4 digits and then append the contents
-                    # of 'hash'.
-                    seed = hash[-4:] + hash
-                else:
-                    # refill the seed with the contents of 'hash'
-                    seed += hash
-
-            # Take the next 8 digits (32 bytes) and interpret as an integer,
-            # then modulus with the size of the remaining pool to get the next
-            # selected server.
-            probe = int(seed[0:8], 16) % len(pool)
-
-            # Append the selected server to the probe sequence and remove it
-            # from the pool.
-            pseq += [pool[probe]]
-            pool = pool[:probe] + pool[probe+1:]
-
-            # Remove the digits just used from the seed
-            seed = seed[8:]
-        _logger.debug(str(pseq))
-        return pseq
+    def current_timeout(self, attempt_number):
+        """Return the appropriate timeout to use for this client.
 
-    class CacheSlot(object):
-        def __init__(self, locator):
-            self.locator = locator
-            self.ready = threading.Event()
-            self.content = None
+        The proxy timeout setting if the backend service is currently a proxy,
+        the regular timeout setting otherwise.  The `attempt_number` indicates
+        how many times the operation has been tried already (starting from 0
+        for the first try), and scales the connection timeout portion of the
+        return value accordingly.
 
-        def get(self):
-            self.ready.wait()
-            return self.content
+        """
+        # TODO(twp): the timeout should be a property of a
+        # KeepService, not a KeepClient. See #4488.
+        t = self.proxy_timeout if self.using_proxy else self.timeout
+        return (t[0] * (1 << attempt_number), t[1])
+
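Only the connection half of the tuple scales, so retries become more patient about connecting but never about reading. With the non-proxy default of (2, 300):

    [(2 * (1 << n), 300) for n in range(3)]   # [(2, 300), (4, 300), (8, 300)]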
+    def build_services_list(self, force_rebuild=False):
+        if (self._static_services_list or
+              (self._keep_services and not force_rebuild)):
+            return
+        with self.lock:
+            try:
+                keep_services = self.api_client.keep_services().accessible()
+            except Exception:  # API server predates Keep services.
+                keep_services = self.api_client.keep_disks().list()
+
+            accessible = keep_services.execute().get('items')
+            if not accessible:
+                raise arvados.errors.NoKeepServersError()
+
+            # Precompute the base URI for each service.
+            for r in accessible:
+                host = r['service_host']
+                if not host.startswith('[') and host.find(':') >= 0:
+                    # IPv6 URIs must be formatted like http://[::1]:80/...
+                    host = '[' + host + ']'
+                r['_service_root'] = "{}://{}:{:d}/".format(
+                    'https' if r['service_ssl_flag'] else 'http',
+                    host,
+                    r['service_port'])
+
+            # Gateway services are only used when specified by UUID,
+            # so there's nothing to gain by filtering them by
+            # service_type.
+            self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
+            _logger.debug(str(self._gateway_services))
+
+            self._keep_services = [
+                ks for ks in accessible
+                if ks.get('service_type') in ['disk', 'proxy']]
+            _logger.debug(str(self._keep_services))
+
+            self.using_proxy = any(ks.get('service_type') == 'proxy'
+                                   for ks in self._keep_services)
+
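The bracket-wrapping above is what makes IPv6 literals legal in the precomputed base URIs, since a bare colon-containing host would be read as host:port. For example, a record with service_host '::1', port 25107, and SSL off yields:

    "{}://{}:{:d}/".format('http', '[::1]', 25107)   # 'http://[::1]:25107/'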
+    def _service_weight(self, data_hash, service_uuid):
+        """Compute the weight of a Keep service endpoint for a data
+        block with a known hash.
+
+        The weight is md5(h + u), where h is the data block's hash and
+        u is the last 15 characters of the service endpoint's UUID.
+        """
+        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
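This is rendezvous (highest-random-weight) hashing: every client derives the same per-block probe order with no coordination, because a service's weight depends only on the block hash and the service UUID. A standalone sketch (the UUIDs are made up):

    import hashlib

    def weight(data_hash, uuid):
        return hashlib.md5(data_hash + uuid[-15:]).hexdigest()

    uuids = ['zzzzz-bi6l4-aaaaaaaaaaaaaaa', 'zzzzz-bi6l4-bbbbbbbbbbbbbbb']
    probe_order = sorted(uuids, reverse=True,
                         key=lambda u: weight('acbd18db4cc2f85cedef654fccc4a4d8', u))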
 
-        def set(self, value):
-            self.content = value
-            self.ready.set()
+    def weighted_service_roots(self, locator, force_rebuild=False):
+        """Return an array of Keep service endpoints, in the order in
+        which they should be probed when reading or writing data with
+        the given hash+hints.
+        """
+        self.build_services_list(force_rebuild)
+
+        sorted_roots = []
+
+        # Use the services indicated by the given +K@... remote
+        # service hints, if any are present and can be resolved to a
+        # URI.
+        for hint in locator.hints:
+            if hint.startswith('K@'):
+                if len(hint) == 7:
+                    sorted_roots.append(
+                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
+                elif len(hint) == 29:
+                    svc = self._gateway_services.get(hint[2:])
+                    if svc:
+                        sorted_roots.append(svc['_service_root'])
+
+        # Sort the available local services by weight (heaviest first)
+        # for this locator, and return their service_roots (base URIs)
+        # in that order.
+        sorted_roots.extend([
+            svc['_service_root'] for svc in sorted(
+                self._keep_services,
+                reverse=True,
+                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
+        _logger.debug("{}: {}".format(locator, sorted_roots))
+        return sorted_roots
+
+    def map_new_services(self, roots_map, locator, force_rebuild, **headers):
+        # roots_map is a dictionary, mapping Keep service root strings
+        # to KeepService objects.  Poll for Keep services, and add any
+        # new ones to roots_map.  Return the current list of local
+        # root strings.
+        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
+        local_roots = self.weighted_service_roots(locator, force_rebuild)
+        for root in local_roots:
+            if root not in roots_map:
+                roots_map[root] = self.KeepService(
+                    root, self._user_agent_pool, **headers)
+        return local_roots
 
-        def size(self):
-            if self.content == None:
-                return 0
-            else:
-                return len(self.content)
+    @staticmethod
+    def _check_loop_result(result):
+        # KeepClient RetryLoops should save results as a 2-tuple: the
+        # actual result of the request, and the number of servers available
+        # to receive the request this round.
+        # This method returns True if there's a real result, False if
+        # there are no more servers available, otherwise None.
+        if isinstance(result, Exception):
+            return None
+        result, tried_server_count = result
+        if (result is not None) and (result is not False):
+            return True
+        elif tried_server_count < 1:
+            _logger.info("No more Keep services to try; giving up")
+            return False
+        else:
+            return None
 
-    def cap_cache(self):
-        '''Cap the cache size to self.cache_max'''
-        self._cache_lock.acquire()
-        try:
-            self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
-            sm = sum([slot.size() for slot in self._cache])
-            while sm > self.cache_max:
-                del self._cache[-1]
-                sm = sum([slot.size() for a in self._cache])
-        finally:
-            self._cache_lock.release()
+    def get_from_cache(self, loc):
+        """Fetch a block only if it is in the cache, otherwise return None."""
+        slot = self.block_cache.get(loc)
+        if slot is not None and slot.ready.is_set():
+            return slot.get()
+        else:
+            return None
 
-    def reserve_cache(self, locator):
-        '''Reserve a cache slot for the specified locator,
-        or return the existing slot.'''
-        self._cache_lock.acquire()
-        try:
-            # Test if the locator is already in the cache
-            for i in xrange(0, len(self._cache)):
-                if self._cache[i].locator == locator:
-                    n = self._cache[i]
-                    if i != 0:
-                        # move it to the front
-                        del self._cache[i]
-                        self._cache.insert(0, n)
-                    return n, False
+    @retry.retry_method
+    def get(self, loc_s, num_retries=None):
+        """Get data from Keep.
 
-            # Add a new cache slot for the locator
-            n = KeepClient.CacheSlot(locator)
-            self._cache.insert(0, n)
-            return n, True
-        finally:
-            self._cache_lock.release()
+        This method fetches one or more blocks of data from Keep.  It
+        sends a request to each Keep service registered with the API
+        server (or the proxy provided when this client was
+        instantiated), then each service named in location hints, in
+        sequence.  As soon as one service provides the data, it's
+        returned.
 
-    def get(self, loc_s):
+        Arguments:
+        * loc_s: A string of one or more comma-separated locators to fetch.
+          This method returns the concatenation of these blocks.
+        * num_retries: The number of times to retry GET requests to
+          *each* Keep server if it returns temporary failures, with
+          exponential backoff.  Note that, in each loop, the method may try
+          to fetch data from every available Keep service, along with any
+          that are named in location hints in the locator.  The default value
+          is set when the KeepClient is initialized.
+        """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         locator = KeepLocator(loc_s)
-        expect_hash = locator.md5sum
-
-        slot, first = self.reserve_cache(expect_hash)
-
+        slot, first = self.block_cache.reserve_cache(locator.md5sum)
         if not first:
             v = slot.get()
             return v
 
-        try:
-            for service_root in self.shuffled_service_roots(expect_hash):
-                url = service_root + loc_s
-                headers = {'Authorization': "OAuth2 %s" % (self.api_token,),
-                           'Accept': 'application/octet-stream'}
-                blob = self.get_url(url, headers, expect_hash)
-                if blob:
-                    slot.set(blob)
-                    self.cap_cache()
-                    return blob
-
-            for hint in locator.hints:
-                if not hint.startswith('K@'):
-                    continue
-                url = 'http://keep.' + hint[2:] + '.arvadosapi.com/' + loc_s
-                blob = self.get_url(url, {}, expect_hash)
-                if blob:
-                    slot.set(blob)
-                    self.cap_cache()
-                    return blob
-        except:
-            slot.set(None)
-            self.cap_cache()
-            raise
-
-        slot.set(None)
-        self.cap_cache()
-        raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
-
-    def get_url(self, url, headers, expect_hash):
-        h = httplib2.Http()
-        try:
-            _logger.debug("Request: GET %s", url)
-            with timer.Timer() as t:
-                resp, content = h.request(url.encode('utf-8'), 'GET',
-                                          headers=headers)
-            _logger.info("Received %s bytes in %s msec (%s MiB/sec)",
-                         len(content), t.msecs,
-                         (len(content)/(1024*1024))/t.secs)
-            if re.match(r'^2\d\d$', resp['status']):
-                md5 = hashlib.md5(content).hexdigest()
-                if md5 == expect_hash:
-                    return content
-                _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
-        except Exception as e:
-            _logger.debug("Request fail: GET %s => %s: %s",
-                         url, type(e), str(e))
-        return None
+        # If the locator has hints specifying a prefix (indicating a
+        # remote keepproxy) or the UUID of a local gateway service,
+        # read data from the indicated service(s) instead of the usual
+        # list of local disk services.
+        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+                           for hint in locator.hints if (
+                                   hint.startswith('K@') and
+                                   len(hint) == 29 and
+                                   self._gateway_services.get(hint[2:])
+                                   )])
+        # Map root URLs to their KeepService objects.
+        roots_map = {
+            root: self.KeepService(root, self._user_agent_pool)
+            for root in hint_roots
+        }
+
+        # See #3147 for a discussion of the loop implementation.  Highlights:
+        # * Refresh the list of Keep services after each failure, in case
+        #   it's being updated.
+        # * Retry until we succeed, we're out of retries, or every available
+        #   service has returned permanent failure.
+        # roots_map already holds KeepService objects for the hint roots
+        # built above; map_new_services() will add the local services.
+        sorted_roots = []
+        blob = None
+        loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                               backoff_start=2)
+        for tries_left in loop:
+            try:
+                sorted_roots = self.map_new_services(
+                    roots_map, locator,
+                    force_rebuild=(tries_left < num_retries))
+            except Exception as error:
+                loop.save_result(error)
+                continue
+
+            # Query KeepService objects that haven't returned
+            # permanent failure, in our specified shuffle order.
+            services_to_try = [roots_map[root]
+                               for root in sorted_roots
+                               if roots_map[root].usable()]
+            for keep_service in services_to_try:
+                blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
+                if blob is not None:
+                    break
+            loop.save_result((blob, len(services_to_try)))
+
+        # Always cache the result, then return it if we succeeded.
+        slot.set(blob)
+        self.block_cache.cap_cache()
+        if loop.success():
+            return blob
+
+        # Q: Including 403 is necessary for the Keep tests to continue
+        # passing, but maybe they should expect KeepReadError instead?
+        not_founds = sum(1 for key in sorted_roots
+                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
+        service_errors = ((key, roots_map[key].last_result()['error'])
+                          for key in sorted_roots)
+        if not roots_map:
+            raise arvados.errors.KeepReadError(
+                "failed to read {}: no Keep services available ({})".format(
+                    loc_s, loop.last_result()))
+        elif not_founds == len(sorted_roots):
+            raise arvados.errors.NotFoundError(
+                "{} not found".format(loc_s), service_errors)
+        else:
+            raise arvados.errors.KeepReadError(
+                "failed to read {}".format(loc_s), service_errors, label="service")
+
+    @retry.retry_method
+    def put(self, data, copies=2, num_retries=None):
+        """Save data in Keep.
+
+        This method will get a list of Keep services from the API server, and
+        send the data to each one in parallel, each in its own thread.  Once the
+        uploads are finished, if enough copies are saved, this method returns
+        the most recent HTTP response body.  If requests fail to upload
+        enough copies, this method raises KeepWriteError.
+
+        Arguments:
+        * data: The string of data to upload.
+        * copies: The number of copies that the user requires be saved.
+          Default 2.
+        * num_retries: The number of times to retry PUT requests to
+          *each* Keep server if it returns temporary failures, with
+          exponential backoff.  The default value is set when the
+          KeepClient is initialized.
+        """
+
+        if isinstance(data, unicode):
+            data = data.encode("ascii")
+        elif not isinstance(data, str):
+            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
 
-    def put(self, data, copies=2):
         data_hash = hashlib.md5(data).hexdigest()
-        have_copies = 0
-        want_copies = copies
-        if not (want_copies > 0):
+        if copies < 1:
             return data_hash
-        threads = []
-        thread_limiter = KeepClient.ThreadLimiter(want_copies)
-        for service_root in self.shuffled_service_roots(data_hash):
-            t = KeepClient.KeepWriterThread(
-                self.api_token,
-                data=data,
-                data_hash=data_hash,
-                service_root=service_root,
-                thread_limiter=thread_limiter,
-                timeout=self.timeout,
-                using_proxy=self.using_proxy,
-                want_copies=(want_copies if self.using_proxy else 1))
-            t.start()
-            threads += [t]
-        for t in threads:
-            t.join()
-        if thread_limiter.done() < want_copies:
-            # Retry the threads (i.e., services) that failed the first
-            # time around.
-            threads_retry = []
+        locator = KeepLocator(data_hash + '+' + str(len(data)))
+
+        headers = {}
+        if self.using_proxy:
+            # Tell the proxy how many copies we want it to store
+            headers['X-Keep-Desired-Replication'] = str(copies)
+        roots_map = {}
+        thread_limiter = KeepClient.ThreadLimiter(copies)
+        loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                               backoff_start=2)
+        for tries_left in loop:
+            try:
+                local_roots = self.map_new_services(
+                    roots_map, locator,
+                    force_rebuild=(tries_left < num_retries), **headers)
+            except Exception as error:
+                loop.save_result(error)
+                continue
+
+            threads = []
+            for service_root, ks in roots_map.iteritems():
+                if ks.finished():
+                    continue
+                t = KeepClient.KeepWriterThread(
+                    ks,
+                    data=data,
+                    data_hash=data_hash,
+                    service_root=service_root,
+                    thread_limiter=thread_limiter,
+                    timeout=self.current_timeout(num_retries-tries_left))
+                t.start()
+                threads.append(t)
             for t in threads:
-                if not t.success():
-                    _logger.debug("Retrying: PUT %s %s",
-                                    t.args['service_root'],
-                                    t.args['data_hash'])
-                    retry_with_args = t.args.copy()
-                    t_retry = KeepClient.KeepWriterThread(self.api_token,
-                                                          **retry_with_args)
-                    t_retry.start()
-                    threads_retry += [t_retry]
-            for t in threads_retry:
                 t.join()
-        have_copies = thread_limiter.done()
-        # If we're done, return the response from Keep
-        if have_copies >= want_copies:
+            loop.save_result((thread_limiter.done() >= copies, len(threads)))
+
+        if loop.success():
             return thread_limiter.response()
-        raise arvados.errors.KeepWriteError(
-            "Write fail for %s: wanted %d but wrote %d" %
-            (data_hash, want_copies, have_copies))
+        if not roots_map:
+            raise arvados.errors.KeepWriteError(
+                "failed to write {}: no Keep services available ({})".format(
+                    data_hash, loop.last_result()))
+        else:
+            service_errors = ((key, roots_map[key].last_result()['error'])
+                              for key in local_roots
+                              if roots_map[key].last_result()['error'])
+            raise arvados.errors.KeepWriteError(
+                "failed to write {} (wanted {} copies but wrote {})".format(
+                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
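Taken together, put() and get() give the round trip the docstrings describe. A minimal usage sketch, assuming a working Arvados configuration in the environment:

    kc = KeepClient(num_retries=2)
    loc = kc.put('foo', copies=2)   # e.g. 'acbd18db4cc2f85cedef654fccc4a4d8+3+A...'
    assert kc.get(loc) == 'foo'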
 
-    @staticmethod
-    def sign_for_old_server(data_hash, data):
-        return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)
+    def local_store_put(self, data, copies=1, num_retries=None):
+        """A stub for put().
 
-    def local_store_put(self, data):
+        This method is used in place of the real put() method when
+        using local storage (see constructor's local_store argument).
+
+        copies and num_retries arguments are ignored: they are here
+        only for the sake of offering the same call signature as
+        put().
+
+        Data stored this way can be retrieved via local_store_get().
+        """
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
@@ -583,7 +986,8 @@ class KeepClient(object):
                   os.path.join(self.local_store, md5))
         return locator
 
-    def local_store_get(self, loc_s):
+    def local_store_get(self, loc_s, num_retries=None):
+        """Companion to local_store_put()."""
         try:
             locator = KeepLocator(loc_s)
         except ValueError:
@@ -593,3 +997,6 @@ class KeepClient(object):
             return ''
         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
             return f.read()
+
+    def is_cached(self, locator):
+        # Check the cache without reserving a slot (reserve_cache would
+        # leave an unfilled slot that later readers would block on).
+        return self.block_cache.get(locator) is not None