Merge branch 'patch-1' of https://github.com/mr-c/arvados into mr-c-patch-1
[arvados.git] / sdk / python / arvados / keep.py
index 5b4770c4d0dca8824c268448296d0658c8ba04d8..bc43b849c3a01dd661c4ba080d83f65f597adde6 100644 (file)
@@ -1,19 +1,37 @@
-import cStringIO
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+from future.utils import native_str
+standard_library.install_aliases()
+from builtins import next
+from builtins import str
+from builtins import range
+from builtins import object
 import collections
 import datetime
 import hashlib
 import collections
 import datetime
 import hashlib
+import io
 import logging
 import math
 import os
 import pycurl
 import logging
 import math
 import os
 import pycurl
-import Queue
+import queue
 import re
 import socket
 import ssl
 import sys
 import threading
 import re
 import socket
 import ssl
 import sys
 import threading
-import timer
-import urlparse
+from . import timer
+import urllib.parse
+
+if sys.version_info >= (3, 0):
+    from io import BytesIO
+else:
+    from cStringIO import StringIO as BytesIO
 
 import arvados
 import arvados.config as config
 
 import arvados
 import arvados.config as config
@@ -60,8 +78,9 @@ class KeepLocator(object):
 
     def __str__(self):
         return '+'.join(
 
     def __str__(self):
         return '+'.join(
-            str(s) for s in [self.md5sum, self.size,
-                             self.permission_hint()] + self.hints
+            native_str(s)
+            for s in [self.md5sum, self.size,
+                      self.permission_hint()] + self.hints
             if s is not None)
 
     def stripped(self):
             if s is not None)
 
     def stripped(self):
@@ -78,7 +97,7 @@ class KeepLocator(object):
             return getattr(self, data_name)
         def setter(self, hex_str):
             if not arvados.util.is_hex(hex_str, length):
             return getattr(self, data_name)
         def setter(self, hex_str):
             if not arvados.util.is_hex(hex_str, length):
-                raise ValueError("{} is not a {}-digit hex string: {}".
+                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                  format(name, length, hex_str))
             setattr(self, data_name, hex_str)
         return property(getter, setter)
                                  format(name, length, hex_str))
             setattr(self, data_name, hex_str)
         return property(getter, setter)
@@ -191,7 +210,7 @@ class KeepBlockCache(object):
             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
             sm = sum([slot.size() for slot in self._cache])
             while len(self._cache) > 0 and sm > self.cache_max:
             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
             sm = sum([slot.size() for slot in self._cache])
             while len(self._cache) > 0 and sm > self.cache_max:
-                for i in xrange(len(self._cache)-1, -1, -1):
+                for i in range(len(self._cache)-1, -1, -1):
                     if self._cache[i].ready.is_set():
                         del self._cache[i]
                         break
                     if self._cache[i].ready.is_set():
                         del self._cache[i]
                         break
@@ -199,7 +218,7 @@ class KeepBlockCache(object):
 
     def _get(self, locator):
         # Test if the locator is already in the cache
 
     def _get(self, locator):
         # Test if the locator is already in the cache
-        for i in xrange(0, len(self._cache)):
+        for i in range(0, len(self._cache)):
             if self._cache[i].locator == locator:
                 n = self._cache[i]
                 if i != 0:
             if self._cache[i].locator == locator:
                 n = self._cache[i]
                 if i != 0:
@@ -270,19 +289,23 @@ class KeepClient(object):
             arvados.errors.HttpError,
         )
 
             arvados.errors.HttpError,
         )
 
-        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
+        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                      upload_counter=None,
                      upload_counter=None,
-                     download_counter=None, **headers):
+                     download_counter=None,
+                     headers={},
+                     insecure=False):
             self.root = root
             self._user_agent_pool = user_agent_pool
             self._result = {'error': None}
             self._usable = True
             self._session = None
             self.root = root
             self._user_agent_pool = user_agent_pool
             self._result = {'error': None}
             self._usable = True
             self._session = None
