Merge branch 'patch-1' of https://github.com/mr-c/arvados into mr-c-patch-1
[arvados.git] / sdk / python / arvados / keep.py
index 5b4770c4d0dca8824c268448296d0658c8ba04d8..bc43b849c3a01dd661c4ba080d83f65f597adde6 100644 (file)
@@ -1,19 +1,37 @@
-import cStringIO
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import absolute_import
+from __future__ import division
+from future import standard_library
+from future.utils import native_str
+standard_library.install_aliases()
+from builtins import next
+from builtins import str
+from builtins import range
+from builtins import object
 import collections
 import datetime
 import hashlib
+import io
 import logging
 import math
 import os
 import pycurl
-import Queue
+import queue
 import re
 import socket
 import ssl
 import sys
 import threading
-import timer
-import urlparse
+from . import timer
+import urllib.parse
+
+if sys.version_info >= (3, 0):
+    from io import BytesIO
+else:
+    from cStringIO import StringIO as BytesIO
 
 import arvados
 import arvados.config as config
@@ -60,8 +78,9 @@ class KeepLocator(object):
 
     def __str__(self):
         return '+'.join(
-            str(s) for s in [self.md5sum, self.size,
-                             self.permission_hint()] + self.hints
+            native_str(s)
+            for s in [self.md5sum, self.size,
+                      self.permission_hint()] + self.hints
             if s is not None)
 
     def stripped(self):
@@ -78,7 +97,7 @@ class KeepLocator(object):
             return getattr(self, data_name)
         def setter(self, hex_str):
             if not arvados.util.is_hex(hex_str, length):
-                raise ValueError("{} is not a {}-digit hex string: {}".
+                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                  format(name, length, hex_str))
             setattr(self, data_name, hex_str)
         return property(getter, setter)
@@ -191,7 +210,7 @@ class KeepBlockCache(object):
             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
             sm = sum([slot.size() for slot in self._cache])
             while len(self._cache) > 0 and sm > self.cache_max:
-                for i in xrange(len(self._cache)-1, -1, -1):
+                for i in range(len(self._cache)-1, -1, -1):
                     if self._cache[i].ready.is_set():
                         del self._cache[i]
                         break
@@ -199,7 +218,7 @@ class KeepBlockCache(object):
 
     def _get(self, locator):
         # Test if the locator is already in the cache
-        for i in xrange(0, len(self._cache)):
+        for i in range(0, len(self._cache)):
             if self._cache[i].locator == locator:
                 n = self._cache[i]
                 if i != 0:
@@ -270,19 +289,23 @@ class KeepClient(object):
             arvados.errors.HttpError,
         )
 
-        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
+        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                      upload_counter=None,
-                     download_counter=None, **headers):
+                     download_counter=None,
+                     headers={},
+                     insecure=False):
             self.root = root
             self._user_agent_pool = user_agent_pool
             self._result = {'error': None}
             self._usable = True
             self._session = None
+            self._socket = None
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
             self.upload_counter = upload_counter
             self.download_counter = download_counter
+            self.insecure = insecure
 
         def usable(self):
             """Is it worth attempting a request?"""
@@ -298,7 +321,7 @@ class KeepClient(object):
         def _get_user_agent(self):
             try:
                 return self._user_agent_pool.get(block=False)
-            except Queue.Empty:
+            except queue.Empty:
                 return pycurl.Curl()
 
         def _put_user_agent(self, ua):
@@ -329,6 +352,7 @@ class KeepClient(object):
             if hasattr(socket, 'TCP_KEEPIDLE'):
                 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+            self._socket = s
             return s
 
         def get(self, locator, method="GET", timeout=None):
@@ -340,22 +364,31 @@ class KeepClient(object):
             try:
                 with timer.Timer() as t:
                     self._headers = {}
-                    response_body = cStringIO.StringIO()
+                    response_body = BytesIO()
                     curl.setopt(pycurl.NOSIGNAL, 1)
