Merge branch 'master' into 13822-nm-delayed-daemon
[arvados.git] / sdk / python / arvados / keep.py
index 17f691306e6b3a83cd3e3fd0f11021900efc737d..71e101cf4c5073d40e78f73c0bf46a9ff231f937 100644 (file)
@@ -1,18 +1,37 @@
-import cStringIO
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+from future.utils import native_str
+standard_library.install_aliases()
+from builtins import next
+from builtins import str
+from builtins import range
+from builtins import object
+import collections
 import datetime
 import hashlib
 import datetime
 import hashlib
+import io
 import logging
 import math
 import os
 import pycurl
 import logging
 import math
 import os
 import pycurl
-import Queue
+import queue
 import re
 import socket
 import ssl
 import sys
 import threading
 import re
 import socket
 import ssl
 import sys
 import threading
-import timer
-import urlparse
+from . import timer
+import urllib.parse
+
+if sys.version_info >= (3, 0):
+    from io import BytesIO
+else:
+    from cStringIO import StringIO as BytesIO
 
 import arvados
 import arvados.config as config
 
 import arvados
 import arvados.config as config
@@ -59,8 +78,9 @@ class KeepLocator(object):
 
     def __str__(self):
         return '+'.join(
 
     def __str__(self):
         return '+'.join(
-            str(s) for s in [self.md5sum, self.size,
-                             self.permission_hint()] + self.hints
+            native_str(s)
+            for s in [self.md5sum, self.size,
+                      self.permission_hint()] + self.hints
             if s is not None)
 
     def stripped(self):
             if s is not None)
 
     def stripped(self):
@@ -77,7 +97,7 @@ class KeepLocator(object):
             return getattr(self, data_name)
         def setter(self, hex_str):
             if not arvados.util.is_hex(hex_str, length):
             return getattr(self, data_name)
         def setter(self, hex_str):
             if not arvados.util.is_hex(hex_str, length):
-                raise ValueError("{} is not a {}-digit hex string: {}".
+                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                  format(name, length, hex_str))
             setattr(self, data_name, hex_str)
         return property(getter, setter)
                                  format(name, length, hex_str))
             setattr(self, data_name, hex_str)
         return property(getter, setter)
@@ -190,7 +210,7 @@ class KeepBlockCache(object):
             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
             sm = sum([slot.size() for slot in self._cache])
             while len(self._cache) > 0 and sm > self.cache_max:
             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
             sm = sum([slot.size() for slot in self._cache])
             while len(self._cache) > 0 and sm > self.cache_max:
-                for i in xrange(len(self._cache)-1, -1, -1):
+                for i in range(len(self._cache)-1, -1, -1):
                     if self._cache[i].ready.is_set():
                         del self._cache[i]
                         break
                     if self._cache[i].ready.is_set():
                         del self._cache[i]
                         break
@@ -198,7 +218,7 @@ class KeepBlockCache(object):
 
     def _get(self, locator):
         # Test if the locator is already in the cache
 
     def _get(self, locator):
         # Test if the locator is already in the cache
-        for i in xrange(0, len(self._cache)):
+        for i in range(0, len(self._cache)):
             if self._cache[i].locator == locator:
                 n = self._cache[i]
                 if i != 0:
             if self._cache[i].locator == locator:
                 n = self._cache[i]
                 if i != 0:
@@ -269,19 +289,23 @@ class KeepClient(object):
             arvados.errors.HttpError,
         )
 
             arvados.errors.HttpError,
         )
 
-        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
+        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                      upload_counter=None,
                      upload_counter=None,
-                     download_counter=None, **headers):
+                     download_counter=None,
+                     headers={},
+                     insecure=False):
             self.root = root
             self._user_agent_pool = user_agent_pool
             self._result = {'error': None}
             self._usable = True
             self._session = None
             self.root = root
             self._user_agent_pool = user_agent_pool
             self._result = {'error': None}
             self._usable = True
             self._session = None
