Merge branch '11507-deleted-bufferblocks' refs #11507
[arvados.git] / sdk / python / arvados / keep.py
index ab683526e077d266ee24090543ad98cea3b200cd..5b4770c4d0dca8824c268448296d0658c8ba04d8 100644 (file)
@@ -1,25 +1,19 @@
-import gflags
+import cStringIO
+import collections
+import datetime
+import hashlib
 import logging
+import math
 import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
+import pycurl
+import Queue
 import re
-import hashlib
-import string
-import bz2
-import zlib
-import fcntl
-import time
+import socket
+import ssl
+import sys
 import threading
 import timer
-import datetime
-import ssl
-import socket
-import requests
+import urlparse
 
 import arvados
 import arvados.config as config
@@ -30,6 +24,18 @@ import arvados.util
 _logger = logging.getLogger('arvados.keep')
 global_client_object = None
 
+
+# Monkey patch TCP constants when not available (apple). Values sourced from:
+# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
+#
+# Each constant is only filled in when the socket module does not already
+# define it, so a Python build that exposes the real value is left untouched.
+# TCP_KEEPINTVL is read unconditionally by
+# KeepService._socket_open_pycurl_7_21_5() further down in this file.
+if sys.platform == 'darwin':
+    if not hasattr(socket, 'TCP_KEEPALIVE'):
+        socket.TCP_KEEPALIVE = 0x010
+    if not hasattr(socket, 'TCP_KEEPINTVL'):
+        socket.TCP_KEEPINTVL = 0x101
+    if not hasattr(socket, 'TCP_KEEPCNT'):
+        socket.TCP_KEEPCNT = 0x102
+
+
 class KeepLocator(object):
     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
@@ -46,7 +52,7 @@ class KeepLocator(object):
             self.size = None
         for hint in pieces:
             if self.HINT_RE.match(hint) is None:
-                raise ValueError("unrecognized hint data {}".format(hint))
+                raise ValueError("invalid hint format: {}".format(hint))
             elif hint.startswith('A'):
                 self.parse_permission_hint(hint)
             else:
@@ -59,7 +65,10 @@ class KeepLocator(object):
             if s is not None)
 
     def stripped(self):
-        return "%s+%i" % (self.md5sum, self.size)
+        if self.size is not None:
+            return "%s+%i" % (self.md5sum, self.size)
+        else:
+            return self.md5sum
 
     def _make_hex_prop(name, length):
         # Build and return a new property with the given name that
@@ -69,7 +78,7 @@ class KeepLocator(object):
             return getattr(self, data_name)
         def setter(self, hex_str):
             if not arvados.util.is_hex(hex_str, length):
-                raise ValueError("{} must be a {}-digit hex string: {}".
+                raise ValueError("{} is not a {}-digit hex string: {}".
                                  format(name, length, hex_str))
             setattr(self, data_name, hex_str)
         return property(getter, setter)
@@ -153,6 +162,8 @@ class KeepBlockCache(object):
         self._cache_lock = threading.Lock()
 
     class CacheSlot(object):
+        __slots__ = ("locator", "ready", "content")
+
         def __init__(self, locator):
             self.locator = locator
             self.ready = threading.Event()
@@ -215,200 +226,432 @@ class KeepBlockCache(object):
                 self._cache.insert(0, n)
                 return n, True
 
-class KeepClient(object):
+class Counter(object):
+    def __init__(self, v=0):
+        self._lk = threading.Lock()
+        self._val = v
 
-    # Default Keep server connection timeout:  2 seconds
-    # Default Keep server read timeout:      300 seconds
-    # Default Keep proxy connection timeout:  20 seconds
-    # Default Keep proxy read timeout:       300 seconds
-    DEFAULT_TIMEOUT = (2, 300)
-    DEFAULT_PROXY_TIMEOUT = (20, 300)
+    def add(self, v):
+        with self._lk:
+            self._val += v
 
-    class ThreadLimiter(object):
-        """
-        Limit the number of threads running at a given time to
-        {desired successes} minus {successes reported}. When successes
-        reported == desired, wake up the remaining threads and tell
-        them to quit.
+    def get(self):
+        with self._lk:
+            return self._val
 
-        Should be used in a "with" block.
-        """
-        def __init__(self, todo):
-            self._todo = todo
-            self._done = 0
-            self._response = None
-            self._todo_lock = threading.Semaphore(todo)
-            self._done_lock = threading.Lock()
-
-        def __enter__(self):
-            self._todo_lock.acquire()
-            return self
-
-        def __exit__(self, type, value, traceback):
-            self._todo_lock.release()
-
-        def shall_i_proceed(self):
-            """
-            Return true if the current thread should do stuff. Return
-            false if the current thread should just stop.
-            """
-            with self._done_lock:
-                return (self._done < self._todo)
-
-        def save_response(self, response_body, replicas_stored):
-            """
-            Records a response body (a locator, possibly signed) returned by
-            the Keep server.  It is not necessary to save more than
-            one response, since we presume that any locator returned
-            in response to a successful request is valid.
-            """
-            with self._done_lock:
-                self._done += replicas_stored
-                self._response = response_body
 
-        def response(self):
-            """
-            Returns the body from the response to a PUT request.
-            """
-            with self._done_lock:
-                return self._response
+class KeepClient(object):
 
-        def done(self):
-            """
-            Return how many successes were reported.
-            """
-            with self._done_lock:
-                return self._done
+    # Default Keep server connection timeout:  2 seconds
+    # Default Keep server read timeout:       256 seconds
+    # Default Keep server bandwidth minimum:  32768 bytes per second
+    # Default Keep proxy connection timeout:  20 seconds
+    # Default Keep proxy read timeout:        256 seconds
+    # Default Keep proxy bandwidth minimum:   32768 bytes per second
+    DEFAULT_TIMEOUT = (2, 256, 32768)
+    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
 
 
     class KeepService(object):
-        # Make requests to a single Keep service, and track results.
-        HTTP_ERRORS = (requests.exceptions.RequestException,
-                       socket.error, ssl.SSLError)
+        """Make requests to a single Keep service, and track results.
+
+        A KeepService is intended to last long enough to perform one
+        transaction (GET or PUT) against one Keep service. This can
+        involve calling either get() or put() multiple times in order
+        to retry after transient failures. However, calling both get()
+        and put() on a single instance -- or using the same instance
+        to access two different Keep services -- will not produce
+        sensible behavior.
+        """
 
-        def __init__(self, root, session, **headers):
+        HTTP_ERRORS = (
+            socket.error,
+            ssl.SSLError,
+            arvados.errors.HttpError,
+        )
+
+        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
+                     upload_counter=None,
+                     download_counter=None, **headers):
+            """Set up request state for one Keep service.
+
+            :root:
+              URL prefix of the service; get() and put() append the
+              locator or hash to it to build the request URL.
+
+            :user_agent_pool:
+              Queue of reusable pycurl handles.
+              NOTE(review): the default LifoQueue is created once, when
+              this def statement is evaluated, so every KeepService built
+              without an explicit pool shares that same queue object.
+              Confirm this sharing is intended before relying on it.
+
+            :upload_counter:, :download_counter:
+              Optional objects with an add(n) method; put() and get() add
+              the number of bytes transferred after a successful request.
+
+            :headers:
+              Extra HTTP headers. They are sent as-is with PUT requests and
+              merged over a default Accept header for GET requests.
+            """
             self.root = root