-                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
+                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     curl.setopt(pycurl.HTTPHEADER, [
-                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
+                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    if self.insecure:
+                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                    else:
+                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
-                    self._setcurltimeouts(curl, timeout)
+                    self._setcurltimeouts(curl, timeout, method=="HEAD")
 
                     try:
                         curl.perform()
                     except Exception as e:
                         raise arvados.errors.HttpError(0, str(e))
+                    finally:
+                        if self._socket:
+                            self._socket.close()
+                            self._socket = None
                     self._result = {
                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                         'body': response_body.getvalue(),
@@ -390,13 +423,17 @@ class KeepClient(object):
                 _logger.info("HEAD %s: %s bytes",
                          self._result['status_code'],
                          self._result.get('content-length'))
+                if self._result['headers'].get('x-keep-locator'):
+                    # This is a response to a remote block copy request, return
+                    # the local copy block locator.
+                    return self._result['headers'].get('x-keep-locator')
                 return True
 
             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                          self._result['status_code'],
                          len(self._result['body']),
                          t.msecs,
-                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
 
             if self.download_counter:
                 self.download_counter.add(len(self._result['body']))
@@ -417,10 +454,11 @@ class KeepClient(object):
             try:
                 with timer.Timer() as t:
                     self._headers = {}
-                    body_reader = cStringIO.StringIO(body)
-                    response_body = cStringIO.StringIO()
+                    body_reader = BytesIO(body)
+                    response_body = BytesIO()
                     curl.setopt(pycurl.NOSIGNAL, 1)
-                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
+                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
@@ -432,17 +470,25 @@ class KeepClient(object):
                     curl.setopt(pycurl.INFILESIZE, len(body))
                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
                     curl.setopt(pycurl.HTTPHEADER, [
-                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    if self.insecure:
+                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                    else:
+                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     self._setcurltimeouts(curl, timeout)
                     try:
                         curl.perform()
                     except Exception as e:
                         raise arvados.errors.HttpError(0, str(e))
+                    finally:
+                        if self._socket:
+                            self._socket.close()
+                            self._socket = None
                     self._result = {
                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
-                        'body': response_body.getvalue(),
+                        'body': response_body.getvalue().decode('utf-8'),
                         'headers': self._headers,
                         'error': False,
                     }
@@ -469,12 +515,12 @@ class KeepClient(object):
                          self._result['status_code'],
                          len(body),
                          t.msecs,
-                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
             if self.upload_counter:
                 self.upload_counter.add(len(body))
             return True
 
-        def _setcurltimeouts(self, curl, timeouts):
+        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
             if not timeouts:
                 return
             elif isinstance(timeouts, tuple):
@@ -487,11 +533,13 @@ class KeepClient(object):
                 conn_t, xfer_t = (timeouts, timeouts)
                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
-            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
-            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
+            if not ignore_bandwidth:
+                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
-            header_line = header_line.decode('iso-8859-1')
+            if isinstance(header_line, bytes):
+                header_line = header_line.decode('iso-8859-1')
             if ':' in header_line:
                 name, value = header_line.split(':', 1)
                 name = name.strip().lower()
@@ -508,30 +556,30 @@ class KeepClient(object):
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
-    
 
-    class KeepWriterQueue(Queue.Queue):
+
+    class KeepWriterQueue(queue.Queue):
         def __init__(self, copies):
-            Queue.Queue.__init__(self) # Old-style superclass
+            queue.Queue.__init__(self) # Old-style superclass
             self.wanted_copies = copies
             self.successful_copies = 0
             self.response = None
             self.successful_copies_lock = threading.Lock()
             self.pending_tries = copies
             self.pending_tries_notification = threading.Condition()
-        
+
         def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
-        
+
         def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
-        
+
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
@@ -560,7 +608,7 @@ class KeepClient(object):
                         return service, service_root
                     elif self.empty():
                         self.pending_tries_notification.notify_all()
-                        raise Queue.Empty
+                        raise queue.Empty
                     else:
                         self.pending_tries_notification.wait()
 
@@ -572,7 +620,7 @@ class KeepClient(object):
             if (not max_service_replicas) or (max_service_replicas >= copies):
                 num_threads = 1
             else:
-                num_threads = int(math.ceil(float(copies) / max_service_replicas))
+                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
             _logger.debug("Pool max threads is %d", num_threads)
             self.workers = []
             self.queue = KeepClient.KeepWriterQueue(copies)
@@ -580,25 +628,25 @@ class KeepClient(object):
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                 self.workers.append(w)
-        
+
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
             self.total_task_nr += 1
-        
+
         def done(self):
             return self.queue.successful_copies
-        
+
         def join(self):
             # Start workers
             for worker in self.workers:
                 worker.start()
             # Wait for finished work
             self.queue.join()
-        
+
         def response(self):
             return self.queue.response
-    
-    
+
+
     class KeepWriterThread(threading.Thread):
         TaskFailed = RuntimeError()
 
@@ -614,7 +662,7 @@ class KeepClient(object):
             while True:
                 try:
                     service, service_root = self.queue.get_next_task()
-                except Queue.Empty:
+                except queue.Empty:
                     return
                 try:
                     locator, copies = self.do_task(service, service_root)
@@ -729,10 +777,15 @@ class KeepClient(object):
         if local_store is None:
             local_store = os.environ.get('KEEP_LOCAL_STORE')
 
+        if api_client is None:
+            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
+        else:
+            self.insecure = api_client.insecure
+
         self.block_cache = block_cache if block_cache else KeepBlockCache()
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
-        self._user_agent_pool = Queue.LifoQueue()
+        self._user_agent_pool = queue.LifoQueue()
         self.upload_counter = Counter()
         self.download_counter = Counter()
         self.put_counter = Counter()
@@ -742,6 +795,7 @@ class KeepClient(object):
 
         if local_store:
             self.local_store = local_store
+            self.head = self.local_store_head
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
@@ -753,7 +807,7 @@ class KeepClient(object):
                     if not proxy_uris[i].endswith('/'):
                         proxy_uris[i] += '/'
                     # URL validation
-                    url = urlparse.urlparse(proxy_uris[i])
+                    url = urllib.parse.urlparse(proxy_uris[i])
                     if not (url.scheme and url.netloc):
                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                 self.api_token = api_token
@@ -819,7 +873,7 @@ class KeepClient(object):
                 raise arvados.errors.NoKeepServersError()
 
             # Precompute the base URI for each service.
-            for r in self._gateway_services.itervalues():
+            for r in self._gateway_services.values():
                 host = r['service_host']
                 if not host.startswith('[') and host.find(':') >= 0:
                     # IPv6 URIs must be formatted like http://[::1]:80/...
@@ -831,7 +885,7 @@ class KeepClient(object):
 
             _logger.debug(str(self._gateway_services))
             self._keep_services = [
-                ks for ks in self._gateway_services.itervalues()
+                ks for ks in self._gateway_services.values()
                 if not ks.get('service_type', '').startswith('gateway:')]
             self._writable_services = [ks for ks in self._keep_services
                                        if not ks.get('read_only')]
@@ -850,7 +904,7 @@ class KeepClient(object):
         The weight is md5(h + u) where u is the last 15 characters of
         the service endpoint's UUID.
         """
-        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
+        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
 
     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
         """Return an array of Keep service endpoints, in the order in
@@ -888,7 +942,7 @@ class KeepClient(object):
         _logger.debug("{}: {}".format(locator, sorted_roots))
         return sorted_roots
 
-    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
+    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
@@ -901,7 +955,8 @@ class KeepClient(object):
                     root, self._user_agent_pool,
                     upload_counter=self.upload_counter,
                     download_counter=self.download_counter,
-                    **headers)
+                    headers=headers,
+                    insecure=self.insecure)
         return local_roots
 
     @staticmethod
@@ -930,15 +985,20 @@ class KeepClient(object):
         else:
             return None
 
+    def refresh_signature(self, loc):
+        """Ask Keep to get the remote block and return its local signature"""
+        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
+        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
+
     @retry.retry_method
-    def head(self, loc_s, num_retries=None):
-        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
+    def head(self, loc_s, **kwargs):
+        return self._get_or_head(loc_s, method="HEAD", **kwargs)
 
     @retry.retry_method
-    def get(self, loc_s, num_retries=None):
-        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
+    def get(self, loc_s, **kwargs):
+        return self._get_or_head(loc_s, method="GET", **kwargs)
 
-    def _get_or_head(self, loc_s, method="GET", num_retries=None):
+    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -963,76 +1023,88 @@ class KeepClient(object):
 
         self.get_counter.add(1)
 
-        locator = KeepLocator(loc_s)
-        if method == "GET":
-            slot, first = self.block_cache.reserve_cache(locator.md5sum)
-            if not first:
-                self.hits_counter.add(1)
-                v = slot.get()
-                return v
-
-        self.misses_counter.add(1)
-
-        # If the locator has hints specifying a prefix (indicating a
-        # remote keepproxy) or the UUID of a local gateway service,
-        # read data from the indicated service(s) instead of the usual
-        # list of local disk services.
-        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
-                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
-        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
-                           for hint in locator.hints if (
-                                   hint.startswith('K@') and
-                                   len(hint) == 29 and
-                                   self._gateway_services.get(hint[2:])
-                                   )])
-        # Map root URLs to their KeepService objects.
-        roots_map = {
-            root: self.KeepService(root, self._user_agent_pool,
-                                   upload_counter=self.upload_counter,
-                                   download_counter=self.download_counter)
-            for root in hint_roots
-        }
-
-        # See #3147 for a discussion of the loop implementation.  Highlights:
-        # * Refresh the list of Keep services after each failure, in case
-        #   it's being updated.
-        # * Retry until we succeed, we're out of retries, or every available
-        #   service has returned permanent failure.
-        sorted_roots = []
-        roots_map = {}
+        slot = None
         blob = None
-        loop = retry.RetryLoop(num_retries, self._check_loop_result,
-                               backoff_start=2)
-        for tries_left in loop:
-            try:
-                sorted_roots = self.map_new_services(
-                    roots_map, locator,
-                    force_rebuild=(tries_left < num_retries),
-                    need_writable=False)
-            except Exception as error:
-                loop.save_result(error)
-                continue
+        try:
+            locator = KeepLocator(loc_s)
+            if method == "GET":
+                slot, first = self.block_cache.reserve_cache(locator.md5sum)
+                if not first:
+                    self.hits_counter.add(1)
+                    blob = slot.get()
+                    if blob is None:
+                        raise arvados.errors.KeepReadError(
+                            "failed to read {}".format(loc_s))
+                    return blob
+
+            self.misses_counter.add(1)
+
+            if headers is None:
+                headers = {}
+            headers['X-Request-Id'] = (request_id or
+                                        (hasattr(self, 'api_client') and self.api_client.request_id) or
+                                        arvados.util.new_request_id())
+
+            # If the locator has hints specifying a prefix (indicating a
+            # remote keepproxy) or the UUID of a local gateway service,
+            # read data from the indicated service(s) instead of the usual
+            # list of local disk services.
+            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+                               for hint in locator.hints if (
+                                       hint.startswith('K@') and
+                                       len(hint) == 29 and
+                                       self._gateway_services.get(hint[2:])
+                                       )])
+            # Map root URLs to their KeepService objects.
+            roots_map = {
+                root: self.KeepService(root, self._user_agent_pool,
+                                       upload_counter=self.upload_counter,
+                                       download_counter=self.download_counter,
+                                       headers=headers,
+                                       insecure=self.insecure)
+                for root in hint_roots
+            }
+
+            # See #3147 for a discussion of the loop implementation.  Highlights:
+            # * Refresh the list of Keep services after each failure, in case
+            #   it's being updated.
+            # * Retry until we succeed, we're out of retries, or every available
+            #   service has returned permanent failure.
+            sorted_roots = []
+            roots_map = {}
+            loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                                   backoff_start=2)
+            for tries_left in loop:
+                try:
+                    sorted_roots = self.map_new_services(
+                        roots_map, locator,
+                        force_rebuild=(tries_left < num_retries),
+                        need_writable=False,
+                        headers=headers)
+                except Exception as error:
+                    loop.save_result(error)
+                    continue
 
