X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/607fe087f6167061714a524dd53cbbc21b974973..c02ceff00fce94ec5794b53fe890f681acf31121:/sdk/python/arvados/keep.py diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index b0413ebf92..351f7f5dda 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -1,6 +1,11 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + from __future__ import absolute_import from __future__ import division from future import standard_library +from future.utils import native_str standard_library.install_aliases() from builtins import next from builtins import str @@ -73,8 +78,9 @@ class KeepLocator(object): def __str__(self): return '+'.join( - str(s) for s in [self.md5sum, self.size, - self.permission_hint()] + self.hints + native_str(s) + for s in [self.md5sum, self.size, + self.permission_hint()] + self.hints if s is not None) def stripped(self): @@ -285,7 +291,8 @@ class KeepClient(object): def __init__(self, root, user_agent_pool=queue.LifoQueue(), upload_counter=None, - download_counter=None, **headers): + download_counter=None, + headers={}): self.root = root self._user_agent_pool = user_agent_pool self._result = {'error': None} @@ -914,7 +921,7 @@ class KeepClient(object): _logger.debug("{}: {}".format(locator, sorted_roots)) return sorted_roots - def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers): + def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers): # roots_map is a dictionary, mapping Keep service root strings # to KeepService objects. Poll for Keep services, and add any # new ones to roots_map. Return the current list of local @@ -927,7 +934,7 @@ class KeepClient(object): root, self._user_agent_pool, upload_counter=self.upload_counter, download_counter=self.download_counter, - **headers) + headers=headers) return local_roots @staticmethod @@ -957,14 +964,14 @@ class KeepClient(object): return None @retry.retry_method - def head(self, loc_s, num_retries=None): - return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries) + def head(self, loc_s, **kwargs): + return self._get_or_head(loc_s, method="HEAD", **kwargs) @retry.retry_method - def get(self, loc_s, num_retries=None): - return self._get_or_head(loc_s, method="GET", num_retries=num_retries) + def get(self, loc_s, **kwargs): + return self._get_or_head(loc_s, method="GET", **kwargs) - def _get_or_head(self, loc_s, method="GET", num_retries=None): + def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None): """Get data from Keep. This method fetches one or more blocks of data from Keep. It @@ -999,6 +1006,12 @@ class KeepClient(object): self.misses_counter.add(1) + headers = { + 'X-Request-Id': (request_id or + (hasattr(self, 'api_client') and self.api_client.request_id) or + arvados.util.new_request_id()), + } + # If the locator has hints specifying a prefix (indicating a # remote keepproxy) or the UUID of a local gateway service, # read data from the indicated service(s) instead of the usual @@ -1015,7 +1028,8 @@ class KeepClient(object): roots_map = { root: self.KeepService(root, self._user_agent_pool, upload_counter=self.upload_counter, - download_counter=self.download_counter) + download_counter=self.download_counter, + headers=headers) for root in hint_roots } @@ -1034,7 +1048,8 @@ class KeepClient(object): sorted_roots = self.map_new_services( roots_map, locator, force_rebuild=(tries_left < num_retries), - need_writable=False) + need_writable=False, + headers=headers) except Exception as error: loop.save_result(error) continue @@ -1078,7 +1093,7 @@ class KeepClient(object): "failed to read {}".format(loc_s), service_errors, label="service") @retry.retry_method - def put(self, data, copies=2, num_retries=None): + def put(self, data, copies=2, num_retries=None, request_id=None): """Save data in Keep. This method will get a list of Keep services from the API server, and @@ -1108,9 +1123,12 @@ class KeepClient(object): return loc_s locator = KeepLocator(loc_s) - headers = {} - # Tell the proxy how many copies we want it to store - headers['X-Keep-Desired-Replicas'] = str(copies) + headers = { + 'X-Request-Id': (request_id or + (hasattr(self, 'api_client') and self.api_client.request_id) or + arvados.util.new_request_id()), + 'X-Keep-Desired-Replicas': str(copies), + } roots_map = {} loop = retry.RetryLoop(num_retries, self._check_loop_result, backoff_start=2) @@ -1119,7 +1137,9 @@ class KeepClient(object): try: sorted_roots = self.map_new_services( roots_map, locator, - force_rebuild=(tries_left < num_retries), need_writable=True, **headers) + force_rebuild=(tries_left < num_retries), + need_writable=True, + headers=headers) except Exception as error: loop.save_result(error) continue