Merge branch 'master' of git.curoverse.com:arvados into 13076-r-autogen-api
[arvados.git] / sdk / python / arvados / keep.py
index e27885efe3e4f4724edec8194467e4a72a3d7667..e8e95afc7013650c67e753a3f2de4e7ec227fc44 100644 (file)
@@ -1,16 +1,37 @@
-import cStringIO
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+from future.utils import native_str
+standard_library.install_aliases()
+from builtins import next
+from builtins import str
+from builtins import range
+from builtins import object
+import collections
 import datetime
 import hashlib
 import datetime
 import hashlib
+import io
 import logging
 import math
 import os
 import pycurl
 import logging
 import math
 import os
 import pycurl
-import Queue
+import queue
 import re
 import socket
 import ssl
 import re
 import socket
 import ssl
+import sys
 import threading
 import threading
-import timer
+from . import timer
+import urllib.parse
+
+if sys.version_info >= (3, 0):
+    from io import BytesIO
+else:
+    from cStringIO import StringIO as BytesIO
 
 import arvados
 import arvados.config as config
 
 import arvados
 import arvados.config as config
@@ -22,6 +43,17 @@ _logger = logging.getLogger('arvados.keep')
 global_client_object = None
 
 
 global_client_object = None
 
 
+# Monkey patch TCP constants when not available (apple). Values sourced from:
+# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
+if sys.platform == 'darwin':
+    if not hasattr(socket, 'TCP_KEEPALIVE'):
+        socket.TCP_KEEPALIVE = 0x010
+    if not hasattr(socket, 'TCP_KEEPINTVL'):
+        socket.TCP_KEEPINTVL = 0x101
+    if not hasattr(socket, 'TCP_KEEPCNT'):
+        socket.TCP_KEEPCNT = 0x102
+
+
 class KeepLocator(object):
     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
 class KeepLocator(object):
     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
@@ -46,8 +78,9 @@ class KeepLocator(object):
 
     def __str__(self):
         return '+'.join(
 
     def __str__(self):
         return '+'.join(
-            str(s) for s in [self.md5sum, self.size,
-                             self.permission_hint()] + self.hints
+            native_str(s)
+            for s in [self.md5sum, self.size,
+                      self.permission_hint()] + self.hints
             if s is not None)
 
     def stripped(self):
             if s is not None)
 
     def stripped(self):
@@ -64,7 +97,7 @@ class KeepLocator(object):
             return getattr(self, data_name)
         def setter(self, hex_str):
             if not arvados.util.is_hex(hex_str, length):
             return getattr(self, data_name)
         def setter(self, hex_str):
             if not arvados.util.is_hex(hex_str, length):
-                raise ValueError("{} is not a {}-digit hex string: {}".
+                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                  format(name, length, hex_str))
             setattr(self, data_name, hex_str)
         return property(getter, setter)
                                  format(name, length, hex_str))
             setattr(self, data_name, hex_str)
         return property(getter, setter)
@@ -177,7 +210,7 @@ class KeepBlockCache(object):
             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
             sm = sum([slot.size() for slot in self._cache])
             while len(self._cache) > 0 and sm > self.cache_max:
             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
             sm = sum([slot.size() for slot in self._cache])
             while len(self._cache) > 0 and sm > self.cache_max:
-                for i in xrange(len(self._cache)-1, -1, -1):
+                for i in range(len(self._cache)-1, -1, -1):
                     if self._cache[i].ready.is_set():
                         del self._cache[i]
                         break
                     if self._cache[i].ready.is_set():
                         del self._cache[i]
                         break
@@ -185,7 +218,7 @@ class KeepBlockCache(object):
 
     def _get(self, locator):
         # Test if the locator is already in the cache
 
     def _get(self, locator):
         # Test if the locator is already in the cache
-        for i in xrange(0, len(self._cache)):
+        for i in range(0, len(self._cache)):
             if self._cache[i].locator == locator:
                 n = self._cache[i]
                 if i != 0:
             if self._cache[i].locator == locator:
                 n = self._cache[i]
                 if i != 0:
@@ -237,90 +270,6 @@ class KeepClient(object):
     DEFAULT_TIMEOUT = (2, 256, 32768)
     DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
 
     DEFAULT_TIMEOUT = (2, 256, 32768)
     DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
 