-            self.last_result = None
-            self.success_flag = None
-            self.session = session
+            self._user_agent_pool = user_agent_pool
+            self._result = {'error': None}
+            self._usable = True
+            self._session = None
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
+            self.upload_counter = upload_counter
+            self.download_counter = download_counter
 
         def usable(self):
-            return self.success_flag is not False
+            """Is it worth attempting a request?"""
+            return self._usable
 
         def finished(self):
-            return self.success_flag is not None
+            """Did the request succeed or encounter permanent failure?"""
+            return self._result['error'] == False or not self._usable
+
+        def last_result(self):
+            """Return the result dict recorded by the latest get() or put().
+
+            The dict always has an 'error' key: None before any request,
+            False after a successful request, otherwise the exception that
+            describes the failure.  When an HTTP exchange completed, it
+            also has 'status_code', 'body' and 'headers'.
+            """
+            return self._result
 
-        def last_status(self):
+        def _get_user_agent(self):
             try:
-                return self.last_result.status_code
-            except AttributeError:
-                return None
+                return self._user_agent_pool.get(block=False)
+            except Queue.Empty:
+                return pycurl.Curl()
 
-        def get(self, locator, timeout=None):
+        def _put_user_agent(self, ua):
+            """Reset a curl handle and return it to the pool for reuse.
+
+            This is best-effort: if the handle cannot be reset or the pool
+            refuses it, the handle is closed and discarded instead.
+            """
+            try:
+                ua.reset()
+                self._user_agent_pool.put(ua, block=False)
+            except Exception:
+                # Catch Exception rather than using a bare except, so that
+                # KeyboardInterrupt and SystemExit still reach the caller.
+                ua.close()
+
+        def _socket_open(self, *args, **kwargs):
+            """Open-socket callback that adapts to the caller's signature.
+
+            A call with exactly two arguments (positional plus keyword) is
+            handled by _socket_open_pycurl_7_21_5; any other argument count
+            is handled by _socket_open_pycurl_7_19_3.
+            """
+            arg_count = len(args) + len(kwargs)
+            if arg_count == 2:
+                opener = self._socket_open_pycurl_7_21_5
+            else:
+                opener = self._socket_open_pycurl_7_19_3
+            return opener(*args, **kwargs)
+
+        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
+            """Adapt the (family, socktype, protocol[, address]) call form.
+
+            The separate arguments are packed into an 'Address' namedtuple
+            and passed to _socket_open_pycurl_7_21_5 with purpose=None.
+            """
+            address_type = collections.namedtuple(
+                'Address', ['family', 'socktype', 'protocol', 'addr'])
+            packed = address_type(family, socktype, protocol, address)
+            return self._socket_open_pycurl_7_21_5(purpose=None, address=packed)
+
+        def _socket_open_pycurl_7_21_5(self, purpose, address):
+            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE
+
+            Creates the socket described by address (family, socktype,
+            protocol) with keepalive enabled and returns it.  purpose is
+            not used.
+            """
+            sock = socket.socket(
+                address.family, address.socktype, address.protocol)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+            # Will throw invalid protocol error on mac. This test prevents that.
+            have_keepidle = hasattr(socket, 'TCP_KEEPIDLE')
+            if have_keepidle:
+                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
+            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+            return sock
+
+        def get(self, locator, method="GET", timeout=None):
+            """Fetch (GET) or probe (HEAD) one block from this service.
+
+            :locator:
+              A KeepLocator; its string form is appended to the service
+              root to build the URL, and its md5sum is checked against
+              the response body.
+
+            :method:
+              "GET" (default) or "HEAD".  HEAD skips the response body.
+
+            :timeout:
+              Passed to _setcurltimeouts().
+
+            Returns the block data for a successful GET whose MD5 matches
+            the locator, True for a successful HEAD, and None on any
+            failure (request error, non-success status, or checksum
+            mismatch).  Details are available from last_result().
+            """
             # locator is a KeepLocator object.
             url = self.root + str(locator)
-            _logger.debug("Request: GET %s", url)
+            _logger.debug("Request: %s %s", method, url)
+            curl = self._get_user_agent()
+            ok = None
             try:
                 with timer.Timer() as t:
-                    result = self.session.get(url.encode('utf-8'),
-                                          headers=self.get_headers,
-                                          timeout=timeout)
+                    self._headers = {}
+                    response_body = cStringIO.StringIO()
+                    curl.setopt(pycurl.NOSIGNAL, 1)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.URL, url.encode('utf-8'))
+                    curl.setopt(pycurl.HTTPHEADER, [
+                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    if method == "HEAD":
+                        curl.setopt(pycurl.NOBODY, True)
+                    self._setcurltimeouts(curl, timeout)
+
+                    try:
+                        curl.perform()
+                    except Exception as e:
+                        raise arvados.errors.HttpError(0, str(e))
+                    self._result = {
+                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+                        'body': response_body.getvalue(),
+                        'headers': self._headers,
+                        'error': False,
+                    }
+
+                ok = retry.check_http_response_success(self._result['status_code'])
+                if not ok:
+                    self._result['error'] = arvados.errors.HttpError(
+                        self._result['status_code'],
+                        self._headers.get('x-status-line', 'Error'))
             except self.HTTP_ERRORS as e:
-                _logger.debug("Request fail: GET %s => %s: %s",
-                              url, type(e), str(e))
-                self.last_result = e
+                self._result = {
+                    'error': e,
+                }
+            self._usable = ok != False
+            if self._result.get('status_code', None):
+                # The client worked well enough to get an HTTP status
+                # code, so presumably any problems are just on the
+                # server side and it's OK to reuse the client.
+                self._put_user_agent(curl)
             else:
-                self.last_result = result
-                self.success_flag = retry.check_http_response_success(result)
-                content = result.content
-                _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
-                             self.last_status(), len(content), t.msecs,
-                             (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
-                if self.success_flag:
-                    resp_md5 = hashlib.md5(content).hexdigest()
-                    if resp_md5 == locator.md5sum:
-                        return content
-                    _logger.warning("Checksum fail: md5(%s) = %s",
-                                    url, resp_md5)
-            return None
+                # Don't return this client to the pool, in case it's
+                # broken.
+                curl.close()
+            if not ok:
+                # Log the method actually used, so a failed HEAD is not
+                # reported as a GET.
+                _logger.debug("Request fail: %s %s => %s: %s",
+                              method, url,
+                              type(self._result['error']), str(self._result['error']))
+                return None
+            if method == "HEAD":
+                # Response headers are stored under the 'headers' key with
+                # lower-cased names (see _headerfunction), not at the top
+                # level of the result dict.
+                _logger.info("HEAD %s: %s bytes",
+                             self._result['status_code'],
+                             self._result['headers'].get('content-length'))
+                return True
+
+            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
+                         self._result['status_code'],
+                         len(self._result['body']),
+                         t.msecs,
+                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+
+            if self.download_counter:
+                self.download_counter.add(len(self._result['body']))
+            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
+            if resp_md5 != locator.md5sum:
+                _logger.warning("Checksum fail: md5(%s) = %s",
+                                url, resp_md5)
+                self._result['error'] = arvados.errors.HttpError(
+                    0, 'Checksum fail')
+                return None
+            return self._result['body']
 
         def put(self, hash_s, body, timeout=None):
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
+            curl = self._get_user_agent()
+            ok = None
             try:
-                result = self.session.put(url.encode('utf-8'),
-                                      data=body,
-                                      headers=self.put_headers,
-                                      timeout=timeout)
+                with timer.Timer() as t:
+                    self._headers = {}
+                    body_reader = cStringIO.StringIO(body)
+                    response_body = cStringIO.StringIO()
+                    curl.setopt(pycurl.NOSIGNAL, 1)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.URL, url.encode('utf-8'))
+                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
+                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
+                    # response) instead of sending the request body immediately.
+                    # This allows the server to reject the request if the request
+                    # is invalid or the server is read-only, without waiting for
+                    # the client to send the entire block.
+                    curl.setopt(pycurl.UPLOAD, True)
+                    curl.setopt(pycurl.INFILESIZE, len(body))
+                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
+                    curl.setopt(pycurl.HTTPHEADER, [
+                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    self._setcurltimeouts(curl, timeout)
+                    try:
+                        curl.perform()
+                    except Exception as e:
+                        raise arvados.errors.HttpError(0, str(e))
+                    self._result = {
+                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+                        'body': response_body.getvalue(),
+                        'headers': self._headers,
+                        'error': False,
+                    }
+                ok = retry.check_http_response_success(self._result['status_code'])
+                if not ok:
+                    self._result['error'] = arvados.errors.HttpError(
+                        self._result['status_code'],
+                        self._headers.get('x-status-line', 'Error'))
             except self.HTTP_ERRORS as e:
-                _logger.debug("Request fail: PUT %s => %s: %s",
-                              url, type(e), str(e))
-                self.last_result = e
+                self._result = {
+                    'error': e,
+                }
+            self._usable = ok != False # still usable if ok is True or None
+            if self._result.get('status_code', None):
+                # Client is functional. See comment in get().
+                self._put_user_agent(curl)
             else:
-                self.last_result = result
-                self.success_flag = retry.check_http_response_success(result)
-            return self.success_flag
-
+                curl.close()
+            if not ok:
+                _logger.debug("Request fail: PUT %s => %s: %s",
+                              url, type(self._result['error']), str(self._result['error']))
+                return False
+            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
+                         self._result['status_code'],
+                         len(body),
+                         t.msecs,
+                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+            if self.upload_counter:
+                self.upload_counter.add(len(body))
+            return True
 
+        def _setcurltimeouts(self, curl, timeouts):
+            """Apply connection and low-speed timeouts to a curl handle.
+
+            :timeouts:
+              One of:
+              * a false value (None, 0, empty): leave curl untouched;
+              * a number: used as both the connection timeout and the
+                low-speed interval, in seconds;
+              * a 2-item tuple or list: (connection_timeout, read_timeout);
+              * a 3-item tuple or list:
+                (connection_timeout, read_timeout, minimum_bandwidth).
+
+            When no bandwidth is given, KeepClient.DEFAULT_TIMEOUT[2] is
+            used.  A sequence of any other length raises ValueError when
+            it is unpacked.
+            """
+            if not timeouts:
+                return
+            default_bps = KeepClient.DEFAULT_TIMEOUT[2]
+            # Accept lists as well as tuples: a list used to fall through
+            # to the scalar branch and fail with TypeError.
+            if isinstance(timeouts, (tuple, list)):
+                if len(timeouts) == 2:
+                    conn_t, xfer_t = timeouts
+                    bandwidth_bps = default_bps
+                else:
+                    conn_t, xfer_t, bandwidth_bps = timeouts
+            else:
+                conn_t, xfer_t = (timeouts, timeouts)
+                bandwidth_bps = default_bps
+            # The connection timeout is set in milliseconds.  The transfer
+            # limit is expressed as "slower than bandwidth_bps for xfer_t
+            # seconds", both rounded up to whole units.
+            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
+            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
+
+        def _headerfunction(self, header_line):
+            """Header callback: record one raw response header line.
+
+            Parsed headers go into self._headers with lower-cased names.
+            The most recent HTTP status line is stored, unmodified, under
+            the pseudo-header 'x-status-line'.  A non-blank line without a
+            colon is treated as a continuation of the previous header.
+            """
+            header_line = header_line.decode('iso-8859-1')
+            if header_line.startswith('HTTP/'):
+                # Check for a status line first.  This way a later status
+                # line (e.g. the final response after "100 Continue")
+                # replaces the earlier one instead of being appended to
+                # it, and a reason phrase containing ':' is not mistaken
+                # for a header.
+                name = 'x-status-line'
+                value = header_line
+            elif not header_line.strip():
+                # The blank line ending a header block carries no data.
+                return
+            elif ':' in header_line:
+                name, value = header_line.split(':', 1)
+                name = name.strip().lower()
+                value = value.strip()
+            elif self._headers:
+                name = self._lastheadername
+                value = self._headers[name] + ' ' + header_line.strip()
+            else:
+                _logger.error("Unexpected header line: %s", header_line)
+                return
+            self._lastheadername = name
+            self._headers[name] = value
+            # Returning None implies all bytes were written
+    
+
+    class KeepWriterQueue(Queue.Queue):
+        """Queue of (service, service_root) write tasks for one block.
+
+        Besides holding the tasks, it tracks how many copies are wanted,
+        how many have been stored so far, and how many write attempts may
+        currently be handed out (pending_tries).
+        """
+        def __init__(self, copies):
+            Queue.Queue.__init__(self) # Old-style superclass
+            self.wanted_copies = copies
+            self.successful_copies = 0
+            self.response = None
+            self.successful_copies_lock = threading.Lock()
+            # Starts at the number of wanted copies; get_next_task()
+            # decrements it per task handed out, write_fail() increments it.
+            self.pending_tries = copies
+            self.pending_tries_notification = threading.Condition()
+        
+        def write_success(self, response, replicas_nr):
+            """Record replicas_nr stored copies and the latest response,
+            then wake every thread waiting in get_next_task()."""
+            with self.successful_copies_lock:
+                self.successful_copies += replicas_nr
+                self.response = response
+            with self.pending_tries_notification:
+                self.pending_tries_notification.notify_all()
+        
+        def write_fail(self, ks):
+            """Allow one more attempt after a failed write and wake one
+            waiting thread.  The ks argument is not used."""
+            with self.pending_tries_notification:
+                self.pending_tries += 1
+                self.pending_tries_notification.notify()
+        
+        def pending_copies(self):
+            """Return how many copies are still missing (wanted minus
+            successful); the result can be negative."""
+            with self.successful_copies_lock:
+                return self.wanted_copies - self.successful_copies
+
+        def get_next_task(self):
+            """Return the next (service, service_root) to try.
+
+            Raises Queue.Empty when there is nothing left to do: either
+            enough copies have been stored (remaining tasks are drained
+            and marked done first) or the queue has run out of tasks.
+            Otherwise blocks until a write attempt becomes available.
+            """
+            with self.pending_tries_notification:
+                while True:
+                    if self.pending_copies() < 1:
+                        # This notify_all() is unnecessary --
+                        # write_success() already called notify_all()
+                        # when pending<1 became true, so it's not
+                        # possible for any other thread to be in
+                        # wait() now -- but it's cheap insurance
+                        # against deadlock so we do it anyway:
+                        self.pending_tries_notification.notify_all()
+                        # Drain the queue and then raise Queue.Empty
+                        # (get_nowait() raises it once nothing is left,
+                        # which is what ends this inner loop).
+                        while True:
+                            self.get_nowait()
+                            self.task_done()
+                    elif self.pending_tries > 0:
+                        # get_nowait() raises Queue.Empty here too if no
+                        # task is queued.
+                        service, service_root = self.get_nowait()
+                        if service.finished():
+                            # Nothing more to attempt on this service:
+                            # mark the task done and look for another.
+                            self.task_done()
+                            continue
+                        self.pending_tries -= 1
+                        return service, service_root
+                    elif self.empty():
+                        self.pending_tries_notification.notify_all()
+                        raise Queue.Empty
+                    else:
+                        # Tasks remain but no attempt is available; wait
+                        # for write_success() or write_fail() to notify.
+                        self.pending_tries_notification.wait()
+
+
+    class KeepWriterThreadPool(object):
+        """Group of KeepWriterThread workers sharing one KeepWriterQueue.
+
+        Tasks are queued with add_task(); join() starts the workers and
+        blocks until every queued task has been marked done.
+        """
+
+        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
+            self.total_task_nr = 0
+            self.wanted_copies = copies
+            # More than one worker is only needed when a single service
+            # is known to store fewer replicas than the copies wanted.
+            if max_service_replicas and max_service_replicas < copies:
+                num_threads = int(math.ceil(float(copies) / max_service_replicas))
+            else:
+                num_threads = 1
+            _logger.debug("Pool max threads is %d", num_threads)
+            self.queue = KeepClient.KeepWriterQueue(copies)
+            # Workers are created here but not started until join().
+            self.workers = [
+                KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
+                for _ in range(num_threads)]
+
+        def add_task(self, ks, service_root):
+            """Queue one (service, service_root) write attempt."""
+            task = (ks, service_root)
+            self.queue.put(task)
+            self.total_task_nr += 1
+
+        def done(self):
+            """Return the number of copies stored so far."""
+            return self.queue.successful_copies
+
+        def join(self):
+            """Start every worker, then wait for the queue to be processed."""
+            for thread in self.workers:
+                thread.start()
+            self.queue.join()
+
+        def response(self):
+            """Return the response saved by the latest successful write."""
+            return self.queue.response
+    
+    
     class KeepWriterThread(threading.Thread):
-        """
-        Write a blob of data to the given Keep server. On success, call
-        save_response() of the given ThreadLimiter to save the returned
-        locator.
-        """
-        def __init__(self, keep_service, **kwargs):
-            super(KeepClient.KeepWriterThread, self).__init__()
-            self.service = keep_service
-            self.args = kwargs
-            self._success = False
+        TaskFailed = RuntimeError()
 
-        def success(self):
-            return self._success
+        def __init__(self, queue, data, data_hash, timeout=None):
+            super(KeepClient.KeepWriterThread, self).__init__()
+            self.timeout = timeout
+            self.queue = queue
+            self.data = data
+            self.data_hash = data_hash
+            self.daemon = True
 
         def run(self):
-            with self.args['thread_limiter'] as limiter:
-                if not limiter.shall_i_proceed():
-                    # My turn arrived, but the job has been done without
-                    # me.
+            while True:
+                try:
+                    service, service_root = self.queue.get_next_task()
+                except Queue.Empty:
                     return
-                self.run_with_limiter(limiter)
-
-        def run_with_limiter(self, limiter):
-            if self.service.finished():
-                return
-            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
-                          str(threading.current_thread()),
-                          self.args['data_hash'],
-                          len(self.args['data']),
-                          self.args['service_root'])
-            self._success = bool(self.service.put(
-                self.args['data_hash'],
-                self.args['data'],
-                timeout=self.args.get('timeout', None)))
-            status = self.service.last_status()
-            if self._success:
-                result = self.service.last_result
-                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
-                              str(threading.current_thread()),
-                              self.args['data_hash'],
-                              len(self.args['data']),
-                              self.args['service_root'])
-                # Tick the 'done' counter for the number of replica
-                # reported stored by the server, for the case that
-                # we're talking to a proxy or other backend that
-                # stores to multiple copies for us.
                 try:
-                    replicas_stored = int(result.headers['x-keep-replicas-stored'])
-                except (KeyError, ValueError):
-                    replicas_stored = 1
-                limiter.save_response(result.text.strip(), replicas_stored)
-            elif status is not None:
-                _logger.debug("Request fail: PUT %s => %s %s",
-                              self.args['data_hash'], status,
-                              self.service.last_result.text)
+                    locator, copies = self.do_task(service, service_root)
+                except Exception as e:
+                    if e is not self.TaskFailed:
+                        _logger.exception("Exception in KeepWriterThread")
+                    self.queue.write_fail(service)
+                else:
+                    self.queue.write_success(locator, copies)
+                finally:
+                    self.queue.task_done()
+
+        def do_task(self, service, service_root):
+            """Try to PUT this thread's block to one service.
+
+            Returns (response body stripped of surrounding whitespace,
+            number of replicas stored).  The replica count is read from
+            the 'x-keep-replicas-stored' response header and defaults to 1
+            when the header is missing or not an integer.  Raises
+            self.TaskFailed when the PUT does not succeed.
+            """
+            put_ok = bool(service.put(self.data_hash,
+                                      self.data,
+                                      timeout=self.timeout))
+            outcome = service.last_result()
+
+            if put_ok:
+                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
+                              str(threading.current_thread()),
+                              self.data_hash,
+                              len(self.data),
+                              service_root)
+                try:
+                    stored = int(outcome['headers']['x-keep-replicas-stored'])
+                except (KeyError, ValueError):
+                    stored = 1
+                return outcome['body'].strip(), stored
+
+            # Failure: only log the response when the server sent one.
+            if outcome.get('status_code', None):
+                _logger.debug("Request fail: PUT %s => %s %s",
+                              self.data_hash,
+                              outcome['status_code'],
+                              outcome['body'])
+            raise self.TaskFailed
 
 
     def __init__(self, api_client=None, proxy=None,
@@ -426,20 +669,28 @@ class KeepClient(object):
         :proxy:
           If specified, this KeepClient will send requests to this Keep
           proxy.  Otherwise, KeepClient will fall back to the setting of the
-          ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
-          KeepClient does not use a proxy, pass in an empty string.
+          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
+          If you want to ensure KeepClient does not use a proxy, pass in an
+          empty string.
 
         :timeout:
-          The timeout (in seconds) for HTTP requests to Keep
-          non-proxy servers.  A tuple of two floats is interpreted as
-          (connection_timeout, read_timeout): see
-          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
-          Default: (2, 300).
+          The initial timeout (in seconds) for HTTP requests to Keep
+          non-proxy servers.  A tuple of three floats is interpreted as
+          (connection_timeout, read_timeout, minimum_bandwidth). A connection
+          will be aborted if the average traffic rate falls below
+          minimum_bandwidth bytes per second over an interval of read_timeout
+          seconds. Because timeouts are often a result of transient server
+          load, the actual connection timeout will be increased by a factor
+          of two on each retry.
+          Default: (2, 256, 32768).
 
         :proxy_timeout:
-          The timeout (in seconds) for HTTP requests to
-          Keep proxies. A tuple of two floats is interpreted as
-          (connection_timeout, read_timeout). Default: (20, 300).
+          The initial timeout (in seconds) for HTTP requests to
+          Keep proxies. A tuple of three floats is interpreted as
+          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
+          described above for adjusting connection timeouts on retry also
+          applies.
+          Default: (20, 256, 32768).
 
         :api_token:
           If you're not using an API client, but only talking
@@ -460,14 +711,13 @@ class KeepClient(object):
           The default number of times to retry failed requests.
           This will be used as the default num_retries value when get() and
           put() are called.  Default 0.
-
-        :session:
-          The requests.Session object to use for get() and put() requests.
-          Will create one if not specified.
         """
         self.lock = threading.Lock()
         if proxy is None:
