Merge branch '3198-writable-fuse' closes #3198
[arvados.git] / sdk / python / arvados / keep.py
index 62c6709bb66b0876a24ecea2fb37e9080d9dde74..b2700ae5ba71dc9023ce8eb4f843a0d301b729b6 100644 (file)
@@ -1,25 +1,28 @@
+import bz2
+import datetime
+import fcntl
+import functools
 import gflags
 import gflags
+import hashlib
+import json
 import logging
 import os
 import pprint
 import logging
 import os
 import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
+import pycurl
+import Queue
 import re
 import re
-import hashlib
+import socket
+import ssl
 import string
 import string
-import bz2
-import zlib
-import fcntl
-import time
+import cStringIO
+import subprocess
+import sys
 import threading
 import threading
+import time
 import timer
 import timer
-import datetime
-import ssl
-import socket
-import requests
+import types
+import UserDict
+import zlib
 
 import arvados
 import arvados.config as config
 
 import arvados
 import arvados.config as config
@@ -30,6 +33,7 @@ import arvados.util
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
 
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
 
+
 class KeepLocator(object):
     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
 class KeepLocator(object):
     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
@@ -46,7 +50,7 @@ class KeepLocator(object):
             self.size = None
         for hint in pieces:
             if self.HINT_RE.match(hint) is None:
             self.size = None
         for hint in pieces:
             if self.HINT_RE.match(hint) is None:
-                raise ValueError("unrecognized hint data {}".format(hint))
+                raise ValueError("invalid hint format: {}".format(hint))
             elif hint.startswith('A'):
                 self.parse_permission_hint(hint)
             else:
             elif hint.startswith('A'):
                 self.parse_permission_hint(hint)
             else:
@@ -72,7 +76,7 @@ class KeepLocator(object):
             return getattr(self, data_name)
         def setter(self, hex_str):
             if not arvados.util.is_hex(hex_str, length):
             return getattr(self, data_name)
         def setter(self, hex_str):
             if not arvados.util.is_hex(hex_str, length):
-                raise ValueError("{} must be a {}-digit hex string: {}".
+                raise ValueError("{} is not a {}-digit hex string: {}".
                                  format(name, length, hex_str))
             setattr(self, data_name, hex_str)
         return property(getter, setter)
                                  format(name, length, hex_str))
             setattr(self, data_name, hex_str)
         return property(getter, setter)
@@ -285,75 +289,216 @@ class KeepClient(object):
 
 
     class KeepService(object):
 
 
     class KeepService(object):
-        # Make requests to a single Keep service, and track results.
-        HTTP_ERRORS = (requests.exceptions.RequestException,
-                       socket.error, ssl.SSLError)
+        """Make requests to a single Keep service, and track results.
+
+        A KeepService is intended to last long enough to perform one
+        transaction (GET or PUT) against one Keep service. This can
+        involve calling either get() or put() multiple times in order
+        to retry after transient failures. However, calling both get()
+        and put() on a single instance -- or using the same instance
+        to access two different Keep services -- will not produce
+        sensible behavior.
+        """
 
 
-        def __init__(self, root, session, **headers):
+        HTTP_ERRORS = (
+            socket.error,
+            ssl.SSLError,
+            arvados.errors.HttpError,
+        )
+
+        def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
             self.root = root
             self.root = root
-            self.last_result = None
-            self.success_flag = None
-            self.session = session
+            self._user_agent_pool = user_agent_pool
+            self._result = {'error': None}
+            self._usable = True
+            self._session = None
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
 
         def usable(self):
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
 
         def usable(self):
-            return self.success_flag is not False
+            """Is it worth attempting a request?"""
+            return self._usable
 
         def finished(self):
 
         def finished(self):
-            return self.success_flag is not None
+            """Did the request succeed or encounter permanent failure?"""
+            return self._result['error'] == False or not self._usable
+
+        def last_result(self):
+            return self._result
 
 
-        def last_status(self):
+        def _get_user_agent(self):
             try:
             try:
