17609: Merge branch 'master'
[arvados.git] / sdk / python / arvados / keep.py
index b11578f4cc02e41573c64764eb828856db6bd1e5..86b1d91b8246ef20ed860cdb516ca5d4a53624b9 100644 (file)
@@ -570,7 +570,7 @@ class KeepClient(object):
             self.confirmed_storage_classes = {}
             self.response = None
             self.storage_classes_tracking = True
-            self.queue_data_lock = threading.Lock()
+            self.queue_data_lock = threading.RLock()
             self.pending_tries = max(copies, len(classes))
             self.pending_tries_notification = threading.Condition()
 
@@ -585,6 +585,7 @@ class KeepClient(object):
                             self.confirmed_storage_classes[st_class] += st_copies
                         except KeyError:
                             self.confirmed_storage_classes[st_class] = st_copies
+                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
@@ -608,7 +609,7 @@ class KeepClient(object):
 
         def pending_classes(self):
             with self.queue_data_lock:
-                if self.wanted_storage_classes is None:
+                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                     return []
                 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                 for st_class, st_copies in self.confirmed_storage_classes.items():
@@ -710,6 +711,7 @@ class KeepClient(object):
             classes = self.queue.pending_classes()
             headers = {}
             if len(classes) > 0:
+                classes.sort()
                 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
             success = bool(service.put(self.data_hash,
                                         self.data,
@@ -839,6 +841,7 @@ class KeepClient(object):
         self.get_counter = Counter()
         self.hits_counter = Counter()
         self.misses_counter = Counter()
+        self._storage_classes_unsupported_warning = False
 
         if local_store:
             self.local_store = local_store
@@ -1252,7 +1255,9 @@ class KeepClient(object):
                 # success is determined only by successful copies.
                 #
                 # Disable storage classes tracking from this point forward.
-                _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
+                if not self._storage_classes_unsupported_warning:
+                    self._storage_classes_unsupported_warning = True
+                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                 done_classes = None
                 loop.save_result(
                     (done_copies >= copies, writer_pool.total_task_nr))
@@ -1271,7 +1276,7 @@ class KeepClient(object):
                 "failed to write {} after {} (wanted {} copies but wrote {})".format(
                     data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
 
-    def local_store_put(self, data, copies=1, num_retries=None):
+    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
         """A stub for put().
 
         This method is used in place of the real put() method when