-    class ThreadLimiter(object):
-        """Limit the number of threads writing to Keep at once.
-
-        This ensures that only a number of writer threads that could
-        potentially achieve the desired replication level run at once.
-        Once the desired replication level is achieved, queued threads
-        are instructed not to run.
-
-        Should be used in a "with" block.
-        """
-        def __init__(self, want_copies, max_service_replicas):
-            self._started = 0
-            self._want_copies = want_copies
-            self._done = 0
-            self._thread_failures = 0
-            self._response = None
-            self._start_lock = threading.Condition()
-            if (not max_service_replicas) or (max_service_replicas >= want_copies):
-                max_threads = 1
-            else:
-                max_threads = math.ceil(float(want_copies) / max_service_replicas)
-            _logger.debug("Limiter max threads is %d", max_threads)
-            self._todo_lock = threading.Semaphore(max_threads)
-            self._done_lock = threading.Lock()
-            self._thread_failures_lock = threading.Lock()
-            self._local = threading.local()
-
-        def __enter__(self):
-            self._start_lock.acquire()
-            if getattr(self._local, 'sequence', None) is not None:
-                # If the calling thread has used set_sequence(N), then
-                # we wait here until N other threads have started.
-                while self._started < self._local.sequence:
-                    self._start_lock.wait()
-            self._todo_lock.acquire()
-            self._started += 1
-            self._start_lock.notifyAll()
-            self._start_lock.release()
-            return self
-
-        def __exit__(self, type, value, traceback):
-            with self._thread_failures_lock:
-                if self._thread_failures > 0:
-                    self._thread_failures -= 1
-                    self._todo_lock.release()
-
-            # If work is finished, release al pending threads
-            if not self.shall_i_proceed():
-                self._todo_lock.release()
-
-        def set_sequence(self, sequence):
-            self._local.sequence = sequence
-
-        def shall_i_proceed(self):
-            """
-            Return true if the current thread should write to Keep.
-            Return false otherwise.
-            """
-            with self._done_lock:
-                return (self._done < self._want_copies)
-
-        def save_response(self, response_body, replicas_stored):
-            """
-            Records a response body (a locator, possibly signed) returned by
-            the Keep server, and the number of replicas it stored.
-            """
-            if replicas_stored == 0:
-                # Failure notification, should start a new thread to try to reach full replication
-                with self._thread_failures_lock:
-                    self._thread_failures += 1
-            else:
-                with self._done_lock:
-                    self._done += replicas_stored
-                    self._response = response_body
-
-        def response(self):
-            """Return the body from the response to a PUT request."""
-            with self._done_lock:
-                return self._response
-
-        def done(self):
-            """Return the total number of replicas successfully stored."""
-            with self._done_lock:
-                return self._done
 
     class KeepService(object):
         """Make requests to a single Keep service, and track results.
 
     class KeepService(object):
         """Make requests to a single Keep service, and track results.
@@ -340,14 +289,16 @@ class KeepClient(object):
             arvados.errors.HttpError,
         )
 
             arvados.errors.HttpError,
         )
 
-        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
+        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                      upload_counter=None,
                      upload_counter=None,
-                     download_counter=None, **headers):
+                     download_counter=None,
+                     headers={}):
             self.root = root
             self._user_agent_pool = user_agent_pool
             self._result = {'error': None}
             self._usable = True
             self._session = None
             self.root = root
             self._user_agent_pool = user_agent_pool
             self._result = {'error': None}
             self._usable = True
             self._session = None
+            self._socket = None
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
@@ -367,24 +318,39 @@ class KeepClient(object):
 
         def _get_user_agent(self):
             try:
 
         def _get_user_agent(self):
             try:
-                return self._user_agent_pool.get(False)
-            except Queue.Empty:
+                return self._user_agent_pool.get(block=False)
+            except queue.Empty:
                 return pycurl.Curl()
 
         def _put_user_agent(self, ua):
             try:
                 ua.reset()
                 return pycurl.Curl()
 
         def _put_user_agent(self, ua):
             try:
                 ua.reset()
-                self._user_agent_pool.put(ua, False)
+                self._user_agent_pool.put(ua, block=False)
             except:
                 ua.close()
 
             except:
                 ua.close()
 
-        @staticmethod
-        def _socket_open(family, socktype, protocol, address=None):
+        def _socket_open(self, *args, **kwargs):
+            if len(args) + len(kwargs) == 2:
+                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
+            else:
+                return self._socket_open_pycurl_7_19_3(*args, **kwargs)
+
+        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
+            return self._socket_open_pycurl_7_21_5(
+                purpose=None,
+                address=collections.namedtuple(
+                    'Address', ['family', 'socktype', 'protocol', 'addr'],
+                )(family, socktype, protocol, address))
+
+        def _socket_open_pycurl_7_21_5(self, purpose, address):
             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
-            s = socket.socket(family, socktype, protocol)
+            s = socket.socket(address.family, address.socktype, address.protocol)
             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
+            # Will throw invalid protocol error on mac. This test prevents that.
+            if hasattr(socket, 'TCP_KEEPIDLE'):
+                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+            self._socket = s
             return s
 
         def get(self, locator, method="GET", timeout=None):
             return s
 
         def get(self, locator, method="GET", timeout=None):
@@ -396,12 +362,13 @@ class KeepClient(object):
             try:
                 with timer.Timer() as t:
                     self._headers = {}
             try:
                 with timer.Timer() as t:
                     self._headers = {}