-            proxy = config.get('ARVADOS_KEEP_PROXY')
+            if config.get('ARVADOS_KEEP_SERVICES'):
+                proxy = config.get('ARVADOS_KEEP_SERVICES')
+            else:
+                proxy = config.get('ARVADOS_KEEP_PROXY')
         if api_token is None:
             if api_client is None:
                 api_token = config.get('ARVADOS_API_TOKEN')
@@ -482,6 +732,13 @@ class KeepClient(object):
         self.block_cache = block_cache if block_cache else KeepBlockCache()
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
+        self._user_agent_pool = Queue.LifoQueue()
+        self.upload_counter = Counter()
+        self.download_counter = Counter()
+        self.put_counter = Counter()
+        self.get_counter = Counter()
+        self.hits_counter = Counter()
+        self.misses_counter = Counter()
 
         if local_store:
             self.local_store = local_store
@@ -489,15 +746,24 @@ class KeepClient(object):
             self.put = self.local_store_put
         else:
             self.num_retries = num_retries
-            self.session = session if session is not None else requests.Session()
+            self.max_replicas_per_service = None
             if proxy:
-                if not proxy.endswith('/'):
-                    proxy += '/'
+                proxy_uris = proxy.split()
+                for i in range(len(proxy_uris)):
+                    if not proxy_uris[i].endswith('/'):
+                        proxy_uris[i] += '/'
+                    # URL validation
+                    url = urlparse.urlparse(proxy_uris[i])
+                    if not (url.scheme and url.netloc):
+                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                 self.api_token = api_token
+                self._gateway_services = {}
                 self._keep_services = [{
-                    'uuid': 'proxy',
-                    '_service_root': proxy,
-                    }]
+                    'uuid': "00000-bi6l4-%015d" % idx,
+                    'service_type': 'proxy',
+                    '_service_root': uri,
+                    } for idx, uri in enumerate(proxy_uris)]
+                self._writable_services = self._keep_services
                 self.using_proxy = True
                 self._static_services_list = True
             else:
