Merge branch '11507-deleted-bufferblocks' refs #11507
[arvados.git] / sdk / python / arvados / keep.py
index 71dc7ce7af863f60fb9e741b7bbc1231c22b146b..5b4770c4d0dca8824c268448296d0658c8ba04d8 100644 (file)
@@ -1,25 +1,19 @@
-import gflags
+import cStringIO
+import collections
+import datetime
+import hashlib
 import logging
 import logging
+import math
 import os
 import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
+import pycurl
+import Queue
 import re
 import re
-import hashlib
-import string
-import bz2
-import zlib
-import fcntl
-import time
+import socket
+import ssl
+import sys
 import threading
 import timer
 import threading
 import timer
-import datetime
-import ssl
-import socket
-import requests
+import urlparse
 
 import arvados
 import arvados.config as config
 
 import arvados
 import arvados.config as config
@@ -30,6 +24,18 @@ import arvados.util
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
 
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
 
+
+# Monkey patch TCP constants when not available (apple). Values sourced from:
+# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
+if sys.platform == 'darwin':
+    if not hasattr(socket, 'TCP_KEEPALIVE'):
+        socket.TCP_KEEPALIVE = 0x010
+    if not hasattr(socket, 'TCP_KEEPINTVL'):
+        socket.TCP_KEEPINTVL = 0x101
+    if not hasattr(socket, 'TCP_KEEPCNT'):
+        socket.TCP_KEEPCNT = 0x102
+
+
 class KeepLocator(object):
     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
 class KeepLocator(object):
     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
@@ -46,7 +52,7 @@ class KeepLocator(object):
             self.size = None
         for hint in pieces:
             if self.HINT_RE.match(hint) is None:
             self.size = None
         for hint in pieces:
             if self.HINT_RE.match(hint) is None:
-                raise ValueError("unrecognized hint data {}".format(hint))
+                raise ValueError("invalid hint format: {}".format(hint))
             elif hint.startswith('A'):
                 self.parse_permission_hint(hint)
             else:
             elif hint.startswith('A'):
                 self.parse_permission_hint(hint)
             else:
@@ -58,6 +64,12 @@ class KeepLocator(object):
                              self.permission_hint()] + self.hints
             if s is not None)
 
                              self.permission_hint()] + self.hints
             if s is not None)
 
+    def stripped(self):
+        if self.size is not None:
+            return "%s+%i" % (self.md5sum, self.size)
+        else:
+            return self.md5sum
+
     def _make_hex_prop(name, length):
         # Build and return a new property with the given name that
         # must be a hex string of the given length.
     def _make_hex_prop(name, length):
         # Build and return a new property with the given name that
         # must be a hex string of the given length.
@@ -66,7 +78,7 @@ class KeepLocator(object):
             return getattr(self, data_name)
         def setter(self, hex_str):
             if not arvados.util.is_hex(hex_str, length):
             return getattr(self, data_name)
         def setter(self, hex_str):
             if not arvados.util.is_hex(hex_str, length):
-                raise ValueError("{} must be a {}-digit hex string: {}".
+                raise ValueError("{} is not a {}-digit hex string: {}".
                                  format(name, length, hex_str))
             setattr(self, data_name, hex_str)
         return property(getter, setter)
                                  format(name, length, hex_str))
             setattr(self, data_name, hex_str)
         return property(getter, setter)
@@ -150,6 +162,8 @@ class KeepBlockCache(object):
         self._cache_lock = threading.Lock()
 
     class CacheSlot(object):
         self._cache_lock = threading.Lock()
 
     class CacheSlot(object):
+        __slots__ = ("locator", "ready", "content")
+
         def __init__(self, locator):
             self.locator = locator
             self.ready = threading.Event()
         def __init__(self, locator):
             self.locator = locator
             self.ready = threading.Event()
@@ -171,8 +185,7 @@ class KeepBlockCache(object):
 
     def cap_cache(self):
         '''Cap the cache size to self.cache_max'''
 
     def cap_cache(self):
         '''Cap the cache size to self.cache_max'''
-        self._cache_lock.acquire()
-        try:
+        with self._cache_lock:
             # Select all slots except those where ready.is_set() and content is
             # None (that means there was an error reading the block).
             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
             # Select all slots except those where ready.is_set() and content is
             # None (that means there was an error reading the block).
             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
@@ -183,264 +196,528 @@ class KeepBlockCache(object):
                         del self._cache[i]
                         break
                 sm = sum([slot.size() for slot in self._cache])
                         del self._cache[i]
                         break
                 sm = sum([slot.size() for slot in self._cache])
-        finally:
-            self._cache_lock.release()
+
+    def _get(self, locator):
+        # Test if the locator is already in the cache
+        for i in xrange(0, len(self._cache)):
+            if self._cache[i].locator == locator:
+                n = self._cache[i]
+                if i != 0:
+                    # move it to the front
+                    del self._cache[i]
+                    self._cache.insert(0, n)
+                return n
+        return None
+
+    def get(self, locator):
+        with self._cache_lock:
+            return self._get(locator)
 
     def reserve_cache(self, locator):
         '''Reserve a cache slot for the specified locator,
         or return the existing slot.'''
 
     def reserve_cache(self, locator):
         '''Reserve a cache slot for the specified locator,
         or return the existing slot.'''