-                    response_body = cStringIO.StringIO()
+                    response_body = BytesIO()
                     curl.setopt(pycurl.NOSIGNAL, 1)
                     curl.setopt(pycurl.NOSIGNAL, 1)
-                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
+                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     curl.setopt(pycurl.HTTPHEADER, [
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     curl.setopt(pycurl.HTTPHEADER, [
-                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if method == "HEAD":
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if method == "HEAD":
@@ -412,6 +379,10 @@ class KeepClient(object):
                         curl.perform()
                     except Exception as e:
                         raise arvados.errors.HttpError(0, str(e))
                         curl.perform()
                     except Exception as e:
                         raise arvados.errors.HttpError(0, str(e))
+                    finally:
+                        if self._socket:
+                            self._socket.close()
+                            self._socket = None
                     self._result = {
                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                         'body': response_body.getvalue(),
                     self._result = {
                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                         'body': response_body.getvalue(),
@@ -452,7 +423,7 @@ class KeepClient(object):
                          self._result['status_code'],
                          len(self._result['body']),
                          t.msecs,
                          self._result['status_code'],
                          len(self._result['body']),
                          t.msecs,
-                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
 
             if self.download_counter:
                 self.download_counter.add(len(self._result['body']))
 
             if self.download_counter:
                 self.download_counter.add(len(self._result['body']))
@@ -473,10 +444,11 @@ class KeepClient(object):
             try:
                 with timer.Timer() as t:
                     self._headers = {}
             try:
                 with timer.Timer() as t:
                     self._headers = {}
-                    body_reader = cStringIO.StringIO(body)
-                    response_body = cStringIO.StringIO()
+                    body_reader = BytesIO(body)
+                    response_body = BytesIO()
                     curl.setopt(pycurl.NOSIGNAL, 1)
                     curl.setopt(pycurl.NOSIGNAL, 1)
-                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
+                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
@@ -488,7 +460,7 @@ class KeepClient(object):
                     curl.setopt(pycurl.INFILESIZE, len(body))
                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
                     curl.setopt(pycurl.HTTPHEADER, [
                     curl.setopt(pycurl.INFILESIZE, len(body))
                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
                     curl.setopt(pycurl.HTTPHEADER, [
-                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     self._setcurltimeouts(curl, timeout)
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     self._setcurltimeouts(curl, timeout)
@@ -496,9 +468,13 @@ class KeepClient(object):
                         curl.perform()
                     except Exception as e:
                         raise arvados.errors.HttpError(0, str(e))
                         curl.perform()
                     except Exception as e:
                         raise arvados.errors.HttpError(0, str(e))
+                    finally:
+                        if self._socket:
+                            self._socket.close()
+                            self._socket = None
                     self._result = {
                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                     self._result = {
                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
-                        'body': response_body.getvalue(),
+                        'body': response_body.getvalue().decode('utf-8'),
                         'headers': self._headers,
                         'error': False,
                     }
                         'headers': self._headers,
                         'error': False,
                     }
@@ -525,7 +501,7 @@ class KeepClient(object):
                          self._result['status_code'],
                          len(body),
                          t.msecs,
                          self._result['status_code'],
                          len(body),
                          t.msecs,
-                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
             if self.upload_counter:
                 self.upload_counter.add(len(body))
             return True
             if self.upload_counter:
                 self.upload_counter.add(len(body))
             return True
@@ -547,7 +523,8 @@ class KeepClient(object):
             curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
             curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
-            header_line = header_line.decode('iso-8859-1')
+            if isinstance(header_line, bytes):
+                header_line = header_line.decode('iso-8859-1')
             if ':' in header_line:
                 name, value = header_line.split(':', 1)
                 name = name.strip().lower()
             if ':' in header_line:
                 name, value = header_line.split(':', 1)
                 name = name.strip().lower()
@@ -564,164 +541,150 @@ class KeepClient(object):
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
-    
 
 
-    class KeepWriterQueue(Queue.Queue):
+
+    class KeepWriterQueue(queue.Queue):
         def __init__(self, copies):
         def __init__(self, copies):
-            super(KeepClient.KeepWriterQueue, self).__init__()
+            queue.Queue.__init__(self) # Old-style superclass
             self.wanted_copies = copies
             self.successful_copies = 0
             self.wanted_copies = copies
             self.successful_copies = 0
+            self.response = None
             self.successful_copies_lock = threading.Lock()
             self.successful_copies_lock = threading.Lock()
-            self.retries = copies
-            self.retries_notification = threading.Condition()
-        
-        def write_success(self, replicas_nr):
+            self.pending_tries = copies
+            self.pending_tries_notification = threading.Condition()
+
+        def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
-        
-        def write_fail(self, ks, status_code):
-            with self.retries_notification:
-                self.retries += 1
-                self.retries_notification.notify()
-        
+                self.response = response
+            with self.pending_tries_notification:
+                self.pending_tries_notification.notify_all()
+
+        def write_fail(self, ks):
+            with self.pending_tries_notification:
+                self.pending_tries += 1
+                self.pending_tries_notification.notify()
+
         def pending_copies(self):
         def pending_copies(self):
-            return self.wanted_copies - self.successful_copies
-    
-    
+            with self.successful_copies_lock:
+                return self.wanted_copies - self.successful_copies
+
+        def get_next_task(self):
+            with self.pending_tries_notification:
+                while True:
+                    if self.pending_copies() < 1:
+                        # This notify_all() is unnecessary --
+                        # write_success() already called notify_all()
+                        # when pending<1 became true, so it's not
+                        # possible for any other thread to be in
+                        # wait() now -- but it's cheap insurance
+                        # against deadlock so we do it anyway:
+                        self.pending_tries_notification.notify_all()
+                        # Drain the queue and then raise queue.Empty
+                        while True:
+                            self.get_nowait()
+                            self.task_done()
+                    elif self.pending_tries > 0:
+                        service, service_root = self.get_nowait()
+                        if service.finished():
+                            self.task_done()
+                            continue
+                        self.pending_tries -= 1
+                        return service, service_root
+                    elif self.empty():
+                        self.pending_tries_notification.notify_all()
+                        raise queue.Empty
+                    else:
+                        self.pending_tries_notification.wait()
+
+
     class KeepWriterThreadPool(object):
     class KeepWriterThreadPool(object):
-        def __init__(self, data, data_hash, num_threads, copies=2):
+        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
+            self.total_task_nr = 0
             self.wanted_copies = copies
             self.wanted_copies = copies
+            if (not max_service_replicas) or (max_service_replicas >= copies):
+                num_threads = 1
+            else:
+                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
+            _logger.debug("Pool max threads is %d", num_threads)
             self.workers = []
             self.queue = KeepClient.KeepWriterQueue(copies)
             self.workers = []
             self.queue = KeepClient.KeepWriterQueue(copies)
-            # Start workers
+            # Create workers
             for _ in range(num_threads):
             for _ in range(num_threads):
-                self.workers.append(KeepClient.KeepWriterThreadNew(self.queue, data, data_hash))
-        
+                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
+                self.workers.append(w)
+
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
-        
-        def successful_copies(self):
+            self.total_task_nr += 1
+
+        def done(self):
             return self.queue.successful_copies
             return self.queue.successful_copies
-        
-        def start(self):
+
+        def join(self):
+            # Start workers
             for worker in self.workers:
                 worker.start()
             for worker in self.workers:
                 worker.start()
-    
-    
-    class KeepWriterThreadNew(threading.Thread):
-        def __init__(self, queue, data, data_hash):
-            super(KeepClient.KeepWriterThreadNew, self).__init__()
-            self.queue = queue
-            self.data = data
-            self.data_hash = data_hash
-            self.daemon = True
-        
-        def run(self):
-            while not self.queue.empty():
-                if self.queue.pending_copies() > 0:
-                    # Avoid overreplication, wait for some needed retry
-                    with self.queue.retries_notification:
-                        if not self.queue.retries > 0:
-                            self.queue.retries_notification.wait()
-                            continue # try again when awake
-                        self.queue.retries -= 1
-
-                    # Get to work
-                    service, service_root = self.queue.get()
-                    
-                    success = bool(self.service.put(self.data_hash,
-                                                    self.data,
-                                                    timeout=None))
-                    result = service.last_result()
-                    if success:
-                        _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
-                                      str(threading.current_thread()),
-                                      self.data_hash,
-                                      len(self.data),
-                                      service_root)
-                        try:
-                            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
-                        except (KeyError, ValueError):
-                            replicas_stored = 1
-                        self.queue.write_success(replicas_stored)
-                    else:
-                        if result.get('status_code', None):
-                            _logger.debug("Request fail: PUT %s => %s %s",
-                                          self.data_hash,
-                                          result['status_code'],
-                                          result['body'])
-                        self.queue.write_fail(service, result.get('status_code', None)) # Schedule a retry
-                else:
-                    # Remove the task from the queue anyways
-                    self.queue.get()
-                # Mark as done so the queue can be join()ed
-                self.queue.task_done()
+            # Wait for finished work
+            self.queue.join()
+
+        def response(self):
+            return self.queue.response
 
 
     class KeepWriterThread(threading.Thread):
 
 
     class KeepWriterThread(threading.Thread):
-        """
-        Write a blob of data to the given Keep server. On success, call
-        save_response() of the given ThreadLimiter to save the returned
-        locator.
-        """
-        def __init__(self, keep_service, **kwargs):
-            super(KeepClient.KeepWriterThread, self).__init__()
-            self.service = keep_service
-            self.args = kwargs
-            self._success = False
+        TaskFailed = RuntimeError()
 
 
-        def success(self):
-            return self._success
+        def __init__(self, queue, data, data_hash, timeout=None):
+            super(KeepClient.KeepWriterThread, self).__init__()
+            self.timeout = timeout
+            self.queue = queue
+            self.data = data
+            self.data_hash = data_hash
+            self.daemon = True
 
         def run(self):
 
         def run(self):
-            limiter = self.args['thread_limiter']
-            sequence = self.args['thread_sequence']
-            if sequence is not None:
-                limiter.set_sequence(sequence)
-            with limiter:
-                if not limiter.shall_i_proceed():
-                    # My turn arrived, but the job has been done without
-                    # me.
+            while True:
+                try:
+                    service, service_root = self.queue.get_next_task()
+                except queue.Empty:
                     return
                     return
-                self.run_with_limiter(limiter)
-
-        def run_with_limiter(self, limiter):
-            if self.service.finished():
-                return
-            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
-                          str(threading.current_thread()),
-                          self.args['data_hash'],
-                          len(self.args['data']),
-                          self.args['service_root'])
-            self._success = bool(self.service.put(
-                self.args['data_hash'],
-                self.args['data'],
-                timeout=self.args.get('timeout', None)))
-            result = self.service.last_result()
-            if self._success:
-                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
-                              str(threading.current_thread()),
-                              self.args['data_hash'],
-                              len(self.args['data']),
-                              self.args['service_root'])
-                # Tick the 'done' counter for the number of replica
-                # reported stored by the server, for the case that
-                # we're talking to a proxy or other backend that
-                # stores to multiple copies for us.
                 try:
                 try:
-                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
-                except (KeyError, ValueError):
-                    replicas_stored = 1
-                limiter.save_response(result['body'].strip(), replicas_stored)
-            elif result.get('status_code', None):
-                _logger.debug("Request fail: PUT %s => %s %s",
-                              self.args['data_hash'],
-                              result['status_code'],
-                              result['body'])
-            if not self._success:
-                # Notify the failure so that the Thread limiter allows
-                # a new one to run.
-                limiter.save_response(None, 0)
+                    locator, copies = self.do_task(service, service_root)
+                except Exception as e:
+                    if e is not self.TaskFailed:
+                        _logger.exception("Exception in KeepWriterThread")
+                    self.queue.write_fail(service)
+                else:
+                    self.queue.write_success(locator, copies)
+                finally:
+                    self.queue.task_done()
+
+        def do_task(self, service, service_root):
+            success = bool(service.put(self.data_hash,
+                                        self.data,
+                                        timeout=self.timeout))
+            result = service.last_result()
+
+            if not success:
+                if result.get('status_code', None):
+                    _logger.debug("Request fail: PUT %s => %s %s",
+                                  self.data_hash,
+                                  result['status_code'],
+                                  result['body'])
+                raise self.TaskFailed
+
+            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
+                          str(threading.current_thread()),
+                          self.data_hash,
+                          len(self.data),
+                          service_root)
+            try:
+                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
+            except (KeyError, ValueError):
+                replicas_stored = 1
+
+            return result['body'].strip(), replicas_stored
 
 
     def __init__(self, api_client=None, proxy=None,
 
 
     def __init__(self, api_client=None, proxy=None,
@@ -739,8 +702,9 @@ class KeepClient(object):
         :proxy:
           If specified, this KeepClient will send requests to this Keep
           proxy.  Otherwise, KeepClient will fall back to the setting of the
         :proxy:
           If specified, this KeepClient will send requests to this Keep
           proxy.  Otherwise, KeepClient will fall back to the setting of the
-          ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
-          KeepClient does not use a proxy, pass in an empty string.
+          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
+          If you want to ensure KeepClient does not use a proxy, pass in an
+          empty string.
 
         :timeout:
           The initial timeout (in seconds) for HTTP requests to Keep
 
         :timeout:
           The initial timeout (in seconds) for HTTP requests to Keep
@@ -783,7 +747,10 @@ class KeepClient(object):
         """
         self.lock = threading.Lock()
         if proxy is None:
         """
         self.lock = threading.Lock()
         if proxy is None:
-            proxy = config.get('ARVADOS_KEEP_PROXY')
+            if config.get('ARVADOS_KEEP_SERVICES'):
+                proxy = config.get('ARVADOS_KEEP_SERVICES')
+            else:
+                proxy = config.get('ARVADOS_KEEP_PROXY')
         if api_token is None:
             if api_client is None:
                 api_token = config.get('ARVADOS_API_TOKEN')
         if api_token is None:
             if api_client is None:
                 api_token = config.get('ARVADOS_API_TOKEN')
@@ -798,7 +765,7 @@ class KeepClient(object):
         self.block_cache = block_cache if block_cache else KeepBlockCache()
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
         self.block_cache = block_cache if block_cache else KeepBlockCache()
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
-        self._user_agent_pool = Queue.LifoQueue()
+        self._user_agent_pool = queue.LifoQueue()
         self.upload_counter = Counter()
         self.download_counter = Counter()
         self.put_counter = Counter()
         self.upload_counter = Counter()
         self.download_counter = Counter()
         self.put_counter = Counter()
@@ -814,15 +781,21 @@ class KeepClient(object):
             self.num_retries = num_retries
             self.max_replicas_per_service = None
             if proxy:
             self.num_retries = num_retries
             self.max_replicas_per_service = None
             if proxy:
-                if not proxy.endswith('/'):
-                    proxy += '/'
+                proxy_uris = proxy.split()
+                for i in range(len(proxy_uris)):
+                    if not proxy_uris[i].endswith('/'):
+                        proxy_uris[i] += '/'
+                    # URL validation
+                    url = urllib.parse.urlparse(proxy_uris[i])
+                    if not (url.scheme and url.netloc):
+                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                 self.api_token = api_token
                 self._gateway_services = {}
                 self._keep_services = [{
                 self.api_token = api_token
                 self._gateway_services = {}
                 self._keep_services = [{
-                    'uuid': 'proxy',
+                    'uuid': "00000-bi6l4-%015d" % idx,
                     'service_type': 'proxy',
                     'service_type': 'proxy',
-                    '_service_root': proxy,
-                    }]
+                    '_service_root': uri,
+                    } for idx, uri in enumerate(proxy_uris)]
                 self._writable_services = self._keep_services
                 self.using_proxy = True
                 self._static_services_list = True
                 self._writable_services = self._keep_services
                 self.using_proxy = True
                 self._static_services_list = True
@@ -879,7 +852,7 @@ class KeepClient(object):
                 raise arvados.errors.NoKeepServersError()
 
             # Precompute the base URI for each service.
                 raise arvados.errors.NoKeepServersError()
 
             # Precompute the base URI for each service.
-            for r in self._gateway_services.itervalues():
+            for r in self._gateway_services.values():
                 host = r['service_host']
                 if not host.startswith('[') and host.find(':') >= 0:
                     # IPv6 URIs must be formatted like http://[::1]:80/...
                 host = r['service_host']
                 if not host.startswith('[') and host.find(':') >= 0:
                     # IPv6 URIs must be formatted like http://[::1]:80/...
@@ -891,7 +864,7 @@ class KeepClient(object):
 
             _logger.debug(str(self._gateway_services))
             self._keep_services = [
 
             _logger.debug(str(self._gateway_services))
             self._keep_services = [
-                ks for ks in self._gateway_services.itervalues()
+                ks for ks in self._gateway_services.values()
                 if not ks.get('service_type', '').startswith('gateway:')]
             self._writable_services = [ks for ks in self._keep_services
                                        if not ks.get('read_only')]
                 if not ks.get('service_type', '').startswith('gateway:')]
             self._writable_services = [ks for ks in self._keep_services
                                        if not ks.get('read_only')]
@@ -910,7 +883,7 @@ class KeepClient(object):
         The weight is md5(h + u) where u is the last 15 characters of
         the service endpoint's UUID.
         """
         The weight is md5(h + u) where u is the last 15 characters of
         the service endpoint's UUID.
         """
-        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
+        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
 
     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
         """Return an array of Keep service endpoints, in the order in
 
     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
         """Return an array of Keep service endpoints, in the order in
@@ -948,7 +921,7 @@ class KeepClient(object):
         _logger.debug("{}: {}".format(locator, sorted_roots))
         return sorted_roots
 
         _logger.debug("{}: {}".format(locator, sorted_roots))
         return sorted_roots
 
-    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
+    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
@@ -961,7 +934,7 @@ class KeepClient(object):
                     root, self._user_agent_pool,
                     upload_counter=self.upload_counter,
                     download_counter=self.download_counter,
                     root, self._user_agent_pool,
                     upload_counter=self.upload_counter,
                     download_counter=self.download_counter,
-                    **headers)
+                    headers=headers)
         return local_roots
 
     @staticmethod
         return local_roots
 
     @staticmethod
@@ -991,14 +964,14 @@ class KeepClient(object):
             return None
 
     @retry.retry_method
             return None
 
     @retry.retry_method
-    def head(self, loc_s, num_retries=None):
-        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
+    def head(self, loc_s, **kwargs):
+        return self._get_or_head(loc_s, method="HEAD", **kwargs)
 
     @retry.retry_method
 
     @retry.retry_method
-    def get(self, loc_s, num_retries=None):
-        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
+    def get(self, loc_s, **kwargs):
+        return self._get_or_head(loc_s, method="GET", **kwargs)
 
 
-    def _get_or_head(self, loc_s, method="GET", num_retries=None):
+    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -1023,76 +996,90 @@ class KeepClient(object):
 
         self.get_counter.add(1)
 
 
         self.get_counter.add(1)
 
-        locator = KeepLocator(loc_s)
-        if method == "GET":
-            slot, first = self.block_cache.reserve_cache(locator.md5sum)
-            if not first:
-                self.hits_counter.add(1)
-                v = slot.get()
-                return v
-
-        self.misses_counter.add(1)
-
-        # If the locator has hints specifying a prefix (indicating a
-        # remote keepproxy) or the UUID of a local gateway service,
-        # read data from the indicated service(s) instead of the usual
-        # list of local disk services.
-        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
-                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
-        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
-                           for hint in locator.hints if (
-                                   hint.startswith('K@') and
-                                   len(hint) == 29 and
-                                   self._gateway_services.get(hint[2:])
-                                   )])
-        # Map root URLs to their KeepService objects.
-        roots_map = {
-            root: self.KeepService(root, self._user_agent_pool,
-                                   upload_counter=self.upload_counter,
-                                   download_counter=self.download_counter)
-            for root in hint_roots
-        }
-
-        # See #3147 for a discussion of the loop implementation.  Highlights:
-        # * Refresh the list of Keep services after each failure, in case
-        #   it's being updated.
-        # * Retry until we succeed, we're out of retries, or every available
-        #   service has returned permanent failure.
-        sorted_roots = []
-        roots_map = {}
+        slot = None
         blob = None
         blob = None
-        loop = retry.RetryLoop(num_retries, self._check_loop_result,
-                               backoff_start=2)
-        for tries_left in loop:
-            try:
-                sorted_roots = self.map_new_services(
-                    roots_map, locator,
-                    force_rebuild=(tries_left < num_retries),
-                    need_writable=False)
-            except Exception as error:
-                loop.save_result(error)
-                continue
+        try:
+            locator = KeepLocator(loc_s)
+            if method == "GET":
+                slot, first = self.block_cache.reserve_cache(locator.md5sum)
+                if not first:
+                    self.hits_counter.add(1)
+                    blob = slot.get()
+                    if blob is None:
+                        raise arvados.errors.KeepReadError(
+                            "failed to read {}".format(loc_s))
+                    return blob
+
+            self.misses_counter.add(1)
+
+            headers = {
+                'X-Request-Id': (request_id or
+                                 (hasattr(self, 'api_client') and self.api_client.request_id) or
+                                 arvados.util.new_request_id()),
+            }
+
+            # If the locator has hints specifying a prefix (indicating a
+            # remote keepproxy) or the UUID of a local gateway service,
+            # read data from the indicated service(s) instead of the usual
+            # list of local disk services.
+            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+                               for hint in locator.hints if (
+                                       hint.startswith('K@') and
+                                       len(hint) == 29 and
+                                       self._gateway_services.get(hint[2:])
+                                       )])
+            # Map root URLs to their KeepService objects.
+            roots_map = {
+                root: self.KeepService(root, self._user_agent_pool,
+                                       upload_counter=self.upload_counter,
+                                       download_counter=self.download_counter,
+                                       headers=headers)
+                for root in hint_roots
+            }
+
+            # See #3147 for a discussion of the loop implementation.  Highlights:
+            # * Refresh the list of Keep services after each failure, in case
+            #   it's being updated.
+            # * Retry until we succeed, we're out of retries, or every available
+            #   service has returned permanent failure.
+            sorted_roots = []
+            roots_map = {}
+            loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                                   backoff_start=2)
+            for tries_left in loop:
+                try:
+                    sorted_roots = self.map_new_services(
+                        roots_map, locator,
+                        force_rebuild=(tries_left < num_retries),
+                        need_writable=False,
+                        headers=headers)
+                except Exception as error:
+                    loop.save_result(error)
+                    continue
 
 
-            # Query KeepService objects that haven't returned
-            # permanent failure, in our specified shuffle order.
-            services_to_try = [roots_map[root]
-                               for root in sorted_roots
-                               if roots_map[root].usable()]
-            for keep_service in services_to_try:
-                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
-                if blob is not None:
-                    break
-            loop.save_result((blob, len(services_to_try)))
-
-        # Always cache the result, then return it if we succeeded.
-        if method == "GET":
-            slot.set(blob)
-            self.block_cache.cap_cache()
-        if loop.success():
-            if method == "HEAD":
-                return True
-            else:
-                return blob
+                # Query KeepService objects that haven't returned
+                # permanent failure, in our specified shuffle order.
+                services_to_try = [roots_map[root]
+                                   for root in sorted_roots
+                                   if roots_map[root].usable()]
+                for keep_service in services_to_try:
+                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
+                    if blob is not None:
+                        break
+                loop.save_result((blob, len(services_to_try)))
+
+            # Return the result if we succeeded; the finally block below caches it.
+            if loop.success():
+                if method == "HEAD":
+                    return True
+                else:
+                    return blob
+        finally:
+            if slot is not None:
+                slot.set(blob)
+                self.block_cache.cap_cache()
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
@@ -1112,7 +1099,7 @@ class KeepClient(object):
                 "failed to read {}".format(loc_s), service_errors, label="service")
 
     @retry.retry_method
                 "failed to read {}".format(loc_s), service_errors, label="service")
 
     @retry.retry_method