-                return self.last_result.status_code
-            except AttributeError:
-                return None
+                return self._user_agent_pool.get(False)
+            except Queue.Empty:
+                return pycurl.Curl()
+
+        def _put_user_agent(self, ua):
+            try:
+                ua.reset()
+                self._user_agent_pool.put(ua, False)
+            except:
+                ua.close()
+
+        @staticmethod
+        def _socket_open(family, socktype, protocol, address=None):
+            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
+            s = socket.socket(family, socktype, protocol)
+            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
+            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+            return s
 
         def get(self, locator, timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
             _logger.debug("Request: GET %s", url)
 
         def get(self, locator, timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
             _logger.debug("Request: GET %s", url)
+            curl = self._get_user_agent()
             try:
                 with timer.Timer() as t:
             try:
                 with timer.Timer() as t:
-                    result = self.session.get(url.encode('utf-8'),
-                                          headers=self.get_headers,
-                                          timeout=timeout)
+                    self._headers = {}
+                    response_body = cStringIO.StringIO()
+                    curl.setopt(pycurl.NOSIGNAL, 1)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.URL, url.encode('utf-8'))
+                    curl.setopt(pycurl.HTTPHEADER, [
+                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    self._setcurltimeouts(curl, timeout)
+                    try:
+                        curl.perform()
+                    except Exception as e:
+                        raise arvados.errors.HttpError(0, str(e))
+                    self._result = {
+                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+                        'body': response_body.getvalue(),
+                        'headers': self._headers,
+                        'error': False,
+                    }
+                ok = retry.check_http_response_success(self._result['status_code'])
+                if not ok:
+                    self._result['error'] = arvados.errors.HttpError(
+                        self._result['status_code'],
+                        self._headers.get('x-status-line', 'Error'))
             except self.HTTP_ERRORS as e:
             except self.HTTP_ERRORS as e:
-                _logger.debug("Request fail: GET %s => %s: %s",
-                              url, type(e), str(e))
-                self.last_result = e
+                self._result = {
+                    'error': e,
+                }
+                ok = False
+            self._usable = ok != False
+            if self._result.get('status_code', None):
+                # The client worked well enough to get an HTTP status
+                # code, so presumably any problems are just on the
+                # server side and it's OK to reuse the client.
+                self._put_user_agent(curl)
             else:
             else:
-                self.last_result = result
-                self.success_flag = retry.check_http_response_success(result)
-                content = result.content
-                _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
-                             self.last_status(), len(content), t.msecs,
-                             (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
-                if self.success_flag:
-                    resp_md5 = hashlib.md5(content).hexdigest()
-                    if resp_md5 == locator.md5sum:
-                        return content
-                    _logger.warning("Checksum fail: md5(%s) = %s",
-                                    url, resp_md5)
-            return None
+                # Don't return this client to the pool, in case it's
+                # broken.
+                curl.close()
+            if not ok:
+                _logger.debug("Request fail: GET %s => %s: %s",
+                              url, type(self._result['error']), str(self._result['error']))
+                return None
+            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
+                         self._result['status_code'],
+                         len(self._result['body']),
+                         t.msecs,
+                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
+            if resp_md5 != locator.md5sum:
+                _logger.warning("Checksum fail: md5(%s) = %s",
+                                url, resp_md5)
+                self._result['error'] = arvados.errors.HttpError(
+                    0, 'Checksum fail')
+                return None
+            return self._result['body']
 
         def put(self, hash_s, body, timeout=None):
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
 
         def put(self, hash_s, body, timeout=None):
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
+            curl = self._get_user_agent()
             try:
             try:
-                result = self.session.put(url.encode('utf-8'),
-                                      data=body,
-                                      headers=self.put_headers,
-                                      timeout=timeout)
+                self._headers = {}
+                body_reader = cStringIO.StringIO(body)
+                response_body = cStringIO.StringIO()
+                curl.setopt(pycurl.NOSIGNAL, 1)
+                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                curl.setopt(pycurl.URL, url.encode('utf-8'))
+                # Using UPLOAD tells cURL to wait for a "go ahead" from the
+                # Keep server (in the form of a HTTP/1.1 "100 Continue"
+                # response) instead of sending the request body immediately.
+                # This allows the server to reject the request if the request
+                # is invalid or the server is read-only, without waiting for
+                # the client to send the entire block.
+                curl.setopt(pycurl.UPLOAD, True)
+                curl.setopt(pycurl.INFILESIZE, len(body))
+                curl.setopt(pycurl.READFUNCTION, body_reader.read)
+                curl.setopt(pycurl.HTTPHEADER, [
+                    '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                self._setcurltimeouts(curl, timeout)
+                try:
+                    curl.perform()
+                except Exception as e:
+                    raise arvados.errors.HttpError(0, str(e))
+                self._result = {
+                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+                    'body': response_body.getvalue(),
+                    'headers': self._headers,
+                    'error': False,
+                }
+                ok = retry.check_http_response_success(self._result['status_code'])
+                if not ok:
+                    self._result['error'] = arvados.errors.HttpError(
+                        self._result['status_code'],
+                        self._headers.get('x-status-line', 'Error'))
             except self.HTTP_ERRORS as e:
             except self.HTTP_ERRORS as e:
+                self._result = {
+                    'error': e,
+                }
+                ok = False
+            self._usable = ok != False # still usable if ok is True or None
+            if self._result.get('status_code', None):
+                # Client is functional. See comment in get().
+                self._put_user_agent(curl)
+            else:
+                curl.close()
+            if not ok:
                 _logger.debug("Request fail: PUT %s => %s: %s",
                 _logger.debug("Request fail: PUT %s => %s: %s",
-                              url, type(e), str(e))
-                self.last_result = e
+                              url, type(self._result['error']), str(self._result['error']))
+                return False
+            return True
+
+        def _setcurltimeouts(self, curl, timeouts):
+            if not timeouts:
+                return
+            elif isinstance(timeouts, tuple):
+                conn_t, xfer_t = timeouts
             else:
             else:
-                self.last_result = result
-                self.success_flag = retry.check_http_response_success(result)
-            return self.success_flag
+                conn_t, xfer_t = (timeouts, timeouts)
+            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
+            curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+
+        def _headerfunction(self, header_line):
+            header_line = header_line.decode('iso-8859-1')
+            if ':' in header_line:
+                name, value = header_line.split(':', 1)
+                name = name.strip().lower()
+                value = value.strip()
+            elif self._headers:
+                name = self._lastheadername
+                value = self._headers[name] + ' ' + header_line.strip()
+            elif header_line.startswith('HTTP/'):
+                name = 'x-status-line'
+                value = header_line
+            else:
+                _logger.error("Unexpected header line: %s", header_line)
+                return
+            self._lastheadername = name
+            self._headers[name] = value
+            # Returning None implies all bytes were written
 
 
     class KeepWriterThread(threading.Thread):
 
 
     class KeepWriterThread(threading.Thread):
@@ -391,9 +536,8 @@ class KeepClient(object):
                 self.args['data_hash'],
                 self.args['data'],
                 timeout=self.args.get('timeout', None)))
                 self.args['data_hash'],
                 self.args['data'],
                 timeout=self.args.get('timeout', None)))
-            status = self.service.last_status()
+            result = self.service.last_result()
             if self._success:
             if self._success:
-                result = self.service.last_result
                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                               str(threading.current_thread()),
                               self.args['data_hash'],
                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                               str(threading.current_thread()),
                               self.args['data_hash'],
@@ -404,14 +548,15 @@ class KeepClient(object):
                 # we're talking to a proxy or other backend that
                 # stores to multiple copies for us.
                 try:
                 # we're talking to a proxy or other backend that
                 # stores to multiple copies for us.
                 try:
-                    replicas_stored = int(result.headers['x-keep-replicas-stored'])
+                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                 except (KeyError, ValueError):
                     replicas_stored = 1
                 except (KeyError, ValueError):
                     replicas_stored = 1
-                limiter.save_response(result.content.strip(), replicas_stored)
-            elif status is not None:
+                limiter.save_response(result['body'].strip(), replicas_stored)
+            elif result.get('status_code', None):
                 _logger.debug("Request fail: PUT %s => %s %s",
                 _logger.debug("Request fail: PUT %s => %s %s",
-                              self.args['data_hash'], status,
-                              self.service.last_result.content)
+                              self.args['data_hash'],
+                              result['status_code'],
+                              result['body'])
 
 
     def __init__(self, api_client=None, proxy=None,
 
 
     def __init__(self, api_client=None, proxy=None,
@@ -433,16 +578,21 @@ class KeepClient(object):
           KeepClient does not use a proxy, pass in an empty string.
 
         :timeout:
           KeepClient does not use a proxy, pass in an empty string.
 
         :timeout:
-          The timeout (in seconds) for HTTP requests to Keep
+          The initial timeout (in seconds) for HTTP requests to Keep
           non-proxy servers.  A tuple of two floats is interpreted as
           (connection_timeout, read_timeout): see
           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
           non-proxy servers.  A tuple of two floats is interpreted as
           (connection_timeout, read_timeout): see
           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
+          Because timeouts are often a result of transient server load, the
+          actual connection timeout will be increased by a factor of two on
+          each retry.
           Default: (2, 300).
 
         :proxy_timeout:
           Default: (2, 300).
 
         :proxy_timeout:
-          The timeout (in seconds) for HTTP requests to
+          The initial timeout (in seconds) for HTTP requests to
           Keep proxies. A tuple of two floats is interpreted as
           Keep proxies. A tuple of two floats is interpreted as
-          (connection_timeout, read_timeout). Default: (20, 300).
+          (connection_timeout, read_timeout). The behavior described
+          above for adjusting connection timeouts on retry also applies.
+          Default: (20, 300).
 
         :api_token:
           If you're not using an API client, but only talking
 
         :api_token:
           If you're not using an API client, but only talking
@@ -463,10 +613,6 @@ class KeepClient(object):
           The default number of times to retry failed requests.
           This will be used as the default num_retries value when get() and
           put() are called.  Default 0.
           The default number of times to retry failed requests.
           This will be used as the default num_retries value when get() and
           put() are called.  Default 0.
-
-        :session:
-          The requests.Session object to use for get() and put() requests.
-          Will create one if not specified.
         """
         self.lock = threading.Lock()
         if proxy is None:
         """
         self.lock = threading.Lock()
         if proxy is None:
@@ -485,6 +631,7 @@ class KeepClient(object):
         self.block_cache = block_cache if block_cache else KeepBlockCache()
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
         self.block_cache = block_cache if block_cache else KeepBlockCache()
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
+        self._user_agent_pool = Queue.LifoQueue()
 
         if local_store:
             self.local_store = local_store
 
         if local_store:
             self.local_store = local_store
@@ -492,15 +639,16 @@ class KeepClient(object):
             self.put = self.local_store_put
         else:
             self.num_retries = num_retries
             self.put = self.local_store_put
         else:
             self.num_retries = num_retries
-            self.session = session if session is not None else requests.Session()
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
                 self.api_token = api_token
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
                 self.api_token = api_token
+                self._gateway_services = {}
                 self._keep_services = [{
                     'uuid': 'proxy',
                     '_service_root': proxy,
                     }]
                 self._keep_services = [{
                     'uuid': 'proxy',
                     '_service_root': proxy,
                     }]
+                self._writable_services = self._keep_services
                 self.using_proxy = True
                 self._static_services_list = True
             else:
                 self.using_proxy = True
                 self._static_services_list = True
             else:
@@ -510,18 +658,26 @@ class KeepClient(object):
                     api_client = arvados.api('v1')
                 self.api_client = api_client
                 self.api_token = api_client.api_token
                     api_client = arvados.api('v1')
                 self.api_client = api_client
                 self.api_token = api_client.api_token
+                self._gateway_services = {}
                 self._keep_services = None
                 self._keep_services = None
+                self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
 
                 self.using_proxy = None
                 self._static_services_list = False
 
-    def current_timeout(self):
-        """Return the appropriate timeout to use for this client: the proxy
-        timeout setting if the backend service is currently a proxy,
-        the regular timeout setting otherwise.
+    def current_timeout(self, attempt_number):
+        """Return the appropriate timeout to use for this client.
+
+        The proxy timeout setting if the backend service is currently a proxy,
+        the regular timeout setting otherwise.  The `attempt_number` indicates
+        how many times the operation has been tried already (starting from 0
+        for the first try), and scales the connection timeout portion of the
+        return value accordingly.
+
         """
         # TODO(twp): the timeout should be a property of a
         # KeepService, not a KeepClient. See #4488.
         """
         # TODO(twp): the timeout should be a property of a
         # KeepService, not a KeepClient. See #4488.
-        return self.proxy_timeout if self.using_proxy else self.timeout
+        t = self.proxy_timeout if self.using_proxy else self.timeout
+        return (t[0] * (1 << attempt_number), t[1])
 
     def build_services_list(self, force_rebuild=False):
         if (self._static_services_list or
 
     def build_services_list(self, force_rebuild=False):
         if (self._static_services_list or
@@ -533,21 +689,38 @@ class KeepClient(object):
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
-            self._keep_services = keep_services.execute().get('items')
-            if not self._keep_services:
+            accessible = keep_services.execute().get('items')
+            if not accessible:
                 raise arvados.errors.NoKeepServersError()
 
                 raise arvados.errors.NoKeepServersError()
 
-            self.using_proxy = any(ks.get('service_type') == 'proxy'
-                                   for ks in self._keep_services)
-
             # Precompute the base URI for each service.
             # Precompute the base URI for each service.
-            for r in self._keep_services:
-                r['_service_root'] = "{}://[{}]:{:d}/".format(
+            for r in accessible:
+                host = r['service_host']
+                if not host.startswith('[') and host.find(':') >= 0:
+                    # IPv6 URIs must be formatted like http://[::1]:80/...
+                    host = '[' + host + ']'
+                r['_service_root'] = "{}://{}:{:d}/".format(
                     'https' if r['service_ssl_flag'] else 'http',
                     'https' if r['service_ssl_flag'] else 'http',
-                    r['service_host'],
+                    host,
                     r['service_port'])
                     r['service_port'])
+
+            # Gateway services are only used when specified by UUID,
+            # so there's nothing to gain by filtering them by
+            # service_type.
+            self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
+            _logger.debug(str(self._gateway_services))
+
+            self._keep_services = [
+                ks for ks in accessible
+                if ks.get('service_type') in ['disk', 'proxy']]
+            self._writable_services = [
+                ks for ks in accessible
+                if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
             _logger.debug(str(self._keep_services))
 
             _logger.debug(str(self._keep_services))
 
+            self.using_proxy = any(ks.get('service_type') == 'proxy'
+                                   for ks in self._keep_services)
+
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
         block with a known hash.
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
         block with a known hash.
@@ -557,34 +730,53 @@ class KeepClient(object):
         """
         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
 
         """
         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
 
-    def weighted_service_roots(self, data_hash, force_rebuild=False):
+    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
         """Return an array of Keep service endpoints, in the order in
         which they should be probed when reading or writing data with
         """Return an array of Keep service endpoints, in the order in
         which they should be probed when reading or writing data with
-        the given hash.
+        the given hash+hints.
         """
         self.build_services_list(force_rebuild)
 
         """
         self.build_services_list(force_rebuild)
 
-        # Sort the available services by weight (heaviest first) for
-        # this data_hash, and return their service_roots (base URIs)
+        sorted_roots = []
+
+        # Use the services indicated by the given +K@... remote
+        # service hints, if any are present and can be resolved to a
+        # URI.
+        for hint in locator.hints:
+            if hint.startswith('K@'):
+                if len(hint) == 7:
+                    sorted_roots.append(
+                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
+                elif len(hint) == 29:
+                    svc = self._gateway_services.get(hint[2:])
+                    if svc:
+                        sorted_roots.append(svc['_service_root'])
+
+        # Sort the available local services by weight (heaviest first)
+        # for this locator, and return their service_roots (base URIs)
         # in that order.
         # in that order.
-        sorted_roots = [
+        use_services = self._keep_services
+        if need_writable:
+          use_services = self._writable_services
+        sorted_roots.extend([
             svc['_service_root'] for svc in sorted(
             svc['_service_root'] for svc in sorted(
-                self._keep_services,
+                use_services,
                 reverse=True,
                 reverse=True,
-                key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
-        _logger.debug(data_hash + ': ' + str(sorted_roots))
+                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
+        _logger.debug("{}: {}".format(locator, sorted_roots))
         return sorted_roots
 
         return sorted_roots
 
-    def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
         # root strings.
         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
         # root strings.
         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
-        local_roots = self.weighted_service_roots(md5_s, force_rebuild)
+        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
         for root in local_roots:
             if root not in roots_map:
         for root in local_roots:
             if root not in roots_map:
-                roots_map[root] = self.KeepService(root, self.session, **headers)
+                roots_map[root] = self.KeepService(
+                    root, self._user_agent_pool, **headers)
         return local_roots
 
     @staticmethod
         return local_roots
 
     @staticmethod
@@ -608,7 +800,7 @@ class KeepClient(object):
     def get_from_cache(self, loc):
         """Fetch a block only if is in the cache, otherwise return None."""
         slot = self.block_cache.get(loc)
     def get_from_cache(self, loc):
         """Fetch a block only if is in the cache, otherwise return None."""
         slot = self.block_cache.get(loc)
-        if slot.ready.is_set():
+        if slot is not None and slot.ready.is_set():
             return slot.get()
         else:
             return None
             return slot.get()
         else:
             return None
@@ -637,29 +829,45 @@ class KeepClient(object):
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         locator = KeepLocator(loc_s)
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         locator = KeepLocator(loc_s)
-        expect_hash = locator.md5sum
-        slot, first = self.block_cache.reserve_cache(expect_hash)
+        slot, first = self.block_cache.reserve_cache(locator.md5sum)
         if not first:
             v = slot.get()
             return v
 
         if not first:
             v = slot.get()
             return v
 
+        # If the locator has hints specifying a prefix (indicating a
+        # remote keepproxy) or the UUID of a local gateway service,
+        # read data from the indicated service(s) instead of the usual
+        # list of local disk services.
+        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+                           for hint in locator.hints if (
+                                   hint.startswith('K@') and
+                                   len(hint) == 29 and
+                                   self._gateway_services.get(hint[2:])
+                                   )])
+        # Map root URLs to their KeepService objects.
+        roots_map = {
+            root: self.KeepService(root, self._user_agent_pool)
+            for root in hint_roots
+        }
+
         # See #3147 for a discussion of the loop implementation.  Highlights:
         # * Refresh the list of Keep services after each failure, in case
         #   it's being updated.
         # * Retry until we succeed, we're out of retries, or every available
         #   service has returned permanent failure.
         # See #3147 for a discussion of the loop implementation.  Highlights:
         # * Refresh the list of Keep services after each failure, in case
         #   it's being updated.
         # * Retry until we succeed, we're out of retries, or every available
         #   service has returned permanent failure.
-        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
-                      for hint in locator.hints if hint.startswith('K@')]
-        # Map root URLs their KeepService objects.
-        roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
+        sorted_roots = []
+        roots_map = {}
         blob = None
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
         for tries_left in loop:
             try:
         blob = None
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
         for tries_left in loop:
             try:
-                local_roots = self.map_new_services(
-                    roots_map, expect_hash,
-                    force_rebuild=(tries_left < num_retries))
+                sorted_roots = self.map_new_services(
+                    roots_map, locator,
+                    force_rebuild=(tries_left < num_retries),
+                    need_writable=False)
             except Exception as error:
                 loop.save_result(error)
                 continue
             except Exception as error:
                 loop.save_result(error)
                 continue
@@ -667,10 +875,10 @@ class KeepClient(object):
             # Query KeepService objects that haven't returned
             # permanent failure, in our specified shuffle order.
             services_to_try = [roots_map[root]
             # Query KeepService objects that haven't returned
             # permanent failure, in our specified shuffle order.
             services_to_try = [roots_map[root]
-                               for root in (local_roots + hint_roots)
+                               for root in sorted_roots
                                if roots_map[root].usable()]
             for keep_service in services_to_try:
                                if roots_map[root].usable()]
             for keep_service in services_to_try:
-                blob = keep_service.get(locator, timeout=self.current_timeout())
+                blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
                 if blob is not None:
                     break
             loop.save_result((blob, len(services_to_try)))
                 if blob is not None:
                     break
             loop.save_result((blob, len(services_to_try)))
@@ -681,22 +889,17 @@ class KeepClient(object):
         if loop.success():
             return blob
 
         if loop.success():
             return blob
 
-        try:
-            all_roots = local_roots + hint_roots
-        except NameError:
-            # We never successfully fetched local_roots.
-            all_roots = hint_roots
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
-        not_founds = sum(1 for key in all_roots
-                         if roots_map[key].last_status() in {403, 404, 410})
-        service_errors = ((key, roots_map[key].last_result)
-                          for key in all_roots)
+        not_founds = sum(1 for key in sorted_roots
+                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
+        service_errors = ((key, roots_map[key].last_result()['error'])
+                          for key in sorted_roots)
         if not roots_map:
             raise arvados.errors.KeepReadError(
                 "failed to read {}: no Keep services available ({})".format(
                     loc_s, loop.last_result()))
         if not roots_map:
             raise arvados.errors.KeepReadError(
                 "failed to read {}: no Keep services available ({})".format(
                     loc_s, loop.last_result()))
-        elif not_founds == len(all_roots):
+        elif not_founds == len(sorted_roots):
             raise arvados.errors.NotFoundError(
                 "{} not found".format(loc_s), service_errors)
         else:
             raise arvados.errors.NotFoundError(
                 "{} not found".format(loc_s), service_errors)
         else:
@@ -726,11 +929,12 @@ class KeepClient(object):
         if isinstance(data, unicode):
             data = data.encode("ascii")
         elif not isinstance(data, str):
         if isinstance(data, unicode):
             data = data.encode("ascii")
         elif not isinstance(data, str):
-            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
+            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
 
         data_hash = hashlib.md5(data).hexdigest()
         if copies < 1:
             return data_hash
 
         data_hash = hashlib.md5(data).hexdigest()
         if copies < 1:
             return data_hash
+        locator = KeepLocator(data_hash + '+' + str(len(data)))
 
         headers = {}
         if self.using_proxy:
 
         headers = {}
         if self.using_proxy:
@@ -743,8 +947,8 @@ class KeepClient(object):
         for tries_left in loop:
             try:
                 local_roots = self.map_new_services(
         for tries_left in loop:
             try:
                 local_roots = self.map_new_services(
-                    roots_map, data_hash,
-                    force_rebuild=(tries_left < num_retries), **headers)
+                    roots_map, locator,
+                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
             except Exception as error:
                 loop.save_result(error)
                 continue
             except Exception as error:
                 loop.save_result(error)
                 continue
@@ -759,7 +963,7 @@ class KeepClient(object):
                     data_hash=data_hash,
                     service_root=service_root,
                     thread_limiter=thread_limiter,
                     data_hash=data_hash,
                     service_root=service_root,
                     thread_limiter=thread_limiter,
-                    timeout=self.current_timeout())
+                    timeout=self.current_timeout(num_retries-tries_left))
                 t.start()
                 threads.append(t)
             for t in threads:
                 t.start()
                 threads.append(t)
             for t in threads:
@@ -773,9 +977,9 @@ class KeepClient(object):
                 "failed to write {}: no Keep services available ({})".format(
                     data_hash, loop.last_result()))
         else:
                 "failed to write {}: no Keep services available ({})".format(
                     data_hash, loop.last_result()))
         else:
-            service_errors = ((key, roots_map[key].last_result)
+            service_errors = ((key, roots_map[key].last_result()['error'])
                               for key in local_roots
                               for key in local_roots
-                              if not roots_map[key].success_flag)
+                              if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                 "failed to write {} (wanted {} copies but wrote {})".format(
                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
             raise arvados.errors.KeepWriteError(
                 "failed to write {} (wanted {} copies but wrote {})".format(
                     data_hash, copies, thread_limiter.done()), service_errors, label="service")