-        self._cache_lock.acquire()
-        try:
-            # Test if the locator is already in the cache
-            for i in xrange(0, len(self._cache)):
-                if self._cache[i].locator == locator:
-                    n = self._cache[i]
-                    if i != 0:
-                        # move it to the front
-                        del self._cache[i]
-                        self._cache.insert(0, n)
-                    return n, False
+        with self._cache_lock:
+            n = self._get(locator)
+            if n:
+                return n, False
+            else:
+                # Add a new cache slot for the locator
+                n = KeepBlockCache.CacheSlot(locator)
+                self._cache.insert(0, n)
+                return n, True
+
+class Counter(object):
+    def __init__(self, v=0):
+        self._lk = threading.Lock()
+        self._val = v
+
+    def add(self, v):
+        with self._lk:
+            self._val += v
+
+    def get(self):
+        with self._lk:
+            return self._val
 
 
-            # Add a new cache slot for the locator
-            n = KeepBlockCache.CacheSlot(locator)
-            self._cache.insert(0, n)
-            return n, True
-        finally:
-            self._cache_lock.release()
 
 class KeepClient(object):
 
     # Default Keep server connection timeout:  2 seconds
 
 class KeepClient(object):
 
     # Default Keep server connection timeout:  2 seconds
-    # Default Keep server read timeout:      300 seconds
+    # Default Keep server read timeout:       256 seconds
+    # Default Keep server bandwidth minimum:  32768 bytes per second
     # Default Keep proxy connection timeout:  20 seconds
     # Default Keep proxy connection timeout:  20 seconds
-    # Default Keep proxy read timeout:       300 seconds
-    DEFAULT_TIMEOUT = (2, 300)
-    DEFAULT_PROXY_TIMEOUT = (20, 300)
+    # Default Keep proxy read timeout:        256 seconds
+    # Default Keep proxy bandwidth minimum:   32768 bytes per second
+    DEFAULT_TIMEOUT = (2, 256, 32768)
+    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
 
 
-    class ThreadLimiter(object):
-        """
-        Limit the number of threads running at a given time to
-        {desired successes} minus {successes reported}. When successes
-        reported == desired, wake up the remaining threads and tell
-        them to quit.
 
 
-        Should be used in a "with" block.
+    class KeepService(object):
+        """Make requests to a single Keep service, and track results.
+
+        A KeepService is intended to last long enough to perform one
+        transaction (GET or PUT) against one Keep service. This can
+        involve calling either get() or put() multiple times in order
+        to retry after transient failures. However, calling both get()
+        and put() on a single instance -- or using the same instance
+        to access two different Keep services -- will not produce
+        sensible behavior.
         """
         """
-        def __init__(self, todo):
-            self._todo = todo
-            self._done = 0
-            self._response = None
-            self._todo_lock = threading.Semaphore(todo)
-            self._done_lock = threading.Lock()
-
-        def __enter__(self):
-            self._todo_lock.acquire()
-            return self
-
-        def __exit__(self, type, value, traceback):
-            self._todo_lock.release()
-
-        def shall_i_proceed(self):
-            """
-            Return true if the current thread should do stuff. Return
-            false if the current thread should just stop.
-            """
-            with self._done_lock:
-                return (self._done < self._todo)
-
-        def save_response(self, response_body, replicas_stored):
-            """
-            Records a response body (a locator, possibly signed) returned by
-            the Keep server.  It is not necessary to save more than
-            one response, since we presume that any locator returned
-            in response to a successful request is valid.
-            """
-            with self._done_lock:
-                self._done += replicas_stored
-                self._response = response_body
-
-        def response(self):
-            """
-            Returns the body from the response to a PUT request.
-            """
-            with self._done_lock:
-                return self._response
-
-        def done(self):
-            """
-            Return how many successes were reported.
-            """
-            with self._done_lock:
-                return self._done
-
 
 
-    class KeepService(object):
-        # Make requests to a single Keep service, and track results.
-        HTTP_ERRORS = (requests.exceptions.RequestException,
-                       socket.error, ssl.SSLError)
+        HTTP_ERRORS = (
+            socket.error,
+            ssl.SSLError,
+            arvados.errors.HttpError,
+        )
 
 
-        def __init__(self, root, **headers):
+        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
+                     upload_counter=None,
+                     download_counter=None, **headers):
             self.root = root
             self.root = root
-            self.last_result = None
-            self.success_flag = None
+            self._user_agent_pool = user_agent_pool
+            self._result = {'error': None}
+            self._usable = True
+            self._session = None
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
+            self.upload_counter = upload_counter
+            self.download_counter = download_counter
 
         def usable(self):
 
         def usable(self):
-            return self.success_flag is not False
+            """Is it worth attempting a request?"""
+            return self._usable
 
         def finished(self):
 
         def finished(self):
-            return self.success_flag is not None
+            """Did the request succeed or encounter permanent failure?"""
+            return self._result['error'] == False or not self._usable
 
 
-        def last_status(self):
+        def last_result(self):
+            return self._result
+
+        def _get_user_agent(self):
             try:
             try:
