X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a6419676c073a863232c4656f0602b2d038ec3cd..4ffe3382ff35cebce873668dfdfad2eef2def3d3:/sdk/python/arvados/keep.py diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index e3ef642c96..0018687ff3 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -570,8 +570,8 @@ class KeepClient(object): self.confirmed_storage_classes = {} self.response = None self.storage_classes_tracking = True - self.queue_data_lock = threading.Lock() - self.pending_tries = max(copies, len(classes))+1 + self.queue_data_lock = threading.RLock() + self.pending_tries = max(copies, len(classes)) self.pending_tries_notification = threading.Condition() def write_success(self, response, replicas_nr, classes_confirmed): @@ -585,6 +585,7 @@ class KeepClient(object): self.confirmed_storage_classes[st_class] += st_copies except KeyError: self.confirmed_storage_classes[st_class] = st_copies + self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes())) self.response = response with self.pending_tries_notification: self.pending_tries_notification.notify_all() @@ -719,11 +720,11 @@ class KeepClient(object): result = service.last_result() if not success: - if result.get('status_code', None): + if result.get('status_code'): _logger.debug("Request fail: PUT %s => %s %s", self.data_hash, - result['status_code'], - result['body']) + result.get('status_code'), + result.get('body')) raise self.TaskFailed() _logger.debug("KeepWriterThread %s succeeded %s+%i %s", @@ -840,6 +841,8 @@ class KeepClient(object): self.get_counter = Counter() self.hits_counter = Counter() self.misses_counter = Counter() + self._storage_classes_unsupported_warning = False + self._default_classes = [] if local_store: self.local_store = local_store @@ -880,6 +883,12 @@ class KeepClient(object): self._writable_services = None self.using_proxy = None self._static_services_list = False + try: + self._default_classes = [ + k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']] + except KeyError: + # We're talking to an old cluster + pass def current_timeout(self, attempt_number): """Return the appropriate timeout to use for this client. @@ -1071,6 +1080,13 @@ class KeepClient(object): self.get_counter.add(1) + request_id = (request_id or + (hasattr(self, 'api_client') and self.api_client.request_id) or + arvados.util.new_request_id()) + if headers is None: + headers = {} + headers['X-Request-Id'] = request_id + slot = None blob = None try: @@ -1087,12 +1103,6 @@ class KeepClient(object): self.misses_counter.add(1) - if headers is None: - headers = {} - headers['X-Request-Id'] = (request_id or - (hasattr(self, 'api_client') and self.api_client.request_id) or - arvados.util.new_request_id()) - # If the locator has hints specifying a prefix (indicating a # remote keepproxy) or the UUID of a local gateway service, # read data from the indicated service(s) instead of the usual @@ -1162,17 +1172,17 @@ class KeepClient(object): for key in sorted_roots) if not roots_map: raise arvados.errors.KeepReadError( - "failed to read {}: no Keep services available ({})".format( - loc_s, loop.last_result())) + "[{}] failed to read {}: no Keep services available ({})".format( + request_id, loc_s, loop.last_result())) elif not_founds == len(sorted_roots): raise arvados.errors.NotFoundError( - "{} not found".format(loc_s), service_errors) + "[{}] {} not found".format(request_id, loc_s), service_errors) else: raise arvados.errors.KeepReadError( - "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service") + "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service") @retry.retry_method - def put(self, data, copies=2, num_retries=None, request_id=None, classes=[]): + def put(self, data, copies=2, num_retries=None, request_id=None, classes=None): """Save data in Keep. This method will get a list of Keep services from the API server, and @@ -1193,6 +1203,8 @@ class KeepClient(object): be written. """ + classes = classes or self._default_classes + if not isinstance(data, bytes): data = data.encode() @@ -1204,10 +1216,11 @@ class KeepClient(object): return loc_s locator = KeepLocator(loc_s) + request_id = (request_id or + (hasattr(self, 'api_client') and self.api_client.request_id) or + arvados.util.new_request_id()) headers = { - 'X-Request-Id': (request_id or - (hasattr(self, 'api_client') and self.api_client.request_id) or - arvados.util.new_request_id()), + 'X-Request-Id': request_id, 'X-Keep-Desired-Replicas': str(copies), } roots_map = {} @@ -1253,7 +1266,9 @@ class KeepClient(object): # success is determined only by successful copies. # # Disable storage classes tracking from this point forward. - _logger.warning("X-Keep-Storage-Classes header not supported by the cluster") + if not self._storage_classes_unsupported_warning: + self._storage_classes_unsupported_warning = True + _logger.warning("X-Keep-Storage-Classes header not supported by the cluster") done_classes = None loop.save_result( (done_copies >= copies, writer_pool.total_task_nr)) @@ -1262,17 +1277,17 @@ class KeepClient(object): return writer_pool.response() if not roots_map: raise arvados.errors.KeepWriteError( - "failed to write {}: no Keep services available ({})".format( - data_hash, loop.last_result())) + "[{}] failed to write {}: no Keep services available ({})".format( + request_id, data_hash, loop.last_result())) else: service_errors = ((key, roots_map[key].last_result()['error']) for key in sorted_roots if roots_map[key].last_result()['error']) raise arvados.errors.KeepWriteError( - "failed to write {} after {} (wanted {} copies but wrote {})".format( - data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service") + "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format( + request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service") - def local_store_put(self, data, copies=1, num_retries=None): + def local_store_put(self, data, copies=1, num_retries=None, classes=[]): """A stub for put(). This method is used in place of the real put() method when