+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
from __future__ import absolute_import
from __future__ import division
from future import standard_library
def __init__(self, root, user_agent_pool=queue.LifoQueue(),
upload_counter=None,
- download_counter=None, **headers):
+ download_counter=None,
+ headers={}):
self.root = root
self._user_agent_pool = user_agent_pool
self._result = {'error': None}
_logger.debug("{}: {}".format(locator, sorted_roots))
return sorted_roots
- def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
+ def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
# roots_map is a dictionary, mapping Keep service root strings
# to KeepService objects. Poll for Keep services, and add any
# new ones to roots_map. Return the current list of local
root, self._user_agent_pool,
upload_counter=self.upload_counter,
download_counter=self.download_counter,
- **headers)
+ headers=headers)
return local_roots
@staticmethod
return None
@retry.retry_method
- def head(self, loc_s, num_retries=None):
- return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
+ def head(self, loc_s, **kwargs):
+ return self._get_or_head(loc_s, method="HEAD", **kwargs)
@retry.retry_method
- def get(self, loc_s, num_retries=None):
- return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
+ def get(self, loc_s, **kwargs):
+ return self._get_or_head(loc_s, method="GET", **kwargs)
- def _get_or_head(self, loc_s, method="GET", num_retries=None):
+ def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
self.misses_counter.add(1)
+ headers = {
+ 'X-Request-Id': (request_id or
+ (hasattr(self, 'api_client') and self.api_client.request_id) or
+ arvados.util.new_request_id()),
+ }
+
# If the locator has hints specifying a prefix (indicating a
# remote keepproxy) or the UUID of a local gateway service,
# read data from the indicated service(s) instead of the usual
roots_map = {
root: self.KeepService(root, self._user_agent_pool,
upload_counter=self.upload_counter,
- download_counter=self.download_counter)
+ download_counter=self.download_counter,
+ headers=headers)
for root in hint_roots
}
sorted_roots = self.map_new_services(
roots_map, locator,
force_rebuild=(tries_left < num_retries),
- need_writable=False)
+ need_writable=False,
+ headers=headers)
except Exception as error:
loop.save_result(error)
continue
"failed to read {}".format(loc_s), service_errors, label="service")
@retry.retry_method
- def put(self, data, copies=2, num_retries=None):
+ def put(self, data, copies=2, num_retries=None, request_id=None):
"""Save data in Keep.
This method will get a list of Keep services from the API server, and
return loc_s
locator = KeepLocator(loc_s)
- headers = {}
- # Tell the proxy how many copies we want it to store
- headers['X-Keep-Desired-Replicas'] = str(copies)
+ headers = {
+ 'X-Request-Id': (request_id or
+ (hasattr(self, 'api_client') and self.api_client.request_id) or
+ arvados.util.new_request_id()),
+ 'X-Keep-Desired-Replicas': str(copies),
+ }
roots_map = {}
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
try:
sorted_roots = self.map_new_services(
roots_map, locator,
- force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
+ force_rebuild=(tries_left < num_retries),
+ need_writable=True,
+ headers=headers)
except Exception as error:
loop.save_result(error)
continue