17609: Merge branch 'master'
[arvados.git] / sdk / python / arvados / keep.py
index e3ef642c961184c7380a90196a19799c81ee48a9..86b1d91b8246ef20ed860cdb516ca5d4a53624b9 100644 (file)
@@ -570,8 +570,8 @@ class KeepClient(object):
             self.confirmed_storage_classes = {}
             self.response = None
             self.storage_classes_tracking = True
             self.confirmed_storage_classes = {}
             self.response = None
             self.storage_classes_tracking = True
-            self.queue_data_lock = threading.Lock()
-            self.pending_tries = max(copies, len(classes))+1
+            self.queue_data_lock = threading.RLock()
+            self.pending_tries = max(copies, len(classes))
             self.pending_tries_notification = threading.Condition()
 
         def write_success(self, response, replicas_nr, classes_confirmed):
             self.pending_tries_notification = threading.Condition()
 
         def write_success(self, response, replicas_nr, classes_confirmed):
@@ -585,6 +585,7 @@ class KeepClient(object):
                             self.confirmed_storage_classes[st_class] += st_copies
                         except KeyError:
                             self.confirmed_storage_classes[st_class] = st_copies
                             self.confirmed_storage_classes[st_class] += st_copies
                         except KeyError:
                             self.confirmed_storage_classes[st_class] = st_copies
+                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
@@ -840,6 +841,7 @@ class KeepClient(object):
         self.get_counter = Counter()
         self.hits_counter = Counter()
         self.misses_counter = Counter()
         self.get_counter = Counter()
         self.hits_counter = Counter()
         self.misses_counter = Counter()
+        self._storage_classes_unsupported_warning = False
 
         if local_store:
             self.local_store = local_store
 
         if local_store:
             self.local_store = local_store
@@ -1253,7 +1255,9 @@ class KeepClient(object):
                 # success is determined only by successful copies.
                 #
                 # Disable storage classes tracking from this point forward.
                 # success is determined only by successful copies.
                 #
                 # Disable storage classes tracking from this point forward.
-                _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
+                if not self._storage_classes_unsupported_warning:
+                    self._storage_classes_unsupported_warning = True
+                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                 done_classes = None
                 loop.save_result(
                     (done_copies >= copies, writer_pool.total_task_nr))
                 done_classes = None
                 loop.save_result(
                     (done_copies >= copies, writer_pool.total_task_nr))
@@ -1272,7 +1276,7 @@ class KeepClient(object):
                 "failed to write {} after {} (wanted {} copies but wrote {})".format(
                     data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
 
                 "failed to write {} after {} (wanted {} copies but wrote {})".format(
                     data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
 
-    def local_store_put(self, data, copies=1, num_retries=None):
+    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
         """A stub for put().
 
         This method is used in place of the real put() method when
         """A stub for put().
 
         This method is used in place of the real put() method when