-            # Query KeepService objects that haven't returned
-            # permanent failure, in our specified shuffle order.
-            services_to_try = [roots_map[root]
-                               for root in sorted_roots
-                               if roots_map[root].usable()]
-            for keep_service in services_to_try:
-                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
-                if blob is not None:
-                    break
-            loop.save_result((blob, len(services_to_try)))
-
-        # Always cache the result, then return it if we succeeded.
-        if method == "GET":
-            slot.set(blob)
-            self.block_cache.cap_cache()
-        if loop.success():
-            if method == "HEAD":
-                return True
-            else:
+                # Query KeepService objects that haven't returned
+                # permanent failure, in our specified shuffle order.
+                services_to_try = [roots_map[root]
+                                   for root in sorted_roots
+                                   if roots_map[root].usable()]
+                for keep_service in services_to_try:
+                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
+                    if blob is not None:
+                        break
+                loop.save_result((blob, len(services_to_try)))
+
+            # Always cache the result, then return it if we succeeded.
+            if loop.success():
                 return blob
+        finally:
+            if slot is not None:
+                slot.set(blob)
+                self.block_cache.cap_cache()
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
@@ -1049,10 +1121,10 @@ class KeepClient(object):
                 "{} not found".format(loc_s), service_errors)
         else:
             raise arvados.errors.KeepReadError(
-                "failed to read {}".format(loc_s), service_errors, label="service")
+                "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
 
     @retry.retry_method