+            self._socket = None
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
             self.upload_counter = upload_counter
             self.download_counter = download_counter
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
             self.upload_counter = upload_counter
             self.download_counter = download_counter
+            self.insecure = insecure
 
         def usable(self):
             """Is it worth attempting a request?"""
 
         def usable(self):
             """Is it worth attempting a request?"""
@@ -298,7 +321,7 @@ class KeepClient(object):
         def _get_user_agent(self):
             try:
                 return self._user_agent_pool.get(block=False)
         def _get_user_agent(self):
             try:
                 return self._user_agent_pool.get(block=False)
-            except Queue.Empty:
+            except queue.Empty:
                 return pycurl.Curl()
 
         def _put_user_agent(self, ua):
                 return pycurl.Curl()
 
         def _put_user_agent(self, ua):
@@ -329,6 +352,7 @@ class KeepClient(object):
             if hasattr(socket, 'TCP_KEEPIDLE'):
                 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
             if hasattr(socket, 'TCP_KEEPIDLE'):
                 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+            self._socket = s
             return s
 
         def get(self, locator, method="GET", timeout=None):
             return s
 
         def get(self, locator, method="GET", timeout=None):
@@ -340,22 +364,31 @@ class KeepClient(object):
             try:
                 with timer.Timer() as t:
                     self._headers = {}
             try:
                 with timer.Timer() as t:
                     self._headers = {}
-                    response_body = cStringIO.StringIO()
+                    response_body = BytesIO()
                     curl.setopt(pycurl.NOSIGNAL, 1)
                     curl.setopt(pycurl.NOSIGNAL, 1)