+            self._socket = None
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
             self.upload_counter = upload_counter
             self.download_counter = download_counter
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
             self.upload_counter = upload_counter
             self.download_counter = download_counter
+            self.insecure = insecure
 
         def usable(self):
             """Is it worth attempting a request?"""
 
         def usable(self):
             """Is it worth attempting a request?"""
@@ -296,26 +320,39 @@ class KeepClient(object):
 
         def _get_user_agent(self):
             try:
 
         def _get_user_agent(self):
             try:
-                return self._user_agent_pool.get(False)
-            except Queue.Empty:
+                return self._user_agent_pool.get(block=False)
+            except queue.Empty:
                 return pycurl.Curl()
 
         def _put_user_agent(self, ua):
             try:
                 ua.reset()
                 return pycurl.Curl()
 
         def _put_user_agent(self, ua):
             try:
                 ua.reset()
-                self._user_agent_pool.put(ua, False)
+                self._user_agent_pool.put(ua, block=False)
             except:
                 ua.close()
 
             except:
                 ua.close()
 
-        @staticmethod
-        def _socket_open(family, socktype, protocol, address=None):
+        def _socket_open(self, *args, **kwargs):
+            if len(args) + len(kwargs) == 2:
+                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
+            else:
+                return self._socket_open_pycurl_7_19_3(*args, **kwargs)
+
+        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
+            return self._socket_open_pycurl_7_21_5(
+                purpose=None,
+                address=collections.namedtuple(
+                    'Address', ['family', 'socktype', 'protocol', 'addr'],
+                )(family, socktype, protocol, address))
+
+        def _socket_open_pycurl_7_21_5(self, purpose, address):
             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
-            s = socket.socket(family, socktype, protocol)
+            s = socket.socket(address.family, address.socktype, address.protocol)
             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
             # Will throw invalid protocol error on mac. This test prevents that.
             if hasattr(socket, 'TCP_KEEPIDLE'):
                 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
             # Will throw invalid protocol error on mac. This test prevents that.
             if hasattr(socket, 'TCP_KEEPIDLE'):
                 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+            self._socket = s
             return s
 
         def get(self, locator, method="GET", timeout=None):
             return s
 
         def get(self, locator, method="GET", timeout=None):
@@ -327,14 +364,17 @@ class KeepClient(object):
             try:
                 with timer.Timer() as t:
                     self._headers = {}
             try:
                 with timer.Timer() as t:
                     self._headers = {}
-                    response_body = cStringIO.StringIO()
+                    response_body = BytesIO()
                     curl.setopt(pycurl.NOSIGNAL, 1)
                     curl.setopt(pycurl.NOSIGNAL, 1)