-    def put(self, data, copies=2, num_retries=None):
+    def put(self, data, copies=2, num_retries=None, request_id=None):
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
@@ -1131,10 +1118,8 @@ class KeepClient(object):
           KeepClient is initialized.
         """
 
           KeepClient is initialized.
         """
 
-        if isinstance(data, unicode):
-            data = data.encode("ascii")
-        elif not isinstance(data, str):
-            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
+        if not isinstance(data, bytes):
+            data = data.encode()
 
         self.put_counter.add(1)
 
 
         self.put_counter.add(1)
 
@@ -1144,9 +1129,12 @@ class KeepClient(object):
             return loc_s
         locator = KeepLocator(loc_s)
 
             return loc_s
         locator = KeepLocator(loc_s)
 
-        headers = {}
-        # Tell the proxy how many copies we want it to store
-        headers['X-Keep-Desired-Replication'] = str(copies)
+        headers = {
+            'X-Request-Id': (request_id or
+                             (hasattr(self, 'api_client') and self.api_client.request_id) or
+                             arvados.util.new_request_id()),
+            'X-Keep-Desired-Replicas': str(copies),
+        }
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
@@ -1155,35 +1143,29 @@ class KeepClient(object):
             try:
                 sorted_roots = self.map_new_services(
                     roots_map, locator,
             try:
                 sorted_roots = self.map_new_services(
                     roots_map, locator,
-                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
+                    force_rebuild=(tries_left < num_retries),
+                    need_writable=True,
+                    headers=headers)
             except Exception as error:
                 loop.save_result(error)
                 continue
 
             except Exception as error:
                 loop.save_result(error)
                 continue
 
-            thread_limiter = KeepClient.ThreadLimiter(
-                copies - done, self.max_replicas_per_service)
-            threads = []
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
+                                                        data_hash=data_hash,
+                                                        copies=copies - done,
+                                                        max_service_replicas=self.max_replicas_per_service,
+                                                        timeout=self.current_timeout(num_retries - tries_left))
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
                 if ks.finished():
                     continue
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
                 if ks.finished():
                     continue