@@ -507,18 +773,32 @@ class KeepClient(object):
                     api_client = arvados.api('v1')
                 self.api_client = api_client
                 self.api_token = api_client.api_token
+                self._gateway_services = {}
                 self._keep_services = None
+                self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
 
-    def current_timeout(self):
-        """Return the appropriate timeout to use for this client: the proxy
-        timeout setting if the backend service is currently a proxy,
-        the regular timeout setting otherwise.
+    def current_timeout(self, attempt_number):
+        """Return the appropriate timeout to use for this client.
+
+        This is the proxy timeout setting if the backend service is currently
+        a proxy, and the regular timeout setting otherwise.  `attempt_number`
+        is how many times the operation has already been tried (0 for the
+        first try); the connection timeout portion of the return value is
+        doubled for each previous attempt.
+
         """
         # TODO(twp): the timeout should be a property of a
         # KeepService, not a KeepClient. See #4488.
-        return self.proxy_timeout if self.using_proxy else self.timeout
+        t = self.proxy_timeout if self.using_proxy else self.timeout
+        if len(t) == 2:
+            return (t[0] * (1 << attempt_number), t[1])
+        else:
+            return (t[0] * (1 << attempt_number), t[1], t[2])
+    def _any_nondisk_services(self, service_list):
+        return any(ks.get('service_type', 'disk') != 'disk'
+                   for ks in service_list)
 
     def build_services_list(self, force_rebuild=False):
         if (self._static_services_list or
@@ -530,20 +810,38 @@ class KeepClient(object):
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
-            self._keep_services = keep_services.execute().get('items')
-            if not self._keep_services:
+            # Gateway services are only used when specified by UUID,
+            # so there's nothing to gain by filtering them by
+            # service_type.
+            self._gateway_services = {ks['uuid']: ks for ks in
+                                      keep_services.execute()['items']}
+            if not self._gateway_services:
                 raise arvados.errors.NoKeepServersError()
 
-            self.using_proxy = any(ks.get('service_type') == 'proxy'
-                                   for ks in self._keep_services)
-
             # Precompute the base URI for each service.
-            for r in self._keep_services:
-                r['_service_root'] = "{}://[{}]:{:d}/".format(
+            for r in self._gateway_services.itervalues():
+                host = r['service_host']
+                if not host.startswith('[') and host.find(':') >= 0:
+                    # IPv6 URIs must be formatted like http://[::1]:80/...
+                    host = '[' + host + ']'
+                r['_service_root'] = "{}://{}:{:d}/".format(
                     'https' if r['service_ssl_flag'] else 'http',
-                    r['service_host'],
+                    host,
                     r['service_port'])
-            _logger.debug(str(self._keep_services))
+
+            _logger.debug(str(self._gateway_services))
+            self._keep_services = [
+                ks for ks in self._gateway_services.itervalues()
+                if not ks.get('service_type', '').startswith('gateway:')]
+            self._writable_services = [ks for ks in self._keep_services
+                                       if not ks.get('read_only')]
+
+            # When all writable services are disks, max_replicas_per_service
+            # is 1.  Otherwise it is unknown (unlimited), recorded as None.
+            if self._any_nondisk_services(self._writable_services):
+                self.max_replicas_per_service = None
+            else:
+                self.max_replicas_per_service = 1
 
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
@@ -554,34 +852,56 @@ class KeepClient(object):
         """
         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
 