-                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
+                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     curl.setopt(pycurl.HTTPHEADER, [
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     curl.setopt(pycurl.HTTPHEADER, [
-                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    if self.insecure:
+                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
                     self._setcurltimeouts(curl, timeout)
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
                     self._setcurltimeouts(curl, timeout)
@@ -343,6 +383,10 @@ class KeepClient(object):
                         curl.perform()
                     except Exception as e:
                         raise arvados.errors.HttpError(0, str(e))
                         curl.perform()
                     except Exception as e:
                         raise arvados.errors.HttpError(0, str(e))
+                    finally:
+                        if self._socket:
+                            self._socket.close()
+                            self._socket = None
                     self._result = {
                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                         'body': response_body.getvalue(),
                     self._result = {
                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                         'body': response_body.getvalue(),
@@ -383,7 +427,7 @@ class KeepClient(object):
                          self._result['status_code'],
                          len(self._result['body']),
                          t.msecs,
                          self._result['status_code'],
                          len(self._result['body']),
                          t.msecs,
-                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
 
             if self.download_counter:
                 self.download_counter.add(len(self._result['body']))
 
             if self.download_counter:
                 self.download_counter.add(len(self._result['body']))
@@ -404,10 +448,11 @@ class KeepClient(object):
             try:
                 with timer.Timer() as t:
                     self._headers = {}
             try:
                 with timer.Timer() as t:
                     self._headers = {}
-                    body_reader = cStringIO.StringIO(body)
-                    response_body = cStringIO.StringIO()
+                    body_reader = BytesIO(body)
+                    response_body = BytesIO()
                     curl.setopt(pycurl.NOSIGNAL, 1)
                     curl.setopt(pycurl.NOSIGNAL, 1)
-                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
+                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
@@ -419,17 +464,23 @@ class KeepClient(object):
                     curl.setopt(pycurl.INFILESIZE, len(body))
                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
                     curl.setopt(pycurl.HTTPHEADER, [
                     curl.setopt(pycurl.INFILESIZE, len(body))
                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
                     curl.setopt(pycurl.HTTPHEADER, [
-                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    if self.insecure:
+                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                     self._setcurltimeouts(curl, timeout)
                     try:
                         curl.perform()
                     except Exception as e:
                         raise arvados.errors.HttpError(0, str(e))
                     self._setcurltimeouts(curl, timeout)
                     try:
                         curl.perform()
                     except Exception as e:
                         raise arvados.errors.HttpError(0, str(e))
+                    finally:
+                        if self._socket:
+                            self._socket.close()
+                            self._socket = None
                     self._result = {
                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                     self._result = {
                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
-                        'body': response_body.getvalue(),
+                        'body': response_body.getvalue().decode('utf-8'),
                         'headers': self._headers,
                         'error': False,
                     }
                         'headers': self._headers,
                         'error': False,
                     }
@@ -456,7 +507,7 @@ class KeepClient(object):
                          self._result['status_code'],
                          len(body),
                          t.msecs,
                          self._result['status_code'],
                          len(body),
                          t.msecs,
-                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
             if self.upload_counter:
                 self.upload_counter.add(len(body))
             return True
             if self.upload_counter:
                 self.upload_counter.add(len(body))
             return True
@@ -478,7 +529,8 @@ class KeepClient(object):
             curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
             curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
-            header_line = header_line.decode('iso-8859-1')
+            if isinstance(header_line, bytes):
+                header_line = header_line.decode('iso-8859-1')
             if ':' in header_line:
                 name, value = header_line.split(':', 1)
                 name = name.strip().lower()
             if ':' in header_line:
                 name, value = header_line.split(':', 1)
                 name = name.strip().lower()
@@ -495,30 +547,30 @@ class KeepClient(object):
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
-    
 
 
-    class KeepWriterQueue(Queue.Queue):
+
+    class KeepWriterQueue(queue.Queue):
         def __init__(self, copies):
         def __init__(self, copies):
-            Queue.Queue.__init__(self) # Old-style superclass
+            queue.Queue.__init__(self) # Old-style superclass
             self.wanted_copies = copies
             self.successful_copies = 0
             self.response = None
             self.successful_copies_lock = threading.Lock()
             self.pending_tries = copies
             self.pending_tries_notification = threading.Condition()
             self.wanted_copies = copies
             self.successful_copies = 0
             self.response = None
             self.successful_copies_lock = threading.Lock()
             self.pending_tries = copies
             self.pending_tries_notification = threading.Condition()
-        
+
         def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
         def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
-        
+
         def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
         def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
-        
+
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
@@ -527,6 +579,13 @@ class KeepClient(object):
             with self.pending_tries_notification:
                 while True:
                     if self.pending_copies() < 1:
             with self.pending_tries_notification:
                 while True:
                     if self.pending_copies() < 1:
+                        # This notify_all() is unnecessary --
+                        # write_success() already called notify_all()
+                        # when pending<1 became true, so it's not
+                        # possible for any other thread to be in
+                        # wait() now -- but it's cheap insurance
+                        # against deadlock so we do it anyway:
+                        self.pending_tries_notification.notify_all()
                         # Drain the queue and then raise Queue.Empty
                         while True:
                             self.get_nowait()
                         # Drain the queue and then raise Queue.Empty
                         while True:
                             self.get_nowait()
@@ -540,7 +599,7 @@ class KeepClient(object):
                         return service, service_root
                     elif self.empty():
                         self.pending_tries_notification.notify_all()
                         return service, service_root
                     elif self.empty():
                         self.pending_tries_notification.notify_all()
-                        raise Queue.Empty
+                        raise queue.Empty
                     else:
                         self.pending_tries_notification.wait()
 
                     else:
                         self.pending_tries_notification.wait()
 
@@ -552,7 +611,7 @@ class KeepClient(object):
             if (not max_service_replicas) or (max_service_replicas >= copies):
                 num_threads = 1
             else:
             if (not max_service_replicas) or (max_service_replicas >= copies):
                 num_threads = 1
             else:
-                num_threads = int(math.ceil(float(copies) / max_service_replicas))
+                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
             _logger.debug("Pool max threads is %d", num_threads)
             self.workers = []
             self.queue = KeepClient.KeepWriterQueue(copies)
             _logger.debug("Pool max threads is %d", num_threads)
             self.workers = []
             self.queue = KeepClient.KeepWriterQueue(copies)
@@ -560,25 +619,25 @@ class KeepClient(object):
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                 self.workers.append(w)
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                 self.workers.append(w)
-        
+
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
             self.total_task_nr += 1
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
             self.total_task_nr += 1
-        
+
         def done(self):
             return self.queue.successful_copies
         def done(self):
             return self.queue.successful_copies
-        
+
         def join(self):
             # Start workers
             for worker in self.workers:
                 worker.start()
             # Wait for finished work
             self.queue.join()
         def join(self):
             # Start workers
             for worker in self.workers:
                 worker.start()
             # Wait for finished work
             self.queue.join()
-        
+
         def response(self):
             return self.queue.response
         def response(self):
             return self.queue.response
-    
-    
+
+
     class KeepWriterThread(threading.Thread):
         TaskFailed = RuntimeError()
 
     class KeepWriterThread(threading.Thread):
         TaskFailed = RuntimeError()
 
@@ -594,12 +653,12 @@ class KeepClient(object):
             while True:
                 try:
                     service, service_root = self.queue.get_next_task()
             while True:
                 try:
                     service, service_root = self.queue.get_next_task()
-                except Queue.Empty:
+                except queue.Empty:
                     return
                 try:
                     locator, copies = self.do_task(service, service_root)
                 except Exception as e:
                     return
                 try:
                     locator, copies = self.do_task(service, service_root)
                 except Exception as e:
-                    if e != self.TaskFailed:
+                    if e is not self.TaskFailed:
                         _logger.exception("Exception in KeepWriterThread")
                     self.queue.write_fail(service)
                 else:
                         _logger.exception("Exception in KeepWriterThread")
                     self.queue.write_fail(service)
                 else:
@@ -709,10 +768,15 @@ class KeepClient(object):
         if local_store is None:
             local_store = os.environ.get('KEEP_LOCAL_STORE')
 
         if local_store is None:
             local_store = os.environ.get('KEEP_LOCAL_STORE')
 
+        if api_client is None:
+            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
+        else:
+            self.insecure = api_client.insecure
+
         self.block_cache = block_cache if block_cache else KeepBlockCache()
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
         self.block_cache = block_cache if block_cache else KeepBlockCache()
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
-        self._user_agent_pool = Queue.LifoQueue()
+        self._user_agent_pool = queue.LifoQueue()
         self.upload_counter = Counter()
         self.download_counter = Counter()
         self.put_counter = Counter()
         self.upload_counter = Counter()
         self.download_counter = Counter()
         self.put_counter = Counter()
@@ -733,7 +797,7 @@ class KeepClient(object):
                     if not proxy_uris[i].endswith('/'):
                         proxy_uris[i] += '/'
                     # URL validation
                     if not proxy_uris[i].endswith('/'):
                         proxy_uris[i] += '/'
                     # URL validation
-                    url = urlparse.urlparse(proxy_uris[i])
+                    url = urllib.parse.urlparse(proxy_uris[i])
                     if not (url.scheme and url.netloc):
                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                 self.api_token = api_token
                     if not (url.scheme and url.netloc):
                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                 self.api_token = api_token
@@ -799,7 +863,7 @@ class KeepClient(object):
                 raise arvados.errors.NoKeepServersError()
 
             # Precompute the base URI for each service.
                 raise arvados.errors.NoKeepServersError()
 
             # Precompute the base URI for each service.
-            for r in self._gateway_services.itervalues():
+            for r in self._gateway_services.values():
                 host = r['service_host']
                 if not host.startswith('[') and host.find(':') >= 0:
                     # IPv6 URIs must be formatted like http://[::1]:80/...
                 host = r['service_host']
                 if not host.startswith('[') and host.find(':') >= 0:
                     # IPv6 URIs must be formatted like http://[::1]:80/...
@@ -811,7 +875,7 @@ class KeepClient(object):
 
             _logger.debug(str(self._gateway_services))
             self._keep_services = [
 
             _logger.debug(str(self._gateway_services))
             self._keep_services = [
-                ks for ks in self._gateway_services.itervalues()
+                ks for ks in self._gateway_services.values()
                 if not ks.get('service_type', '').startswith('gateway:')]
             self._writable_services = [ks for ks in self._keep_services
                                        if not ks.get('read_only')]
                 if not ks.get('service_type', '').startswith('gateway:')]
             self._writable_services = [ks for ks in self._keep_services
                                        if not ks.get('read_only')]
@@ -830,7 +894,7 @@ class KeepClient(object):
         The weight is md5(h + u) where u is the last 15 characters of
         the service endpoint's UUID.
         """
         The weight is md5(h + u) where u is the last 15 characters of
         the service endpoint's UUID.
         """
-        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
+        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
 
     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
         """Return an array of Keep service endpoints, in the order in
 
     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
         """Return an array of Keep service endpoints, in the order in
@@ -868,7 +932,7 @@ class KeepClient(object):
         _logger.debug("{}: {}".format(locator, sorted_roots))
         return sorted_roots
 
         _logger.debug("{}: {}".format(locator, sorted_roots))
         return sorted_roots
 
-    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
+    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
@@ -881,7 +945,8 @@ class KeepClient(object):
                     root, self._user_agent_pool,
                     upload_counter=self.upload_counter,
                     download_counter=self.download_counter,
                     root, self._user_agent_pool,
                     upload_counter=self.upload_counter,
                     download_counter=self.download_counter,
-                    **headers)
+                    headers=headers,
+                    insecure=self.insecure)
         return local_roots
 
     @staticmethod
         return local_roots
 
     @staticmethod
@@ -911,14 +976,14 @@ class KeepClient(object):
             return None
 
     @retry.retry_method
             return None
 
     @retry.retry_method
-    def head(self, loc_s, num_retries=None):
-        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
+    def head(self, loc_s, **kwargs):
+        return self._get_or_head(loc_s, method="HEAD", **kwargs)
 
     @retry.retry_method
 
     @retry.retry_method
-    def get(self, loc_s, num_retries=None):
-        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
+    def get(self, loc_s, **kwargs):
+        return self._get_or_head(loc_s, method="GET", **kwargs)
 
 
-    def _get_or_head(self, loc_s, method="GET", num_retries=None):
+    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -943,76 +1008,91 @@ class KeepClient(object):
 
         self.get_counter.add(1)
 
 
         self.get_counter.add(1)
 
-        locator = KeepLocator(loc_s)
-        if method == "GET":
-            slot, first = self.block_cache.reserve_cache(locator.md5sum)
-            if not first:
-                self.hits_counter.add(1)
-                v = slot.get()
-                return v
-
-        self.misses_counter.add(1)
-
-        # If the locator has hints specifying a prefix (indicating a
-        # remote keepproxy) or the UUID of a local gateway service,
-        # read data from the indicated service(s) instead of the usual
-        # list of local disk services.
-        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
-                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
-        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
-                           for hint in locator.hints if (
-                                   hint.startswith('K@') and
-                                   len(hint) == 29 and
-                                   self._gateway_services.get(hint[2:])
-                                   )])
-        # Map root URLs to their KeepService objects.
-        roots_map = {
-            root: self.KeepService(root, self._user_agent_pool,
-                                   upload_counter=self.upload_counter,
-                                   download_counter=self.download_counter)
-            for root in hint_roots
-        }
-
-        # See #3147 for a discussion of the loop implementation.  Highlights:
-        # * Refresh the list of Keep services after each failure, in case
-        #   it's being updated.
-        # * Retry until we succeed, we're out of retries, or every available
-        #   service has returned permanent failure.
-        sorted_roots = []
-        roots_map = {}
+        slot = None
         blob = None
         blob = None
-        loop = retry.RetryLoop(num_retries, self._check_loop_result,
-                               backoff_start=2)
-        for tries_left in loop:
-            try:
-                sorted_roots = self.map_new_services(
-                    roots_map, locator,
-                    force_rebuild=(tries_left < num_retries),
-                    need_writable=False)
-            except Exception as error:
-                loop.save_result(error)
-                continue
+        try:
+            locator = KeepLocator(loc_s)
+            if method == "GET":
+                slot, first = self.block_cache.reserve_cache(locator.md5sum)
+                if not first:
+                    self.hits_counter.add(1)
+                    blob = slot.get()
+                    if blob is None:
+                        raise arvados.errors.KeepReadError(
+                            "failed to read {}".format(loc_s))
+                    return blob
+
+            self.misses_counter.add(1)
+
+            headers = {
+                'X-Request-Id': (request_id or
+                                 (hasattr(self, 'api_client') and self.api_client.request_id) or
+                                 arvados.util.new_request_id()),
+            }
+
+            # If the locator has hints specifying a prefix (indicating a
+            # remote keepproxy) or the UUID of a local gateway service,
+            # read data from the indicated service(s) instead of the usual
+            # list of local disk services.
+            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+                               for hint in locator.hints if (
+                                       hint.startswith('K@') and
+                                       len(hint) == 29 and
+                                       self._gateway_services.get(hint[2:])
+                                       )])
+            # Map root URLs to their KeepService objects.
+            roots_map = {
+                root: self.KeepService(root, self._user_agent_pool,
+                                       upload_counter=self.upload_counter,
+                                       download_counter=self.download_counter,
+                                       headers=headers,
+                                       insecure=self.insecure)
+                for root in hint_roots
+            }
+
+            # See #3147 for a discussion of the loop implementation.  Highlights:
+            # * Refresh the list of Keep services after each failure, in case
+            #   it's being updated.
+            # * Retry until we succeed, we're out of retries, or every available
+            #   service has returned permanent failure.
+            sorted_roots = []
+            roots_map = {}
+            loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                                   backoff_start=2)
+            for tries_left in loop:
+                try:
+                    sorted_roots = self.map_new_services(
+                        roots_map, locator,
+                        force_rebuild=(tries_left < num_retries),
+                        need_writable=False,
+                        headers=headers)
+                except Exception as error:
+                    loop.save_result(error)
+                    continue
 
 
-            # Query KeepService objects that haven't returned
-            # permanent failure, in our specified shuffle order.
-            services_to_try = [roots_map[root]
-                               for root in sorted_roots
-                               if roots_map[root].usable()]
-            for keep_service in services_to_try:
-                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
-                if blob is not None:
-                    break
-            loop.save_result((blob, len(services_to_try)))
-
-        # Always cache the result, then return it if we succeeded.
-        if method == "GET":
-            slot.set(blob)
-            self.block_cache.cap_cache()
-        if loop.success():
-            if method == "HEAD":
-                return True
-            else:
-                return blob
+                # Query KeepService objects that haven't returned
+                # permanent failure, in our specified shuffle order.
+                services_to_try = [roots_map[root]
+                                   for root in sorted_roots
+                                   if roots_map[root].usable()]
+                for keep_service in services_to_try:
+                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
+                    if blob is not None:
+                        break
+                loop.save_result((blob, len(services_to_try)))
+
+            # Always cache the result, then return it if we succeeded.
+            if loop.success():
+                if method == "HEAD":
+                    return True
+                else:
+                    return blob
+        finally:
+            if slot is not None:
+                slot.set(blob)
+                self.block_cache.cap_cache()
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
@@ -1032,7 +1112,7 @@ class KeepClient(object):
                 "failed to read {}".format(loc_s), service_errors, label="service")
 
     @retry.retry_method
                 "failed to read {}".format(loc_s), service_errors, label="service")
 
     @retry.retry_method
-    def put(self, data, copies=2, num_retries=None):
+    def put(self, data, copies=2, num_retries=None, request_id=None):
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
@@ -1051,10 +1131,8 @@ class KeepClient(object):
           KeepClient is initialized.
         """
 
           KeepClient is initialized.
         """
 
-        if isinstance(data, unicode):
-            data = data.encode("ascii")
-        elif not isinstance(data, str):
-            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
+        if not isinstance(data, bytes):
+            data = data.encode()
 
         self.put_counter.add(1)
 
 
         self.put_counter.add(1)
 
@@ -1064,9 +1142,12 @@ class KeepClient(object):
             return loc_s
         locator = KeepLocator(loc_s)
 
             return loc_s
         locator = KeepLocator(loc_s)
 
-        headers = {}
-        # Tell the proxy how many copies we want it to store
-        headers['X-Keep-Desired-Replicas'] = str(copies)
+        headers = {
+            'X-Request-Id': (request_id or
+                             (hasattr(self, 'api_client') and self.api_client.request_id) or
+                             arvados.util.new_request_id()),
+            'X-Keep-Desired-Replicas': str(copies),
+        }
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
@@ -1075,12 +1156,14 @@ class KeepClient(object):
             try:
                 sorted_roots = self.map_new_services(
                     roots_map, locator,
             try:
                 sorted_roots = self.map_new_services(
                     roots_map, locator,
-                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
+                    force_rebuild=(tries_left < num_retries),
+                    need_writable=True,
+                    headers=headers)
             except Exception as error:
                 loop.save_result(error)
                 continue
 
             except Exception as error:
                 loop.save_result(error)
                 continue
 
-            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
                                                         copies=copies - done,
                                                         max_service_replicas=self.max_replicas_per_service,
                                                         data_hash=data_hash,
                                                         copies=copies - done,
                                                         max_service_replicas=self.max_replicas_per_service,
@@ -1122,7 +1205,7 @@ class KeepClient(object):
         """
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
         """
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
-        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
+        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
             f.write(data)
         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                   os.path.join(self.local_store, md5))
             f.write(data)
         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                   os.path.join(self.local_store, md5))
@@ -1136,8 +1219,8 @@ class KeepClient(object):
             raise arvados.errors.NotFoundError(
                 "Invalid data locator: '%s'" % loc_s)
         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
             raise arvados.errors.NotFoundError(
                 "Invalid data locator: '%s'" % loc_s)
         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
-            return ''
-        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
+            return b''
+        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
             return f.read()
 
     def is_cached(self, locator):
             return f.read()
 
     def is_cached(self, locator):