-                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
+                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     curl.setopt(pycurl.HTTPHEADER, [
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     curl.setopt(pycurl.HTTPHEADER, [
-                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    if self.insecure:
+                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                    else:
+                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
-                    self._setcurltimeouts(curl, timeout)
+                    self._setcurltimeouts(curl, timeout, method=="HEAD")
 
                     try:
                         curl.perform()
                     except Exception as e:
                         raise arvados.errors.HttpError(0, str(e))
 
                     try:
                         curl.perform()
                     except Exception as e:
                         raise arvados.errors.HttpError(0, str(e))
+                    finally:
+                        if self._socket:
+                            self._socket.close()
+                            self._socket = None
                     self._result = {
                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                         'body': response_body.getvalue(),
                     self._result = {
                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                         'body': response_body.getvalue(),
@@ -390,13 +423,17 @@ class KeepClient(object):
                 _logger.info("HEAD %s: %s bytes",
                          self._result['status_code'],
                          self._result.get('content-length'))
                 _logger.info("HEAD %s: %s bytes",
                          self._result['status_code'],
                          self._result.get('content-length'))
+                if self._result['headers'].get('x-keep-locator'):
+                    # This is a response to a remote block copy request, return
+                    # the local copy block locator.
+                    return self._result['headers'].get('x-keep-locator')
                 return True
 
             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                          self._result['status_code'],
                          len(self._result['body']),
                          t.msecs,
                 return True
 
             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                          self._result['status_code'],
                          len(self._result['body']),
                          t.msecs,
-                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
 
             if self.download_counter:
                 self.download_counter.add(len(self._result['body']))
 
             if self.download_counter:
                 self.download_counter.add(len(self._result['body']))
@@ -417,10 +454,11 @@ class KeepClient(object):
             try:
                 with timer.Timer() as t:
                     self._headers = {}
             try:
                 with timer.Timer() as t:
                     self._headers = {}
-                    body_reader = cStringIO.StringIO(body)
-                    response_body = cStringIO.StringIO()
+                    body_reader = BytesIO(body)
+                    response_body = BytesIO()
                     curl.setopt(pycurl.NOSIGNAL, 1)
                     curl.setopt(pycurl.NOSIGNAL, 1)
-                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
+                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
@@ -432,17 +470,25 @@ class KeepClient(object):
                     curl.setopt(pycurl.INFILESIZE, len(body))
                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
                     curl.setopt(pycurl.HTTPHEADER, [
                     curl.setopt(pycurl.INFILESIZE, len(body))
                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
                     curl.setopt(pycurl.HTTPHEADER, [
-                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    if self.insecure:
+                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                    else:
+                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     self._setcurltimeouts(curl, timeout)
                     try:
                         curl.perform()
                     except Exception as e:
                         raise arvados.errors.HttpError(0, str(e))
                     self._setcurltimeouts(curl, timeout)
                     try:
                         curl.perform()
                     except Exception as e:
                         raise arvados.errors.HttpError(0, str(e))
+                    finally:
+                        if self._socket:
+                            self._socket.close()
+                            self._socket = None
                     self._result = {
                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                     self._result = {
                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
-                        'body': response_body.getvalue(),
+                        'body': response_body.getvalue().decode('utf-8'),
                         'headers': self._headers,
                         'error': False,
                     }
                         'headers': self._headers,
                         'error': False,
                     }
@@ -469,12 +515,12 @@ class KeepClient(object):
                          self._result['status_code'],
                          len(body),
                          t.msecs,
                          self._result['status_code'],
                          len(body),
                          t.msecs,
-                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
             if self.upload_counter:
                 self.upload_counter.add(len(body))
             return True
 
             if self.upload_counter:
                 self.upload_counter.add(len(body))
             return True
 
-        def _setcurltimeouts(self, curl, timeouts):
+        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
             if not timeouts:
                 return
             elif isinstance(timeouts, tuple):
             if not timeouts:
                 return
             elif isinstance(timeouts, tuple):
@@ -487,11 +533,13 @@ class KeepClient(object):
                 conn_t, xfer_t = (timeouts, timeouts)
                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
                 conn_t, xfer_t = (timeouts, timeouts)
                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
-            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
-            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
+            if not ignore_bandwidth:
+                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
 
         def _headerfunction(self, header_line):
-            header_line = header_line.decode('iso-8859-1')
+            if isinstance(header_line, bytes):
+                header_line = header_line.decode('iso-8859-1')
             if ':' in header_line:
                 name, value = header_line.split(':', 1)
                 name = name.strip().lower()
             if ':' in header_line:
                 name, value = header_line.split(':', 1)
                 name = name.strip().lower()
@@ -508,30 +556,30 @@ class KeepClient(object):
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
-    
 
 
-    class KeepWriterQueue(Queue.Queue):
+
+    class KeepWriterQueue(queue.Queue):
         def __init__(self, copies):
         def __init__(self, copies):
-            Queue.Queue.__init__(self) # Old-style superclass
+            queue.Queue.__init__(self) # Old-style superclass
             self.wanted_copies = copies
             self.successful_copies = 0
             self.response = None
             self.successful_copies_lock = threading.Lock()
             self.pending_tries = copies
             self.pending_tries_notification = threading.Condition()
             self.wanted_copies = copies
             self.successful_copies = 0
             self.response = None
             self.successful_copies_lock = threading.Lock()
             self.pending_tries = copies
             self.pending_tries_notification = threading.Condition()
-        
+
         def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
         def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
-        
+
         def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
         def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
-        
+
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
@@ -560,7 +608,7 @@ class KeepClient(object):
                         return service, service_root
                     elif self.empty():
                         self.pending_tries_notification.notify_all()
                         return service, service_root
                     elif self.empty():
                         self.pending_tries_notification.notify_all()
-                        raise Queue.Empty
+                        raise queue.Empty
                     else:
                         self.pending_tries_notification.wait()
 
                     else:
                         self.pending_tries_notification.wait()
 
@@ -572,7 +620,7 @@ class KeepClient(object):
             if (not max_service_replicas) or (max_service_replicas >= copies):
                 num_threads = 1
             else:
             if (not max_service_replicas) or (max_service_replicas >= copies):
                 num_threads = 1
             else:
-                num_threads = int(math.ceil(float(copies) / max_service_replicas))
+                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
             _logger.debug("Pool max threads is %d", num_threads)
             self.workers = []
             self.queue = KeepClient.KeepWriterQueue(copies)
             _logger.debug("Pool max threads is %d", num_threads)
             self.workers = []
             self.queue = KeepClient.KeepWriterQueue(copies)
@@ -580,25 +628,25 @@ class KeepClient(object):
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                 self.workers.append(w)
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                 self.workers.append(w)
-        
+
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
             self.total_task_nr += 1
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
             self.total_task_nr += 1
-        
+
         def done(self):
             return self.queue.successful_copies
         def done(self):
             return self.queue.successful_copies
-        
+
         def join(self):
             # Start workers
             for worker in self.workers:
                 worker.start()
             # Wait for finished work
             self.queue.join()
         def join(self):
             # Start workers
             for worker in self.workers:
                 worker.start()
             # Wait for finished work
             self.queue.join()
-        
+
         def response(self):
             return self.queue.response
         def response(self):
             return self.queue.response
-    
-    
+
+
     class KeepWriterThread(threading.Thread):
         TaskFailed = RuntimeError()
 
     class KeepWriterThread(threading.Thread):
         TaskFailed = RuntimeError()
 
@@ -614,7 +662,7 @@ class KeepClient(object):
             while True:
                 try:
                     service, service_root = self.queue.get_next_task()
             while True:
                 try:
                     service, service_root = self.queue.get_next_task()
-                except Queue.Empty:
+                except queue.Empty:
                     return
                 try:
                     locator, copies = self.do_task(service, service_root)
                     return
                 try:
                     locator, copies = self.do_task(service, service_root)
@@ -729,10 +777,15 @@ class KeepClient(object):
         if local_store is None:
             local_store = os.environ.get('KEEP_LOCAL_STORE')
 
         if local_store is None:
             local_store = os.environ.get('KEEP_LOCAL_STORE')
 
+        if api_client is None:
+            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
+        else:
+            self.insecure = api_client.insecure
+
         self.block_cache = block_cache if block_cache else KeepBlockCache()
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
         self.block_cache = block_cache if block_cache else KeepBlockCache()
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
-        self._user_agent_pool = Queue.LifoQueue()
+        self._user_agent_pool = queue.LifoQueue()
         self.upload_counter = Counter()
         self.download_counter = Counter()
         self.put_counter = Counter()
         self.upload_counter = Counter()
         self.download_counter = Counter()
         self.put_counter = Counter()
@@ -742,6 +795,7 @@ class KeepClient(object):
 
         if local_store:
             self.local_store = local_store
 
         if local_store:
             self.local_store = local_store
+            self.head = self.local_store_head
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
@@ -753,7 +807,7 @@ class KeepClient(object):
                     if not proxy_uris[i].endswith('/'):
                         proxy_uris[i] += '/'
                     # URL validation
                     if not proxy_uris[i].endswith('/'):
                         proxy_uris[i] += '/'
                     # URL validation
-                    url = urlparse.urlparse(proxy_uris[i])
+                    url = urllib.parse.urlparse(proxy_uris[i])
                     if not (url.scheme and url.netloc):
                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                 self.api_token = api_token
                     if not (url.scheme and url.netloc):
                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                 self.api_token = api_token
@@ -819,7 +873,7 @@ class KeepClient(object):
                 raise arvados.errors.NoKeepServersError()
 
             # Precompute the base URI for each service.
                 raise arvados.errors.NoKeepServersError()
 
             # Precompute the base URI for each service.
-            for r in self._gateway_services.itervalues():
+            for r in self._gateway_services.values():
                 host = r['service_host']
                 if not host.startswith('[') and host.find(':') >= 0:
                     # IPv6 URIs must be formatted like http://[::1]:80/...
                 host = r['service_host']
                 if not host.startswith('[') and host.find(':') >= 0:
                     # IPv6 URIs must be formatted like http://[::1]:80/...
@@ -831,7 +885,7 @@ class KeepClient(object):
 
             _logger.debug(str(self._gateway_services))
             self._keep_services = [
 
             _logger.debug(str(self._gateway_services))
             self._keep_services = [
-                ks for ks in self._gateway_services.itervalues()
+                ks for ks in self._gateway_services.values()
                 if not ks.get('service_type', '').startswith('gateway:')]
             self._writable_services = [ks for ks in self._keep_services
                                        if not ks.get('read_only')]
                 if not ks.get('service_type', '').startswith('gateway:')]
             self._writable_services = [ks for ks in self._keep_services
                                        if not ks.get('read_only')]
@@ -850,7 +904,7 @@ class KeepClient(object):
         The weight is md5(h + u) where u is the last 15 characters of
         the service endpoint's UUID.
         """
         The weight is md5(h + u) where u is the last 15 characters of
         the service endpoint's UUID.
         """
-        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
+        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
 
     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
         """Return an array of Keep service endpoints, in the order in
 
     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
         """Return an array of Keep service endpoints, in the order in
@@ -888,7 +942,7 @@ class KeepClient(object):
         _logger.debug("{}: {}".format(locator, sorted_roots))
         return sorted_roots
 
         _logger.debug("{}: {}".format(locator, sorted_roots))
         return sorted_roots
 
-    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
+    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
@@ -901,7 +955,8 @@ class KeepClient(object):
                     root, self._user_agent_pool,
                     upload_counter=self.upload_counter,
                     download_counter=self.download_counter,
                     root, self._user_agent_pool,
                     upload_counter=self.upload_counter,
                     download_counter=self.download_counter,
-                    **headers)
+                    headers=headers,
+                    insecure=self.insecure)
         return local_roots
 
     @staticmethod
         return local_roots
 
     @staticmethod
@@ -930,15 +985,20 @@ class KeepClient(object):
         else:
             return None
 
         else:
             return None
 
+    def refresh_signature(self, loc):
+        """Ask Keep to get the remote block and return its local signature"""
+        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
+        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
+
     @retry.retry_method
     @retry.retry_method
-    def head(self, loc_s, num_retries=None):
-        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
+    def head(self, loc_s, **kwargs):
+        return self._get_or_head(loc_s, method="HEAD", **kwargs)
 
     @retry.retry_method
 
     @retry.retry_method
-    def get(self, loc_s, num_retries=None):
-        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
+    def get(self, loc_s, **kwargs):
+        return self._get_or_head(loc_s, method="GET", **kwargs)
 
 
-    def _get_or_head(self, loc_s, method="GET", num_retries=None):
+    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -963,76 +1023,88 @@ class KeepClient(object):
 
         self.get_counter.add(1)
 
 
         self.get_counter.add(1)
 
-        locator = KeepLocator(loc_s)
-        if method == "GET":
-            slot, first = self.block_cache.reserve_cache(locator.md5sum)
-            if not first:
-                self.hits_counter.add(1)
-                v = slot.get()
-                return v
-
-        self.misses_counter.add(1)
-
-        # If the locator has hints specifying a prefix (indicating a
-        # remote keepproxy) or the UUID of a local gateway service,
-        # read data from the indicated service(s) instead of the usual
-        # list of local disk services.
-        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
-                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
-        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
-                           for hint in locator.hints if (
-                                   hint.startswith('K@') and
-                                   len(hint) == 29 and
-                                   self._gateway_services.get(hint[2:])
-                                   )])
-        # Map root URLs to their KeepService objects.
-        roots_map = {
-            root: self.KeepService(root, self._user_agent_pool,
-                                   upload_counter=self.upload_counter,
-                                   download_counter=self.download_counter)
-            for root in hint_roots
-        }
-
-        # See #3147 for a discussion of the loop implementation.  Highlights:
-        # * Refresh the list of Keep services after each failure, in case
-        #   it's being updated.
-        # * Retry until we succeed, we're out of retries, or every available
-        #   service has returned permanent failure.
-        sorted_roots = []
-        roots_map = {}
+        slot = None
         blob = None
         blob = None
-        loop = retry.RetryLoop(num_retries, self._check_loop_result,
-                               backoff_start=2)
-        for tries_left in loop:
-            try:
-                sorted_roots = self.map_new_services(
-                    roots_map, locator,
-                    force_rebuild=(tries_left < num_retries),
-                    need_writable=False)
-            except Exception as error:
-                loop.save_result(error)
-                continue
+        try:
+            locator = KeepLocator(loc_s)
+            if method == "GET":
+                slot, first = self.block_cache.reserve_cache(locator.md5sum)
+                if not first:
+                    self.hits_counter.add(1)
+                    blob = slot.get()
+                    if blob is None:
+                        raise arvados.errors.KeepReadError(
+                            "failed to read {}".format(loc_s))
+                    return blob
+
+            self.misses_counter.add(1)
+
+            if headers is None:
+                headers = {}
+            headers['X-Request-Id'] = (request_id or
+                                        (hasattr(self, 'api_client') and self.api_client.request_id) or
+                                        arvados.util.new_request_id())
+
+            # If the locator has hints specifying a prefix (indicating a
+            # remote keepproxy) or the UUID of a local gateway service,
+            # read data from the indicated service(s) instead of the usual
+            # list of local disk services.
+            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+                               for hint in locator.hints if (
+                                       hint.startswith('K@') and
+                                       len(hint) == 29 and
+                                       self._gateway_services.get(hint[2:])
+                                       )])
+            # Map root URLs to their KeepService objects.
+            roots_map = {
+                root: self.KeepService(root, self._user_agent_pool,
+                                       upload_counter=self.upload_counter,
+                                       download_counter=self.download_counter,
+                                       headers=headers,
+                                       insecure=self.insecure)
+                for root in hint_roots
+            }
+
+            # See #3147 for a discussion of the loop implementation.  Highlights:
+            # * Refresh the list of Keep services after each failure, in case
+            #   it's being updated.
+            # * Retry until we succeed, we're out of retries, or every available
+            #   service has returned permanent failure.
+            sorted_roots = []
+            roots_map = {}
+            loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                                   backoff_start=2)
+            for tries_left in loop:
+                try:
+                    sorted_roots = self.map_new_services(
+                        roots_map, locator,
+                        force_rebuild=(tries_left < num_retries),
+                        need_writable=False,
+                        headers=headers)
+                except Exception as error:
+                    loop.save_result(error)
+                    continue
 
 
-            # Query KeepService objects that haven't returned
-            # permanent failure, in our specified shuffle order.
-            services_to_try = [roots_map[root]
-                               for root in sorted_roots
-                               if roots_map[root].usable()]
-            for keep_service in services_to_try:
-                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
-                if blob is not None:
-                    break
-            loop.save_result((blob, len(services_to_try)))
-
-        # Always cache the result, then return it if we succeeded.
-        if method == "GET":
-            slot.set(blob)
-            self.block_cache.cap_cache()
-        if loop.success():
-            if method == "HEAD":
-                return True
-            else:
+                # Query KeepService objects that haven't returned
+                # permanent failure, in our specified shuffle order.
+                services_to_try = [roots_map[root]
+                                   for root in sorted_roots
+                                   if roots_map[root].usable()]
+                for keep_service in services_to_try:
+                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
+                    if blob is not None:
+                        break
+                loop.save_result((blob, len(services_to_try)))
+
+            # Always cache the result, then return it if we succeeded.
+            if loop.success():
                 return blob
                 return blob
+        finally:
+            if slot is not None:
+                slot.set(blob)
+                self.block_cache.cap_cache()
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
@@ -1049,10 +1121,10 @@ class KeepClient(object):
                 "{} not found".format(loc_s), service_errors)
         else:
             raise arvados.errors.KeepReadError(
                 "{} not found".format(loc_s), service_errors)
         else:
             raise arvados.errors.KeepReadError(
-                "failed to read {}".format(loc_s), service_errors, label="service")
+                "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
 
     @retry.retry_method
 
     @retry.retry_method
-    def put(self, data, copies=2, num_retries=None):
+    def put(self, data, copies=2, num_retries=None, request_id=None):
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
@@ -1071,10 +1143,8 @@ class KeepClient(object):
           KeepClient is initialized.
         """
 
           KeepClient is initialized.
         """
 
-        if isinstance(data, unicode):
-            data = data.encode("ascii")
-        elif not isinstance(data, str):
-            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
+        if not isinstance(data, bytes):
+            data = data.encode()
 
         self.put_counter.add(1)
 
 
         self.put_counter.add(1)
 
@@ -1084,9 +1154,12 @@ class KeepClient(object):
             return loc_s
         locator = KeepLocator(loc_s)
 
             return loc_s
         locator = KeepLocator(loc_s)
 
-        headers = {}
-        # Tell the proxy how many copies we want it to store
-        headers['X-Keep-Desired-Replicas'] = str(copies)
+        headers = {
+            'X-Request-Id': (request_id or
+                             (hasattr(self, 'api_client') and self.api_client.request_id) or
+                             arvados.util.new_request_id()),
+            'X-Keep-Desired-Replicas': str(copies),
+        }
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
@@ -1095,12 +1168,14 @@ class KeepClient(object):
             try:
                 sorted_roots = self.map_new_services(
                     roots_map, locator,
             try:
                 sorted_roots = self.map_new_services(
                     roots_map, locator,
-                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
+                    force_rebuild=(tries_left < num_retries),
+                    need_writable=True,
+                    headers=headers)
             except Exception as error:
                 loop.save_result(error)
                 continue
 
             except Exception as error:
                 loop.save_result(error)
                 continue
 
-            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
                                                         copies=copies - done,
                                                         max_service_replicas=self.max_replicas_per_service,
                                                         data_hash=data_hash,
                                                         copies=copies - done,
                                                         max_service_replicas=self.max_replicas_per_service,
@@ -1125,8 +1200,8 @@ class KeepClient(object):
                               for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                               for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
-                "failed to write {} (wanted {} copies but wrote {})".format(
-                    data_hash, copies, writer_pool.done()), service_errors, label="service")
+                "failed to write {} after {} (wanted {} copies but wrote {})".format(
+                    data_hash, loop.attempts_str(), copies, writer_pool.done()), service_errors, label="service")
 
     def local_store_put(self, data, copies=1, num_retries=None):
         """A stub for put().
 
     def local_store_put(self, data, copies=1, num_retries=None):
         """A stub for put().
@@ -1142,7 +1217,7 @@ class KeepClient(object):
         """
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
         """
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
-        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
+        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
             f.write(data)
         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                   os.path.join(self.local_store, md5))
             f.write(data)
         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                   os.path.join(self.local_store, md5))
@@ -1156,9 +1231,21 @@ class KeepClient(object):
             raise arvados.errors.NotFoundError(
                 "Invalid data locator: '%s'" % loc_s)
         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
             raise arvados.errors.NotFoundError(
                 "Invalid data locator: '%s'" % loc_s)
         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
-            return ''
-        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
+            return b''
+        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
             return f.read()
 
             return f.read()
 
+    def local_store_head(self, loc_s, num_retries=None):
+        """Companion to local_store_put()."""
+        try:
+            locator = KeepLocator(loc_s)
+        except ValueError:
+            raise arvados.errors.NotFoundError(
+                "Invalid data locator: '%s'" % loc_s)
+        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
+            return True
+        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
+            return True
+
     def is_cached(self, locator):
         return self.block_cache.reserve_cache(expect_hash)
     def is_cached(self, locator):
         return self.block_cache.reserve_cache(expect_hash)