Merge branch '18051-webdav-cache'
[arvados.git] / sdk / python / arvados / keep.py
index 2f20132aecdf17ee9b8d6da2d043037508e989ce..bc07851835e2471ee9f1055b689fe6a789ea4d62 100644 (file)
@@ -570,8 +570,8 @@ class KeepClient(object):
             self.confirmed_storage_classes = {}
             self.response = None
             self.storage_classes_tracking = True
-            self.queue_data_lock = threading.Lock()
-            self.pending_tries = max(copies, len(classes))+1
+            self.queue_data_lock = threading.RLock()
+            self.pending_tries = max(copies, len(classes))
             self.pending_tries_notification = threading.Condition()
 
         def write_success(self, response, replicas_nr, classes_confirmed):
@@ -585,6 +585,7 @@ class KeepClient(object):
                             self.confirmed_storage_classes[st_class] += st_copies
                         except KeyError:
                             self.confirmed_storage_classes[st_class] = st_copies
+                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
@@ -719,11 +720,11 @@ class KeepClient(object):
             result = service.last_result()
 
             if not success:
-                if result.get('status_code', None):
+                if result.get('status_code'):
                     _logger.debug("Request fail: PUT %s => %s %s",
                                   self.data_hash,
-                                  result['status_code'],
-                                  result['body'])
+                                  result.get('status_code'),
+                                  result.get('body'))
                 raise self.TaskFailed()
 
             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
@@ -840,6 +841,8 @@ class KeepClient(object):
         self.get_counter = Counter()
         self.hits_counter = Counter()
         self.misses_counter = Counter()
+        self._storage_classes_unsupported_warning = False
+        self._default_classes = []
 
         if local_store:
             self.local_store = local_store
@@ -880,6 +883,12 @@ class KeepClient(object):
                 self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
+                try:
+                    self._default_classes = [
+                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
+                except KeyError:
+                    # We're talking to an old cluster
+                    pass
 
     def current_timeout(self, attempt_number):
         """Return the appropriate timeout to use for this client.
@@ -1172,7 +1181,7 @@ class KeepClient(object):
                 "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
 
     @retry.retry_method
-    def put(self, data, copies=2, num_retries=None, request_id=None, classes=[]):
+    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
@@ -1193,6 +1202,8 @@ class KeepClient(object):
           be written.
         """
 
+        classes = classes or self._default_classes
+
         if not isinstance(data, bytes):
             data = data.encode()
 
@@ -1253,7 +1264,9 @@ class KeepClient(object):
                 # success is determined only by successful copies.
                 #
                 # Disable storage classes tracking from this point forward.
-                _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
+                if not self._storage_classes_unsupported_warning:
+                    self._storage_classes_unsupported_warning = True
+                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                 done_classes = None
                 loop.save_result(
                     (done_copies >= copies, writer_pool.total_task_nr))