13111: Merge branch 'master' into 12308-go-fuse
[arvados.git] / sdk / python / arvados / keep.py
index e6e93f080659abf9a51cec9d4425cafe8bdc976b..351f7f5dda8a96ebb805fd4d4896380cb3addbb8 100644 (file)
@@ -291,7 +291,8 @@ class KeepClient(object):
 
         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                      upload_counter=None,
-                     download_counter=None, **headers):
+                     download_counter=None,
+                     headers={}):
             self.root = root
             self._user_agent_pool = user_agent_pool
             self._result = {'error': None}
@@ -920,7 +921,7 @@ class KeepClient(object):
         _logger.debug("{}: {}".format(locator, sorted_roots))
         return sorted_roots
 
-    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
+    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
@@ -933,7 +934,7 @@ class KeepClient(object):
                     root, self._user_agent_pool,
                     upload_counter=self.upload_counter,
                     download_counter=self.download_counter,
-                    **headers)
+                    headers=headers)
         return local_roots
 
     @staticmethod
@@ -963,14 +964,14 @@ class KeepClient(object):
             return None
 
     @retry.retry_method
-    def head(self, loc_s, num_retries=None):
-        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
+    def head(self, loc_s, **kwargs):
+        return self._get_or_head(loc_s, method="HEAD", **kwargs)
 
     @retry.retry_method
-    def get(self, loc_s, num_retries=None):
-        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
+    def get(self, loc_s, **kwargs):
+        return self._get_or_head(loc_s, method="GET", **kwargs)
 
-    def _get_or_head(self, loc_s, method="GET", num_retries=None):
+    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -1005,6 +1006,12 @@ class KeepClient(object):
 
         self.misses_counter.add(1)
 
+        headers = {
+            'X-Request-Id': (request_id or
+                             (hasattr(self, 'api_client') and self.api_client.request_id) or
+                             arvados.util.new_request_id()),
+        }
+
         # If the locator has hints specifying a prefix (indicating a
         # remote keepproxy) or the UUID of a local gateway service,
         # read data from the indicated service(s) instead of the usual
@@ -1021,7 +1028,8 @@ class KeepClient(object):
         roots_map = {
             root: self.KeepService(root, self._user_agent_pool,
                                    upload_counter=self.upload_counter,
-                                   download_counter=self.download_counter)
+                                   download_counter=self.download_counter,
+                                   headers=headers)
             for root in hint_roots
         }
 
@@ -1040,7 +1048,8 @@ class KeepClient(object):
                 sorted_roots = self.map_new_services(
                     roots_map, locator,
                     force_rebuild=(tries_left < num_retries),
-                    need_writable=False)
+                    need_writable=False,
+                    headers=headers)
             except Exception as error:
                 loop.save_result(error)
                 continue
@@ -1084,7 +1093,7 @@ class KeepClient(object):
                 "failed to read {}".format(loc_s), service_errors, label="service")
 
     @retry.retry_method
-    def put(self, data, copies=2, num_retries=None):
+    def put(self, data, copies=2, num_retries=None, request_id=None):
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
@@ -1114,9 +1123,12 @@ class KeepClient(object):
             return loc_s
         locator = KeepLocator(loc_s)
 
-        headers = {}
-        # Tell the proxy how many copies we want it to store
-        headers['X-Keep-Desired-Replicas'] = str(copies)
+        headers = {
+            'X-Request-Id': (request_id or
+                             (hasattr(self, 'api_client') and self.api_client.request_id) or
+                             arvados.util.new_request_id()),
+            'X-Keep-Desired-Replicas': str(copies),
+        }
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
@@ -1125,7 +1137,9 @@ class KeepClient(object):
             try:
                 sorted_roots = self.map_new_services(
                     roots_map, locator,
-                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
+                    force_rebuild=(tries_left < num_retries),
+                    need_writable=True,
+                    headers=headers)
             except Exception as error:
                 loop.save_result(error)
                 continue