-    def weighted_service_roots(self, data_hash, force_rebuild=False):
+    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
         """Return an array of Keep service endpoints, in the order in
         which they should be probed when reading or writing data with
-        the given hash.
+        the given hash+hints.
         """
         self.build_services_list(force_rebuild)
 
-        # Sort the available services by weight (heaviest first) for
-        # this data_hash, and return their service_roots (base URIs)
+        sorted_roots = []
+        # Use the services indicated by the given +K@... remote
+        # service hints, if any are present and can be resolved to a
+        # URI.
+        for hint in locator.hints:
+            if hint.startswith('K@'):
+                if len(hint) == 7:
+                    sorted_roots.append(
+                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
+                elif len(hint) == 29:
+                    svc = self._gateway_services.get(hint[2:])
+                    if svc:
+                        sorted_roots.append(svc['_service_root'])
+
+        # Sort the available local services by weight (heaviest first)
+        # for this locator, and return their service_roots (base URIs)
         # in that order.
-        sorted_roots = [
+        use_services = self._keep_services
+        if need_writable:
+            use_services = self._writable_services
+        self.using_proxy = self._any_nondisk_services(use_services)
+        sorted_roots.extend([
             svc['_service_root'] for svc in sorted(
-                self._keep_services,
+                use_services,
                 reverse=True,
-                key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
-        _logger.debug(data_hash + ': ' + str(sorted_roots))
+                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
+        _logger.debug("{}: {}".format(locator, sorted_roots))
         return sorted_roots
 
-    def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
         # root strings.
         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
-        local_roots = self.weighted_service_roots(md5_s, force_rebuild)
+        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
         for root in local_roots:
             if root not in roots_map:
-                roots_map[root] = self.KeepService(root, self.session, **headers)
+                roots_map[root] = self.KeepService(
+                    root, self._user_agent_pool,
+                    upload_counter=self.upload_counter,
+                    download_counter=self.download_counter,
+                    **headers)
         return local_roots
 
     @staticmethod
@@ -605,13 +925,20 @@ class KeepClient(object):
     def get_from_cache(self, loc):
         """Fetch a block only if is in the cache, otherwise return None."""
         slot = self.block_cache.get(loc)
-        if slot.ready.is_set():
+        if slot is not None and slot.ready.is_set():
             return slot.get()
         else:
             return None
 
+    @retry.retry_method
+    def head(self, loc_s, num_retries=None):
+        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
+
     @retry.retry_method
     def get(self, loc_s, num_retries=None):
+        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
+
+    def _get_or_head(self, loc_s, method="GET", num_retries=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -633,30 +960,55 @@ class KeepClient(object):
         """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
+
+        self.get_counter.add(1)
+
         locator = KeepLocator(loc_s)
-        expect_hash = locator.md5sum
-        slot, first = self.block_cache.reserve_cache(expect_hash)
-        if not first:
-            v = slot.get()
-            return v
+        if method == "GET":
+            slot, first = self.block_cache.reserve_cache(locator.md5sum)
+            if not first:
+                self.hits_counter.add(1)
+                v = slot.get()
+                return v
+
+        self.misses_counter.add(1)
+
+        # If the locator has hints specifying a prefix (indicating a
+        # remote keepproxy) or the UUID of a local gateway service,
+        # read data from the indicated service(s) instead of the usual
+        # list of local disk services.
+        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+                           for hint in locator.hints if (
+                                   hint.startswith('K@') and
+                                   len(hint) == 29 and
+                                   self._gateway_services.get(hint[2:])
+                                   )])
+        # Map root URLs to their KeepService objects.
+        roots_map = {
+            root: self.KeepService(root, self._user_agent_pool,
+                                   upload_counter=self.upload_counter,
+                                   download_counter=self.download_counter)
+            for root in hint_roots
+        }
 
         # See #3147 for a discussion of the loop implementation.  Highlights:
         # * Refresh the list of Keep services after each failure, in case
         #   it's being updated.
         # * Retry until we succeed, we're out of retries, or every available
         #   service has returned permanent failure.
-        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
-                      for hint in locator.hints if hint.startswith('K@')]
-        # Map root URLs their KeepService objects.
-        roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
+        sorted_roots = []
+        roots_map = {}
         blob = None
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
         for tries_left in loop:
             try:
-                local_roots = self.map_new_services(
-                    roots_map, expect_hash,
-                    force_rebuild=(tries_left < num_retries))
+                sorted_roots = self.map_new_services(
+                    roots_map, locator,
+                    force_rebuild=(tries_left < num_retries),
+                    need_writable=False)
             except Exception as error:
                 loop.save_result(error)
                 continue
@@ -664,41 +1016,40 @@ class KeepClient(object):
             # Query KeepService objects that haven't returned
             # permanent failure, in our specified shuffle order.
             services_to_try = [roots_map[root]
-                               for root in (local_roots + hint_roots)
+                               for root in sorted_roots
                                if roots_map[root].usable()]
             for keep_service in services_to_try:
-                blob = keep_service.get(locator, timeout=self.current_timeout())
+                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                 if blob is not None:
                     break
             loop.save_result((blob, len(services_to_try)))
 
         # Always cache the result, then return it if we succeeded.
-        slot.set(blob)
-        self.block_cache.cap_cache()
+        if method == "GET":
+            slot.set(blob)
+            self.block_cache.cap_cache()
         if loop.success():
-            return blob
+            if method == "HEAD":
+                return True
+            else:
+                return blob
 
-        try:
-            all_roots = local_roots + hint_roots
-        except NameError:
-            # We never successfully fetched local_roots.
-            all_roots = hint_roots
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
-        not_founds = sum(1 for key in all_roots
-                         if roots_map[key].last_status() in {403, 404, 410})
-        service_errors = ((key, roots_map[key].last_result)
-                          for key in all_roots)
+        not_founds = sum(1 for key in sorted_roots
+                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
+        service_errors = ((key, roots_map[key].last_result()['error'])
+                          for key in sorted_roots)
         if not roots_map:
             raise arvados.errors.KeepReadError(
                 "failed to read {}: no Keep services available ({})".format(
                     loc_s, loop.last_result()))
-        elif not_founds == len(all_roots):
+        elif not_founds == len(sorted_roots):
             raise arvados.errors.NotFoundError(
                 "{} not found".format(loc_s), service_errors)
         else:
             raise arvados.errors.KeepReadError(
-                "failed to read {}".format(loc_s), service_errors)
+                "failed to read {}".format(loc_s), service_errors, label="service")
 
     @retry.retry_method
     def put(self, data, copies=2, num_retries=None):
@@ -719,57 +1070,63 @@ class KeepClient(object):
           exponential backoff.  The default value is set when the
           KeepClient is initialized.
         """
+
+        if isinstance(data, unicode):
+            data = data.encode("ascii")
+        elif not isinstance(data, str):
+            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
+
+        self.put_counter.add(1)
+
         data_hash = hashlib.md5(data).hexdigest()
+        loc_s = data_hash + '+' + str(len(data))
         if copies < 1:
-            return data_hash
+            return loc_s
+        locator = KeepLocator(loc_s)
 
         headers = {}
-        if self.using_proxy:
-            # Tell the proxy how many copies we want it to store
-            headers['X-Keep-Desired-Replication'] = str(copies)
+        # Tell the proxy how many copies we want it to store
+        headers['X-Keep-Desired-Replicas'] = str(copies)
         roots_map = {}
-        thread_limiter = KeepClient.ThreadLimiter(copies)
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
+        done = 0
         for tries_left in loop:
             try:
-                local_roots = self.map_new_services(
-                    roots_map, data_hash,
-                    force_rebuild=(tries_left < num_retries), **headers)
+                sorted_roots = self.map_new_services(
+                    roots_map, locator,
+                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
             except Exception as error:
                 loop.save_result(error)
                 continue
 
-            threads = []
-            for service_root, ks in roots_map.iteritems():
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
+                                                        data_hash=data_hash,
+                                                        copies=copies - done,
+                                                        max_service_replicas=self.max_replicas_per_service,
+                                                        timeout=self.current_timeout(num_retries - tries_left))
+            for service_root, ks in [(root, roots_map[root])
+                                     for root in sorted_roots]:
                 if ks.finished():
                     continue
-                t = KeepClient.KeepWriterThread(
-                    ks,
-                    data=data,
-                    data_hash=data_hash,
-                    service_root=service_root,
-                    thread_limiter=thread_limiter,
-                    timeout=self.current_timeout())
-                t.start()
-                threads.append(t)
-            for t in threads:
-                t.join()
-            loop.save_result((thread_limiter.done() >= copies, len(threads)))
+                writer_pool.add_task(ks, service_root)
+            writer_pool.join()
+            done += writer_pool.done()
+            loop.save_result((done >= copies, writer_pool.total_task_nr))
 
         if loop.success():
-            return thread_limiter.response()
+            return writer_pool.response()
         if not roots_map:
             raise arvados.errors.KeepWriteError(
                 "failed to write {}: no Keep services available ({})".format(
                     data_hash, loop.last_result()))
         else:
-            service_errors = ((key, roots_map[key].last_result)
-                              for key in local_roots
-                              if not roots_map[key].success_flag)
+            service_errors = ((key, roots_map[key].last_result()['error'])
+                              for key in sorted_roots
+                              if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                 "failed to write {} (wanted {} copies but wrote {})".format(
-                    data_hash, copies, thread_limiter.done()), service_errors)
+                    data_hash, copies, writer_pool.done()), service_errors, label="service")
 
     def local_store_put(self, data, copies=1, num_retries=None):
         """A stub for put().