-                return self.last_result.status_code
-            except AttributeError:
-                return None
+                return self._user_agent_pool.get(block=False)
+            except Queue.Empty:
+                return pycurl.Curl()
 
 
-        def get(self, locator, timeout=None):
+        def _put_user_agent(self, ua):
+            try:
+                ua.reset()
+                self._user_agent_pool.put(ua, block=False)
+            except:
+                ua.close()
+
+        def _socket_open(self, *args, **kwargs):
+            if len(args) + len(kwargs) == 2:
+                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
+            else:
+                return self._socket_open_pycurl_7_19_3(*args, **kwargs)
+
+        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
+            return self._socket_open_pycurl_7_21_5(
+                purpose=None,
+                address=collections.namedtuple(
+                    'Address', ['family', 'socktype', 'protocol', 'addr'],
+                )(family, socktype, protocol, address))
+
+        def _socket_open_pycurl_7_21_5(self, purpose, address):
+            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
+            s = socket.socket(address.family, address.socktype, address.protocol)
+            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+            # Will throw invalid protocol error on mac. This test prevents that.
+            if hasattr(socket, 'TCP_KEEPIDLE'):
+                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
+            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+            return s
+
+        def get(self, locator, method="GET", timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
             # locator is a KeepLocator object.
             url = self.root + str(locator)
-            _logger.debug("Request: GET %s", url)
+            _logger.debug("Request: %s %s", method, url)
+            curl = self._get_user_agent()
+            ok = None
             try:
                 with timer.Timer() as t:
             try:
                 with timer.Timer() as t:
-                    result = requests.get(url.encode('utf-8'),
-                                          headers=self.get_headers,
-                                          timeout=timeout)
+                    self._headers = {}
+                    response_body = cStringIO.StringIO()
+                    curl.setopt(pycurl.NOSIGNAL, 1)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.URL, url.encode('utf-8'))
+                    curl.setopt(pycurl.HTTPHEADER, [
+                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    if method == "HEAD":
+                        curl.setopt(pycurl.NOBODY, True)
+                    self._setcurltimeouts(curl, timeout)
+
+                    try:
+                        curl.perform()
+                    except Exception as e:
+                        raise arvados.errors.HttpError(0, str(e))
+                    self._result = {
+                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+                        'body': response_body.getvalue(),
+                        'headers': self._headers,
+                        'error': False,
+                    }
+
+                ok = retry.check_http_response_success(self._result['status_code'])
+                if not ok:
+                    self._result['error'] = arvados.errors.HttpError(
+                        self._result['status_code'],
+                        self._headers.get('x-status-line', 'Error'))
             except self.HTTP_ERRORS as e:
             except self.HTTP_ERRORS as e:
-                _logger.debug("Request fail: GET %s => %s: %s",
-                              url, type(e), str(e))
-                self.last_result = e
+                self._result = {
+                    'error': e,
+                }
+            self._usable = ok != False
+            if self._result.get('status_code', None):
+                # The client worked well enough to get an HTTP status
+                # code, so presumably any problems are just on the
+                # server side and it's OK to reuse the client.
+                self._put_user_agent(curl)
             else:
             else:
-                self.last_result = result
-                self.success_flag = retry.check_http_response_success(result)
-                content = result.content
-                _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
-                             self.last_status(), len(content), t.msecs,
-                             (len(content)/(1024.0*1024))/t.secs)
-                if self.success_flag:
-                    resp_md5 = hashlib.md5(content).hexdigest()
-                    if resp_md5 == locator.md5sum:
-                        return content
-                    _logger.warning("Checksum fail: md5(%s) = %s",
-                                    url, resp_md5)
-            return None
+                # Don't return this client to the pool, in case it's
+                # broken.
+                curl.close()
+            if not ok:
+                _logger.debug("Request fail: GET %s => %s: %s",
+                              url, type(self._result['error']), str(self._result['error']))
+                return None
+            if method == "HEAD":
+                _logger.info("HEAD %s: %s bytes",
+                         self._result['status_code'],
+                         self._result.get('content-length'))
+                return True
+
+            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
+                         self._result['status_code'],
+                         len(self._result['body']),
+                         t.msecs,
+                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+
+            if self.download_counter:
+                self.download_counter.add(len(self._result['body']))
+            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
+            if resp_md5 != locator.md5sum:
+                _logger.warning("Checksum fail: md5(%s) = %s",
+                                url, resp_md5)
+                self._result['error'] = arvados.errors.HttpError(
+                    0, 'Checksum fail')
+                return None
+            return self._result['body']
 
         def put(self, hash_s, body, timeout=None):
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
 
         def put(self, hash_s, body, timeout=None):
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
+            curl = self._get_user_agent()
+            ok = None
             try:
             try:
-                result = requests.put(url.encode('utf-8'),
-                                      data=body,
-                                      headers=self.put_headers,
-                                      timeout=timeout)
+                with timer.Timer() as t:
+                    self._headers = {}
+                    body_reader = cStringIO.StringIO(body)
+                    response_body = cStringIO.StringIO()
+                    curl.setopt(pycurl.NOSIGNAL, 1)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.URL, url.encode('utf-8'))
+                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
+                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
+                    # response) instead of sending the request body immediately.
+                    # This allows the server to reject the request if the request
+                    # is invalid or the server is read-only, without waiting for
+                    # the client to send the entire block.
+                    curl.setopt(pycurl.UPLOAD, True)
+                    curl.setopt(pycurl.INFILESIZE, len(body))
+                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
+                    curl.setopt(pycurl.HTTPHEADER, [
+                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    self._setcurltimeouts(curl, timeout)
+                    try:
+                        curl.perform()
+                    except Exception as e:
+                        raise arvados.errors.HttpError(0, str(e))
+                    self._result = {
+                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+                        'body': response_body.getvalue(),
+                        'headers': self._headers,
+                        'error': False,
+                    }
+                ok = retry.check_http_response_success(self._result['status_code'])
+                if not ok:
+                    self._result['error'] = arvados.errors.HttpError(
+                        self._result['status_code'],
+                        self._headers.get('x-status-line', 'Error'))
             except self.HTTP_ERRORS as e:
             except self.HTTP_ERRORS as e:
-                _logger.debug("Request fail: PUT %s => %s: %s",
-                              url, type(e), str(e))
-                self.last_result = e
+                self._result = {
+                    'error': e,
+                }
+            self._usable = ok != False # still usable if ok is True or None
+            if self._result.get('status_code', None):
+                # Client is functional. See comment in get().
+                self._put_user_agent(curl)
             else:
             else:
-                self.last_result = result
-                self.success_flag = retry.check_http_response_success(result)
-            return self.success_flag
-
+                curl.close()
+            if not ok:
+                _logger.debug("Request fail: PUT %s => %s: %s",
+                              url, type(self._result['error']), str(self._result['error']))
+                return False
+            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
+                         self._result['status_code'],
+                         len(body),
+                         t.msecs,
+                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+            if self.upload_counter:
+                self.upload_counter.add(len(body))
+            return True
 
 
+        def _setcurltimeouts(self, curl, timeouts):
+            if not timeouts:
+                return
+            elif isinstance(timeouts, tuple):
+                if len(timeouts) == 2:
+                    conn_t, xfer_t = timeouts
+                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
+                else:
+                    conn_t, xfer_t, bandwidth_bps = timeouts
+            else:
+                conn_t, xfer_t = (timeouts, timeouts)
+                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
+            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
+            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
+
+        def _headerfunction(self, header_line):
+            header_line = header_line.decode('iso-8859-1')
+            if ':' in header_line:
+                name, value = header_line.split(':', 1)
+                name = name.strip().lower()
+                value = value.strip()
+            elif self._headers:
+                name = self._lastheadername
+                value = self._headers[name] + ' ' + header_line.strip()
+            elif header_line.startswith('HTTP/'):
+                name = 'x-status-line'
+                value = header_line
+            else:
+                _logger.error("Unexpected header line: %s", header_line)
+                return
+            self._lastheadername = name
+            self._headers[name] = value
+            # Returning None implies all bytes were written
+    
+
+    class KeepWriterQueue(Queue.Queue):
+        def __init__(self, copies):
+            Queue.Queue.__init__(self) # Old-style superclass
+            self.wanted_copies = copies
+            self.successful_copies = 0
+            self.response = None
+            self.successful_copies_lock = threading.Lock()
+            self.pending_tries = copies
+            self.pending_tries_notification = threading.Condition()
+        
+        def write_success(self, response, replicas_nr):
+            with self.successful_copies_lock:
+                self.successful_copies += replicas_nr
+                self.response = response
+            with self.pending_tries_notification:
+                self.pending_tries_notification.notify_all()
+        
+        def write_fail(self, ks):
+            with self.pending_tries_notification:
+                self.pending_tries += 1
+                self.pending_tries_notification.notify()
+        
+        def pending_copies(self):
+            with self.successful_copies_lock:
+                return self.wanted_copies - self.successful_copies
+
+        def get_next_task(self):
+            with self.pending_tries_notification:
+                while True:
+                    if self.pending_copies() < 1:
+                        # This notify_all() is unnecessary --
+                        # write_success() already called notify_all()
+                        # when pending<1 became true, so it's not
+                        # possible for any other thread to be in
+                        # wait() now -- but it's cheap insurance
+                        # against deadlock so we do it anyway:
+                        self.pending_tries_notification.notify_all()
+                        # Drain the queue and then raise Queue.Empty
+                        while True:
+                            self.get_nowait()
+                            self.task_done()
+                    elif self.pending_tries > 0:
+                        service, service_root = self.get_nowait()
+                        if service.finished():
+                            self.task_done()
+                            continue
+                        self.pending_tries -= 1
+                        return service, service_root
+                    elif self.empty():
+                        self.pending_tries_notification.notify_all()
+                        raise Queue.Empty
+                    else:
+                        self.pending_tries_notification.wait()
+
+
+    class KeepWriterThreadPool(object):
+        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
+            self.total_task_nr = 0
+            self.wanted_copies = copies
+            if (not max_service_replicas) or (max_service_replicas >= copies):
+                num_threads = 1
+            else:
+                num_threads = int(math.ceil(float(copies) / max_service_replicas))
+            _logger.debug("Pool max threads is %d", num_threads)
+            self.workers = []
+            self.queue = KeepClient.KeepWriterQueue(copies)
+            # Create workers
+            for _ in range(num_threads):
+                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
+                self.workers.append(w)
+        
+        def add_task(self, ks, service_root):
+            self.queue.put((ks, service_root))
+            self.total_task_nr += 1
+        
+        def done(self):
+            return self.queue.successful_copies
+        
+        def join(self):
+            # Start workers
+            for worker in self.workers:
+                worker.start()
+            # Wait for finished work
+            self.queue.join()
+        
+        def response(self):
+            return self.queue.response
+    
+    
     class KeepWriterThread(threading.Thread):
     class KeepWriterThread(threading.Thread):
-        """
-        Write a blob of data to the given Keep server. On success, call
-        save_response() of the given ThreadLimiter to save the returned
-        locator.
-        """
-        def __init__(self, keep_service, **kwargs):
-            super(KeepClient.KeepWriterThread, self).__init__()
-            self.service = keep_service
-            self.args = kwargs
-            self._success = False
+        TaskFailed = RuntimeError()
 
 
-        def success(self):
-            return self._success
+        def __init__(self, queue, data, data_hash, timeout=None):
+            super(KeepClient.KeepWriterThread, self).__init__()
+            self.timeout = timeout
+            self.queue = queue
+            self.data = data
+            self.data_hash = data_hash
+            self.daemon = True
 
         def run(self):
 
         def run(self):
-            with self.args['thread_limiter'] as limiter:
-                if not limiter.shall_i_proceed():
-                    # My turn arrived, but the job has been done without
-                    # me.
+            while True:
+                try:
+                    service, service_root = self.queue.get_next_task()
+                except Queue.Empty:
                     return
                     return
-                self.run_with_limiter(limiter)
-
-        def run_with_limiter(self, limiter):
-            if self.service.finished():
-                return
-            _logger.debug("KeepWriterThread %s proceeding %s %s",
-                          str(threading.current_thread()),
-                          self.args['data_hash'],
-                          self.args['service_root'])
-            self._success = bool(self.service.put(
-                self.args['data_hash'],
-                self.args['data'],
-                timeout=self.args.get('timeout', None)))
-            status = self.service.last_status()
-            if self._success:
-                result = self.service.last_result
-                _logger.debug("KeepWriterThread %s succeeded %s %s",
-                              str(threading.current_thread()),
-                              self.args['data_hash'],
-                              self.args['service_root'])
-                # Tick the 'done' counter for the number of replica
-                # reported stored by the server, for the case that
-                # we're talking to a proxy or other backend that
-                # stores to multiple copies for us.
                 try:
                 try:
-                    replicas_stored = int(result.headers['x-keep-replicas-stored'])
-                except (KeyError, ValueError):
-                    replicas_stored = 1
-                limiter.save_response(result.content.strip(), replicas_stored)
-            elif status is not None:
-                _logger.debug("Request fail: PUT %s => %s %s",
-                              self.args['data_hash'], status,
-                              self.service.last_result.content)
+                    locator, copies = self.do_task(service, service_root)
+                except Exception as e:
+                    if e is not self.TaskFailed:
+                        _logger.exception("Exception in KeepWriterThread")
+                    self.queue.write_fail(service)
+                else:
+                    self.queue.write_success(locator, copies)
+                finally:
+                    self.queue.task_done()
+
+        def do_task(self, service, service_root):
+            success = bool(service.put(self.data_hash,
+                                        self.data,
+                                        timeout=self.timeout))
+            result = service.last_result()
+
+            if not success:
+                if result.get('status_code', None):
+                    _logger.debug("Request fail: PUT %s => %s %s",
+                                  self.data_hash,
+                                  result['status_code'],
+                                  result['body'])
+                raise self.TaskFailed
+
+            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
+                          str(threading.current_thread()),
+                          self.data_hash,
+                          len(self.data),
+                          service_root)
+            try:
+                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
+            except (KeyError, ValueError):
+                replicas_stored = 1
+
+            return result['body'].strip(), replicas_stored
 
 
     def __init__(self, api_client=None, proxy=None,
                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                  api_token=None, local_store=None, block_cache=None,
 
 
     def __init__(self, api_client=None, proxy=None,
                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                  api_token=None, local_store=None, block_cache=None,
-                 num_retries=0):
+                 num_retries=0, session=None):
         """Initialize a new KeepClient.
 
         Arguments:
         """Initialize a new KeepClient.
 
         Arguments:
-        * api_client: The API client to use to find Keep services.  If not
+        :api_client:
+          The API client to use to find Keep services.  If not
           provided, KeepClient will build one from available Arvados
           configuration.
           provided, KeepClient will build one from available Arvados
           configuration.
-        * proxy: If specified, this KeepClient will send requests to this
-          Keep proxy.  Otherwise, KeepClient will fall back to the setting
-          of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
-          ensure KeepClient does not use a proxy, pass in an empty string.
-        * timeout: The timeout (in seconds) for HTTP requests to Keep
-          non-proxy servers.  A tuple of two floats is interpreted as
-          (connection_timeout, read_timeout): see
-          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
-          Default: (2, 300).
-        * proxy_timeout: The timeout (in seconds) for HTTP requests to
-          Keep proxies. A tuple of two floats is interpreted as
-          (connection_timeout, read_timeout). Default: (20, 300).
-        * api_token: If you're not using an API client, but only talking
+
+        :proxy:
+          If specified, this KeepClient will send requests to this Keep
+          proxy.  Otherwise, KeepClient will fall back to the setting of the
+          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
+          If you want to KeepClient does not use a proxy, pass in an empty
+          string.
+
+        :timeout:
+          The initial timeout (in seconds) for HTTP requests to Keep
+          non-proxy servers.  A tuple of three floats is interpreted as
+          (connection_timeout, read_timeout, minimum_bandwidth). A connection
+          will be aborted if the average traffic rate falls below
+          minimum_bandwidth bytes per second over an interval of read_timeout
+          seconds. Because timeouts are often a result of transient server
+          load, the actual connection timeout will be increased by a factor
+          of two on each retry.
+          Default: (2, 256, 32768).
+
+        :proxy_timeout:
+          The initial timeout (in seconds) for HTTP requests to
+          Keep proxies. A tuple of three floats is interpreted as
+          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
+          described above for adjusting connection timeouts on retry also
+          applies.
+          Default: (20, 256, 32768).
+
+        :api_token:
+          If you're not using an API client, but only talking
           directly to a Keep proxy, this parameter specifies an API token
           to authenticate Keep requests.  It is an error to specify both
           api_client and api_token.  If you specify neither, KeepClient
           will use one available from the Arvados configuration.
           directly to a Keep proxy, this parameter specifies an API token
           to authenticate Keep requests.  It is an error to specify both
           api_client and api_token.  If you specify neither, KeepClient
           will use one available from the Arvados configuration.
-        * local_store: If specified, this KeepClient will bypass Keep
+
+        :local_store:
+          If specified, this KeepClient will bypass Keep
           services, and save data to the named directory.  If unspecified,
           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
           environment variable.  If you want to ensure KeepClient does not
           use local storage, pass in an empty string.  This is primarily
           intended to mock a server for testing.
           services, and save data to the named directory.  If unspecified,
           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
           environment variable.  If you want to ensure KeepClient does not
           use local storage, pass in an empty string.  This is primarily
           intended to mock a server for testing.
-        * num_retries: The default number of times to retry failed requests.
+
+        :num_retries:
+          The default number of times to retry failed requests.
           This will be used as the default num_retries value when get() and
           put() are called.  Default 0.
         """
         self.lock = threading.Lock()
         if proxy is None:
           This will be used as the default num_retries value when get() and
           put() are called.  Default 0.
         """
         self.lock = threading.Lock()
         if proxy is None:
-            proxy = config.get('ARVADOS_KEEP_PROXY')
+            if config.get('ARVADOS_KEEP_SERVICES'):
+                proxy = config.get('ARVADOS_KEEP_SERVICES')
+            else:
+                proxy = config.get('ARVADOS_KEEP_PROXY')
         if api_token is None:
             if api_client is None:
                 api_token = config.get('ARVADOS_API_TOKEN')
         if api_token is None:
             if api_client is None:
                 api_token = config.get('ARVADOS_API_TOKEN')
@@ -455,6 +732,13 @@ class KeepClient(object):
         self.block_cache = block_cache if block_cache else KeepBlockCache()
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
         self.block_cache = block_cache if block_cache else KeepBlockCache()
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
+        self._user_agent_pool = Queue.LifoQueue()
+        self.upload_counter = Counter()
+        self.download_counter = Counter()
+        self.put_counter = Counter()
+        self.get_counter = Counter()
+        self.hits_counter = Counter()
+        self.misses_counter = Counter()
 
         if local_store:
             self.local_store = local_store
 
         if local_store:
             self.local_store = local_store
@@ -462,14 +746,24 @@ class KeepClient(object):
             self.put = self.local_store_put
         else:
             self.num_retries = num_retries
             self.put = self.local_store_put
         else:
             self.num_retries = num_retries
+            self.max_replicas_per_service = None
             if proxy:
             if proxy:
-                if not proxy.endswith('/'):
-                    proxy += '/'
+                proxy_uris = proxy.split()
+                for i in range(len(proxy_uris)):
+                    if not proxy_uris[i].endswith('/'):
+                        proxy_uris[i] += '/'
+                    # URL validation
+                    url = urlparse.urlparse(proxy_uris[i])
+                    if not (url.scheme and url.netloc):
+                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                 self.api_token = api_token
                 self.api_token = api_token
+                self._gateway_services = {}
                 self._keep_services = [{
                 self._keep_services = [{
-                    'uuid': 'proxy',
-                    '_service_root': proxy,
-                    }]
+                    'uuid': "00000-bi6l4-%015d" % idx,
+                    'service_type': 'proxy',
+                    '_service_root': uri,
+                    } for idx, uri in enumerate(proxy_uris)]
+                self._writable_services = self._keep_services
                 self.using_proxy = True
                 self._static_services_list = True
             else:
                 self.using_proxy = True
                 self._static_services_list = True
             else:
@@ -479,18 +773,32 @@ class KeepClient(object):
                     api_client = arvados.api('v1')
                 self.api_client = api_client
                 self.api_token = api_client.api_token
                     api_client = arvados.api('v1')
                 self.api_client = api_client
                 self.api_token = api_client.api_token
+                self._gateway_services = {}
                 self._keep_services = None
                 self._keep_services = None
+                self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
 
                 self.using_proxy = None
                 self._static_services_list = False
 
-    def current_timeout(self):
-        """Return the appropriate timeout to use for this client: the proxy
-        timeout setting if the backend service is currently a proxy,
-        the regular timeout setting otherwise.
+    def current_timeout(self, attempt_number):
+        """Return the appropriate timeout to use for this client.
+
+        The proxy timeout setting if the backend service is currently a proxy,
+        the regular timeout setting otherwise.  The `attempt_number` indicates
+        how many times the operation has been tried already (starting from 0
+        for the first try), and scales the connection timeout portion of the
+        return value accordingly.
+
         """
         # TODO(twp): the timeout should be a property of a
         # KeepService, not a KeepClient. See #4488.
         """
         # TODO(twp): the timeout should be a property of a
         # KeepService, not a KeepClient. See #4488.
-        return self.proxy_timeout if self.using_proxy else self.timeout
+        t = self.proxy_timeout if self.using_proxy else self.timeout
+        if len(t) == 2:
+            return (t[0] * (1 << attempt_number), t[1])
+        else:
+            return (t[0] * (1 << attempt_number), t[1], t[2])
+    def _any_nondisk_services(self, service_list):
+        return any(ks.get('service_type', 'disk') != 'disk'
+                   for ks in service_list)
 
     def build_services_list(self, force_rebuild=False):
         if (self._static_services_list or
 
     def build_services_list(self, force_rebuild=False):
         if (self._static_services_list or
@@ -502,20 +810,38 @@ class KeepClient(object):
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
-            self._keep_services = keep_services.execute().get('items')
-            if not self._keep_services:
+            # Gateway services are only used when specified by UUID,
+            # so there's nothing to gain by filtering them by
+            # service_type.
+            self._gateway_services = {ks['uuid']: ks for ks in
+                                      keep_services.execute()['items']}
+            if not self._gateway_services:
                 raise arvados.errors.NoKeepServersError()
 
                 raise arvados.errors.NoKeepServersError()
 
-            self.using_proxy = any(ks.get('service_type') == 'proxy'
-                                   for ks in self._keep_services)
-
             # Precompute the base URI for each service.
             # Precompute the base URI for each service.
-            for r in self._keep_services:
-                r['_service_root'] = "{}://[{}]:{:d}/".format(
+            for r in self._gateway_services.itervalues():
+                host = r['service_host']
+                if not host.startswith('[') and host.find(':') >= 0:
+                    # IPv6 URIs must be formatted like http://[::1]:80/...
+                    host = '[' + host + ']'
+                r['_service_root'] = "{}://{}:{:d}/".format(
                     'https' if r['service_ssl_flag'] else 'http',
                     'https' if r['service_ssl_flag'] else 'http',
-                    r['service_host'],
+                    host,
                     r['service_port'])
                     r['service_port'])
-            _logger.debug(str(self._keep_services))
+
+            _logger.debug(str(self._gateway_services))
+            self._keep_services = [
+                ks for ks in self._gateway_services.itervalues()
+                if not ks.get('service_type', '').startswith('gateway:')]
+            self._writable_services = [ks for ks in self._keep_services
+                                       if not ks.get('read_only')]
+
+            # For disk type services, max_replicas_per_service is 1
+            # It is unknown (unlimited) for other service types.
+            if self._any_nondisk_services(self._writable_services):
+                self.max_replicas_per_service = None
+            else:
+                self.max_replicas_per_service = 1
 
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
 
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
@@ -526,34 +852,56 @@ class KeepClient(object):
         """
         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
 
         """
         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
 
-    def weighted_service_roots(self, data_hash, force_rebuild=False):
+    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
         """Return an array of Keep service endpoints, in the order in
         which they should be probed when reading or writing data with
         """Return an array of Keep service endpoints, in the order in
         which they should be probed when reading or writing data with
-        the given hash.
+        the given hash+hints.
         """
         self.build_services_list(force_rebuild)
 
         """
         self.build_services_list(force_rebuild)
 
-        # Sort the available services by weight (heaviest first) for
-        # this data_hash, and return their service_roots (base URIs)
+        sorted_roots = []
+        # Use the services indicated by the given +K@... remote
+        # service hints, if any are present and can be resolved to a
+        # URI.
+        for hint in locator.hints:
+            if hint.startswith('K@'):
+                if len(hint) == 7:
+                    sorted_roots.append(
+                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
+                elif len(hint) == 29:
+                    svc = self._gateway_services.get(hint[2:])
+                    if svc:
+                        sorted_roots.append(svc['_service_root'])
+
+        # Sort the available local services by weight (heaviest first)
+        # for this locator, and return their service_roots (base URIs)
         # in that order.
         # in that order.
-        sorted_roots = [
+        use_services = self._keep_services
+        if need_writable:
+            use_services = self._writable_services
+        self.using_proxy = self._any_nondisk_services(use_services)
+        sorted_roots.extend([
             svc['_service_root'] for svc in sorted(
             svc['_service_root'] for svc in sorted(
-                self._keep_services,
+                use_services,
                 reverse=True,
                 reverse=True,
-                key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
-        _logger.debug(data_hash + ': ' + str(sorted_roots))
+                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
+        _logger.debug("{}: {}".format(locator, sorted_roots))
         return sorted_roots
 
         return sorted_roots
 
-    def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
         # root strings.
         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
         # root strings.
         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
-        local_roots = self.weighted_service_roots(md5_s, force_rebuild)
+        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
         for root in local_roots:
             if root not in roots_map:
         for root in local_roots:
             if root not in roots_map:
-                roots_map[root] = self.KeepService(root, **headers)
+                roots_map[root] = self.KeepService(
+                    root, self._user_agent_pool,
+                    upload_counter=self.upload_counter,
+                    download_counter=self.download_counter,
+                    **headers)
         return local_roots
 
     @staticmethod
         return local_roots
 
     @staticmethod
@@ -574,8 +922,23 @@ class KeepClient(object):
         else:
             return None
 
         else:
             return None
 
+    def get_from_cache(self, loc):
+        """Fetch a block only if is in the cache, otherwise return None."""
+        slot = self.block_cache.get(loc)
+        if slot is not None and slot.ready.is_set():
+            return slot.get()
+        else:
+            return None
+
+    @retry.retry_method
+    def head(self, loc_s, num_retries=None):
+        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
+
     @retry.retry_method
     def get(self, loc_s, num_retries=None):
     @retry.retry_method
     def get(self, loc_s, num_retries=None):
+        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
+
+    def _get_or_head(self, loc_s, method="GET", num_retries=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -597,31 +960,55 @@ class KeepClient(object):
         """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
-        locator = KeepLocator(loc_s)
-        expect_hash = locator.md5sum
 
 
-        slot, first = self.block_cache.reserve_cache(expect_hash)
-        if not first:
-            v = slot.get()
-            return v
+        self.get_counter.add(1)
+
+        locator = KeepLocator(loc_s)
+        if method == "GET":
+            slot, first = self.block_cache.reserve_cache(locator.md5sum)
+            if not first:
+                self.hits_counter.add(1)
+                v = slot.get()
+                return v
+
+        self.misses_counter.add(1)
+
+        # If the locator has hints specifying a prefix (indicating a
+        # remote keepproxy) or the UUID of a local gateway service,
+        # read data from the indicated service(s) instead of the usual
+        # list of local disk services.
+        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+                           for hint in locator.hints if (
+                                   hint.startswith('K@') and
+                                   len(hint) == 29 and
+                                   self._gateway_services.get(hint[2:])
+                                   )])
+        # Map root URLs to their KeepService objects.
+        roots_map = {
+            root: self.KeepService(root, self._user_agent_pool,
+                                   upload_counter=self.upload_counter,
+                                   download_counter=self.download_counter)
+            for root in hint_roots
+        }
 
         # See #3147 for a discussion of the loop implementation.  Highlights:
         # * Refresh the list of Keep services after each failure, in case
         #   it's being updated.
         # * Retry until we succeed, we're out of retries, or every available
         #   service has returned permanent failure.
 
         # See #3147 for a discussion of the loop implementation.  Highlights:
         # * Refresh the list of Keep services after each failure, in case
         #   it's being updated.
         # * Retry until we succeed, we're out of retries, or every available
         #   service has returned permanent failure.
-        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
-                      for hint in locator.hints if hint.startswith('K@')]
-        # Map root URLs their KeepService objects.
-        roots_map = {root: self.KeepService(root) for root in hint_roots}
+        sorted_roots = []
+        roots_map = {}
         blob = None
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
         for tries_left in loop:
             try:
         blob = None
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
         for tries_left in loop:
             try:
-                local_roots = self.map_new_services(
-                    roots_map, expect_hash,
-                    force_rebuild=(tries_left < num_retries))
+                sorted_roots = self.map_new_services(
+                    roots_map, locator,
+                    force_rebuild=(tries_left < num_retries),
+                    need_writable=False)
             except Exception as error:
                 loop.save_result(error)
                 continue
             except Exception as error:
                 loop.save_result(error)
                 continue
@@ -629,41 +1016,40 @@ class KeepClient(object):
             # Query KeepService objects that haven't returned
             # permanent failure, in our specified shuffle order.
             services_to_try = [roots_map[root]
             # Query KeepService objects that haven't returned
             # permanent failure, in our specified shuffle order.
             services_to_try = [roots_map[root]
-                               for root in (local_roots + hint_roots)
+                               for root in sorted_roots
                                if roots_map[root].usable()]
             for keep_service in services_to_try:
                                if roots_map[root].usable()]
             for keep_service in services_to_try:
-                blob = keep_service.get(locator, timeout=self.current_timeout())
+                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                 if blob is not None:
                     break
             loop.save_result((blob, len(services_to_try)))
 
         # Always cache the result, then return it if we succeeded.
                 if blob is not None:
                     break
             loop.save_result((blob, len(services_to_try)))
 
         # Always cache the result, then return it if we succeeded.
-        slot.set(blob)
-        self.block_cache.cap_cache()
+        if method == "GET":
+            slot.set(blob)
+            self.block_cache.cap_cache()
         if loop.success():
         if loop.success():
-            return blob
+            if method == "HEAD":
+                return True
+            else:
+                return blob
 
 
-        try:
-            all_roots = local_roots + hint_roots
-        except NameError:
-            # We never successfully fetched local_roots.
-            all_roots = hint_roots
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
-        not_founds = sum(1 for key in all_roots
-                         if roots_map[key].last_status() in {403, 404, 410})
-        service_errors = ((key, roots_map[key].last_result)
-                          for key in all_roots)
+        not_founds = sum(1 for key in sorted_roots
+                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
+        service_errors = ((key, roots_map[key].last_result()['error'])
+                          for key in sorted_roots)
         if not roots_map:
             raise arvados.errors.KeepReadError(
                 "failed to read {}: no Keep services available ({})".format(
                     loc_s, loop.last_result()))
         if not roots_map:
             raise arvados.errors.KeepReadError(
                 "failed to read {}: no Keep services available ({})".format(
                     loc_s, loop.last_result()))
-        elif not_founds == len(all_roots):
+        elif not_founds == len(sorted_roots):
             raise arvados.errors.NotFoundError(
                 "{} not found".format(loc_s), service_errors)
         else:
             raise arvados.errors.KeepReadError(
             raise arvados.errors.NotFoundError(
                 "{} not found".format(loc_s), service_errors)
         else:
             raise arvados.errors.KeepReadError(
-                "failed to read {}".format(loc_s), service_errors)
+                "failed to read {}".format(loc_s), service_errors, label="service")
 
     @retry.retry_method
     def put(self, data, copies=2, num_retries=None):
 
     @retry.retry_method
     def put(self, data, copies=2, num_retries=None):
@@ -688,59 +1074,59 @@ class KeepClient(object):
         if isinstance(data, unicode):
             data = data.encode("ascii")
         elif not isinstance(data, str):
         if isinstance(data, unicode):
             data = data.encode("ascii")
         elif not isinstance(data, str):
-            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
+            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
+
+        self.put_counter.add(1)
 
         data_hash = hashlib.md5(data).hexdigest()
 
         data_hash = hashlib.md5(data).hexdigest()
+        loc_s = data_hash + '+' + str(len(data))
         if copies < 1:
         if copies < 1:
-            return data_hash
+            return loc_s
+        locator = KeepLocator(loc_s)
 
         headers = {}
 
         headers = {}
-        if self.using_proxy:
-            # Tell the proxy how many copies we want it to store
-            headers['X-Keep-Desired-Replication'] = str(copies)
+        # Tell the proxy how many copies we want it to store
+        headers['X-Keep-Desired-Replicas'] = str(copies)
         roots_map = {}
         roots_map = {}
-        thread_limiter = KeepClient.ThreadLimiter(copies)
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
+        done = 0
         for tries_left in loop:
             try:
         for tries_left in loop:
             try:
-                local_roots = self.map_new_services(
-                    roots_map, data_hash,
-                    force_rebuild=(tries_left < num_retries), **headers)
+                sorted_roots = self.map_new_services(
+                    roots_map, locator,
+                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
             except Exception as error:
                 loop.save_result(error)
                 continue
 
             except Exception as error:
                 loop.save_result(error)
                 continue
 
-            threads = []
-            for service_root, ks in roots_map.iteritems():
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
+                                                        data_hash=data_hash,
+                                                        copies=copies - done,
+                                                        max_service_replicas=self.max_replicas_per_service,
+                                                        timeout=self.current_timeout(num_retries - tries_left))
+            for service_root, ks in [(root, roots_map[root])
+                                     for root in sorted_roots]:
                 if ks.finished():
                     continue
                 if ks.finished():
                     continue
-                t = KeepClient.KeepWriterThread(
-                    ks,
-                    data=data,
-                    data_hash=data_hash,
-                    service_root=service_root,
-                    thread_limiter=thread_limiter,
-                    timeout=self.current_timeout())
-                t.start()
-                threads.append(t)
-            for t in threads:
-                t.join()
-            loop.save_result((thread_limiter.done() >= copies, len(threads)))
+                writer_pool.add_task(ks, service_root)
+            writer_pool.join()
+            done += writer_pool.done()
+            loop.save_result((done >= copies, writer_pool.total_task_nr))
 
         if loop.success():
 
         if loop.success():
-            return thread_limiter.response()
+            return writer_pool.response()
         if not roots_map:
             raise arvados.errors.KeepWriteError(
                 "failed to write {}: no Keep services available ({})".format(
                     data_hash, loop.last_result()))
         else:
         if not roots_map:
             raise arvados.errors.KeepWriteError(
                 "failed to write {}: no Keep services available ({})".format(
                     data_hash, loop.last_result()))
         else:
-            service_errors = ((key, roots_map[key].last_result)
-                              for key in local_roots
-                              if not roots_map[key].success_flag)
+            service_errors = ((key, roots_map[key].last_result()['error'])
+                              for key in sorted_roots
+                              if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                 "failed to write {} (wanted {} copies but wrote {})".format(
             raise arvados.errors.KeepWriteError(
                 "failed to write {} (wanted {} copies but wrote {})".format(
-                    data_hash, copies, thread_limiter.done()), service_errors)
+                    data_hash, copies, writer_pool.done()), service_errors, label="service")
 
     def local_store_put(self, data, copies=1, num_retries=None):
         """A stub for put().
 
     def local_store_put(self, data, copies=1, num_retries=None):
         """A stub for put().
@@ -773,3 +1159,6 @@ class KeepClient(object):
             return ''
         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
             return f.read()
             return ''
         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
             return f.read()
+
+    def is_cached(self, locator):
+        return self.block_cache.reserve_cache(expect_hash)