-                t = KeepClient.KeepWriterThread(
-                    ks,
-                    data=data,
-                    data_hash=data_hash,
-                    service_root=service_root,
-                    thread_limiter=thread_limiter,
-                    timeout=self.current_timeout(num_retries-tries_left),
-                    thread_sequence=len(threads))
-                t.start()
-                threads.append(t)
-            for t in threads:
-                t.join()
-            done += thread_limiter.done()
-            loop.save_result((done >= copies, len(threads)))
+                writer_pool.add_task(ks, service_root)
+            writer_pool.join()
+            done += writer_pool.done()
+            loop.save_result((done >= copies, writer_pool.total_task_nr))
 
         if loop.success():
 
         if loop.success():
-            return thread_limiter.response()
+            return writer_pool.response()
         if not roots_map:
             raise arvados.errors.KeepWriteError(
                 "failed to write {}: no Keep services available ({})".format(
         if not roots_map:
             raise arvados.errors.KeepWriteError(
                 "failed to write {}: no Keep services available ({})".format(
@@ -1194,7 +1176,7 @@ class KeepClient(object):
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                 "failed to write {} (wanted {} copies but wrote {})".format(
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                 "failed to write {} (wanted {} copies but wrote {})".format(
-                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
+                    data_hash, copies, writer_pool.done()), service_errors, label="service")
 
     def local_store_put(self, data, copies=1, num_retries=None):
         """A stub for put().
 
     def local_store_put(self, data, copies=1, num_retries=None):
         """A stub for put().
@@ -1210,7 +1192,7 @@ class KeepClient(object):
         """
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
         """
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
-        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
+        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
             f.write(data)
         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                   os.path.join(self.local_store, md5))
             f.write(data)
         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                   os.path.join(self.local_store, md5))
@@ -1224,8 +1206,8 @@ class KeepClient(object):
             raise arvados.errors.NotFoundError(
                 "Invalid data locator: '%s'" % loc_s)
         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
             raise arvados.errors.NotFoundError(
                 "Invalid data locator: '%s'" % loc_s)
         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
-            return ''
-        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
+            return b''
+        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
             return f.read()
 
     def is_cached(self, locator):
             return f.read()
 
     def is_cached(self, locator):