From 2321fa1ced5faa1ace9d5b72e3b2eb4afd0e721f Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Fri, 8 Dec 2017 10:13:27 -0500 Subject: [PATCH] 12167: Send caller-provided or random X-Request-Id in Keep requests. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- sdk/python/arvados/keep.py | 44 +++++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index e6e93f0806..351f7f5dda 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -291,7 +291,8 @@ class KeepClient(object): def __init__(self, root, user_agent_pool=queue.LifoQueue(), upload_counter=None, - download_counter=None, **headers): + download_counter=None, + headers={}): self.root = root self._user_agent_pool = user_agent_pool self._result = {'error': None} @@ -920,7 +921,7 @@ class KeepClient(object): _logger.debug("{}: {}".format(locator, sorted_roots)) return sorted_roots - def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers): + def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers): # roots_map is a dictionary, mapping Keep service root strings # to KeepService objects. Poll for Keep services, and add any # new ones to roots_map. Return the current list of local @@ -933,7 +934,7 @@ class KeepClient(object): root, self._user_agent_pool, upload_counter=self.upload_counter, download_counter=self.download_counter, - **headers) + headers=headers) return local_roots @staticmethod @@ -963,14 +964,14 @@ class KeepClient(object): return None @retry.retry_method - def head(self, loc_s, num_retries=None): - return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries) + def head(self, loc_s, **kwargs): + return self._get_or_head(loc_s, method="HEAD", **kwargs) @retry.retry_method - def get(self, loc_s, num_retries=None): - return self._get_or_head(loc_s, method="GET", num_retries=num_retries) + def get(self, loc_s, **kwargs): + return self._get_or_head(loc_s, method="GET", **kwargs) - def _get_or_head(self, loc_s, method="GET", num_retries=None): + def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None): """Get data from Keep. This method fetches one or more blocks of data from Keep. It @@ -1005,6 +1006,12 @@ class KeepClient(object): self.misses_counter.add(1) + headers = { + 'X-Request-Id': (request_id or + (hasattr(self, 'api_client') and self.api_client.request_id) or + arvados.util.new_request_id()), + } + # If the locator has hints specifying a prefix (indicating a # remote keepproxy) or the UUID of a local gateway service, # read data from the indicated service(s) instead of the usual @@ -1021,7 +1028,8 @@ class KeepClient(object): roots_map = { root: self.KeepService(root, self._user_agent_pool, upload_counter=self.upload_counter, - download_counter=self.download_counter) + download_counter=self.download_counter, + headers=headers) for root in hint_roots } @@ -1040,7 +1048,8 @@ class KeepClient(object): sorted_roots = self.map_new_services( roots_map, locator, force_rebuild=(tries_left < num_retries), - need_writable=False) + need_writable=False, + headers=headers) except Exception as error: loop.save_result(error) continue @@ -1084,7 +1093,7 @@ class KeepClient(object): "failed to read {}".format(loc_s), service_errors, label="service") @retry.retry_method - def put(self, data, copies=2, num_retries=None): + def put(self, data, copies=2, num_retries=None, request_id=None): """Save data in Keep. This method will get a list of Keep services from the API server, and @@ -1114,9 +1123,12 @@ class KeepClient(object): return loc_s locator = KeepLocator(loc_s) - headers = {} - # Tell the proxy how many copies we want it to store - headers['X-Keep-Desired-Replicas'] = str(copies) + headers = { + 'X-Request-Id': (request_id or + (hasattr(self, 'api_client') and self.api_client.request_id) or + arvados.util.new_request_id()), + 'X-Keep-Desired-Replicas': str(copies), + } roots_map = {} loop = retry.RetryLoop(num_retries, self._check_loop_result, backoff_start=2) @@ -1125,7 +1137,9 @@ class KeepClient(object): try: sorted_roots = self.map_new_services( roots_map, locator, - force_rebuild=(tries_left < num_retries), need_writable=True, **headers) + force_rebuild=(tries_left < num_retries), + need_writable=True, + headers=headers) except Exception as error: loop.save_result(error) continue -- 2.30.2