-    def put(self, data, copies=2, num_retries=None):
+    def put(self, data, copies=2, num_retries=None, request_id=None):
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
@@ -1071,10 +1143,8 @@ class KeepClient(object):
           KeepClient is initialized.
         """
 
-        if isinstance(data, unicode):
-            data = data.encode("ascii")
-        elif not isinstance(data, str):
-            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
+        if not isinstance(data, bytes):
+            data = data.encode()
 
         self.put_counter.add(1)
 
@@ -1084,9 +1154,12 @@ class KeepClient(object):
             return loc_s
         locator = KeepLocator(loc_s)
 
-        headers = {}
-        # Tell the proxy how many copies we want it to store
-        headers['X-Keep-Desired-Replicas'] = str(copies)
+        headers = {
+            'X-Request-Id': (request_id or
+                             (hasattr(self, 'api_client') and self.api_client.request_id) or
+                             arvados.util.new_request_id()),
+            'X-Keep-Desired-Replicas': str(copies),
+        }
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
@@ -1095,12 +1168,14 @@ class KeepClient(object):
             try:
                 sorted_roots = self.map_new_services(
                     roots_map, locator,
-                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
+                    force_rebuild=(tries_left < num_retries),
+                    need_writable=True,
+                    headers=headers)
             except Exception as error:
                 loop.save_result(error)
                 continue
 
-            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
                                                         copies=copies - done,
                                                         max_service_replicas=self.max_replicas_per_service,
@@ -1125,8 +1200,8 @@ class KeepClient(object):
                               for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
-                "failed to write {} (wanted {} copies but wrote {})".format(
-                    data_hash, copies, writer_pool.done()), service_errors, label="service")
+                "failed to write {} after {} (wanted {} copies but wrote {})".format(
+                    data_hash, loop.attempts_str(), copies, writer_pool.done()), service_errors, label="service")
 
     def local_store_put(self, data, copies=1, num_retries=None):
         """A stub for put().
@@ -1142,7 +1217,7 @@ class KeepClient(object):
         """
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
-        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
+        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
             f.write(data)
         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                   os.path.join(self.local_store, md5))
@@ -1156,9 +1231,21 @@ class KeepClient(object):
             raise arvados.errors.NotFoundError(
                 "Invalid data locator: '%s'" % loc_s)
         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
-            return ''
-        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
+            return b''
+        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
             return f.read()
 
+    def local_store_head(self, loc_s, num_retries=None):
+        """Companion to local_store_put()."""
+        try:
+            locator = KeepLocator(loc_s)
+        except ValueError:
+            raise arvados.errors.NotFoundError(
+                "Invalid data locator: '%s'" % loc_s)
+        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
+            return True
+        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
+            return True
+
     def is_cached(self, locator):
         return self.block_cache.reserve_cache(expect_hash)