Merge branch '18027-unmount-fuse'
[arvados.git] / sdk / python / arvados / keep.py
index eac25e9d3f65dc57f1e9e9d1fbd3734152800814..9dfe0436dec9bdf22eb71ad9bfe2e8a201ee3ab6 100644 (file)
@@ -4,6 +4,7 @@
 
 from __future__ import absolute_import
 from __future__ import division
 
 from __future__ import absolute_import
 from __future__ import division
+import copy
 from future import standard_library
 from future.utils import native_str
 standard_library.install_aliases()
 from future import standard_library
 from future.utils import native_str
 standard_library.install_aliases()
@@ -446,7 +447,9 @@ class KeepClient(object):
                 return None
             return self._result['body']
 
                 return None
             return self._result['body']
 
-        def put(self, hash_s, body, timeout=None):
+        def put(self, hash_s, body, timeout=None, headers={}):
+            put_headers = copy.copy(self.put_headers)
+            put_headers.update(headers)
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             curl = self._get_user_agent()
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             curl = self._get_user_agent()
@@ -470,7 +473,7 @@ class KeepClient(object):
                     curl.setopt(pycurl.INFILESIZE, len(body))
                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
                     curl.setopt(pycurl.HTTPHEADER, [
                     curl.setopt(pycurl.INFILESIZE, len(body))
                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
                     curl.setopt(pycurl.HTTPHEADER, [
-                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
+                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
@@ -566,18 +569,23 @@ class KeepClient(object):
             self.successful_copies = 0
             self.confirmed_storage_classes = {}
             self.response = None
             self.successful_copies = 0
             self.confirmed_storage_classes = {}
             self.response = None
-            self.queue_data_lock = threading.Lock()
-            self.pending_tries = copies
+            self.storage_classes_tracking = True
+            self.queue_data_lock = threading.RLock()
+            self.pending_tries = max(copies, len(classes))
             self.pending_tries_notification = threading.Condition()
 
         def write_success(self, response, replicas_nr, classes_confirmed):
             with self.queue_data_lock:
                 self.successful_copies += replicas_nr
             self.pending_tries_notification = threading.Condition()
 
         def write_success(self, response, replicas_nr, classes_confirmed):
             with self.queue_data_lock:
                 self.successful_copies += replicas_nr
-                for st_class, st_copies in classes_confirmed.items():
-                    try:
-                        self.confirmed_storage_classes[st_class] += st_copies
-                    except KeyError:
-                        self.confirmed_storage_classes[st_class] = st_copies
+                if classes_confirmed is None:
+                    self.storage_classes_tracking = False
+                elif self.storage_classes_tracking:
+                    for st_class, st_copies in classes_confirmed.items():
+                        try:
+                            self.confirmed_storage_classes[st_class] += st_copies
+                        except KeyError:
+                            self.confirmed_storage_classes[st_class] = st_copies
+                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
@@ -591,12 +599,22 @@ class KeepClient(object):
             with self.queue_data_lock:
                 return self.wanted_copies - self.successful_copies
 
             with self.queue_data_lock:
                 return self.wanted_copies - self.successful_copies
 
+        def satisfied_classes(self):
+            with self.queue_data_lock:
+                if not self.storage_classes_tracking:
+                    # Notifies disabled storage classes expectation to
+                    # the outer loop.
+                    return None
+            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
+
         def pending_classes(self):
             with self.queue_data_lock:
         def pending_classes(self):
             with self.queue_data_lock:
-                unsatisfied_classes = []
+                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
+                    return []
+                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                 for st_class, st_copies in self.confirmed_storage_classes.items():
                 for st_class, st_copies in self.confirmed_storage_classes.items():
-                    if st_class in self.wanted_storage_classes and st_copies < self.wanted_copies:
-                        unsatisfied_classes.append(st_class)
+                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
+                        unsatisfied_classes.remove(st_class)
                 return unsatisfied_classes
 
         def get_next_task(self):
                 return unsatisfied_classes
 
         def get_next_task(self):
@@ -614,7 +632,7 @@ class KeepClient(object):
                         while True:
                             self.get_nowait()
                             self.task_done()
                         while True:
                             self.get_nowait()
                             self.task_done()
-                    elif self.pending_tries > 0 or len(self.pending_classes()) > 0:
+                    elif self.pending_tries > 0:
                         service, service_root = self.get_nowait()
                         if service.finished():
                             self.task_done()
                         service, service_root = self.get_nowait()
                         if service.finished():
                             self.task_done()
@@ -648,7 +666,7 @@ class KeepClient(object):
             self.total_task_nr += 1
 
         def done(self):
             self.total_task_nr += 1
 
         def done(self):
-            return self.queue.successful_copies
+            return self.queue.successful_copies, self.queue.satisfied_classes()
 
         def join(self):
             # Start workers
 
         def join(self):
             # Start workers
@@ -690,9 +708,15 @@ class KeepClient(object):
                     self.queue.task_done()
 
         def do_task(self, service, service_root):
                     self.queue.task_done()
 
         def do_task(self, service, service_root):
+            classes = self.queue.pending_classes()
+            headers = {}
+            if len(classes) > 0:
+                classes.sort()
+                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
             success = bool(service.put(self.data_hash,
                                         self.data,
             success = bool(service.put(self.data_hash,
                                         self.data,
-                                        timeout=self.timeout))
+                                        timeout=self.timeout,
+                                        headers=headers))
             result = service.last_result()
 
             if not success:
             result = service.last_result()
 
             if not success:
@@ -722,7 +746,7 @@ class KeepClient(object):
                         classes_confirmed[stored_class] = int(stored_copies)
             except (KeyError, ValueError):
                 # Storage classes confirmed header missing or corrupt
                         classes_confirmed[stored_class] = int(stored_copies)
             except (KeyError, ValueError):
                 # Storage classes confirmed header missing or corrupt
-                classes_confirmed = {}
+                classes_confirmed = None
 
             return result['body'].strip(), replicas_stored, classes_confirmed
 
 
             return result['body'].strip(), replicas_stored, classes_confirmed
 
@@ -817,6 +841,8 @@ class KeepClient(object):
         self.get_counter = Counter()
         self.hits_counter = Counter()
         self.misses_counter = Counter()
         self.get_counter = Counter()
         self.hits_counter = Counter()
         self.misses_counter = Counter()
+        self._storage_classes_unsupported_warning = False
+        self._default_classes = []
 
         if local_store:
             self.local_store = local_store
 
         if local_store:
             self.local_store = local_store
@@ -857,6 +883,12 @@ class KeepClient(object):
                 self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
                 self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
+                try:
+                    self._default_classes = [
+                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
+                except KeyError:
+                    # We're talking to an old cluster
+                    pass
 
     def current_timeout(self, attempt_number):
         """Return the appropriate timeout to use for this client.
 
     def current_timeout(self, attempt_number):
         """Return the appropriate timeout to use for this client.
@@ -1149,7 +1181,7 @@ class KeepClient(object):
                 "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
 
     @retry.retry_method
                 "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
 
     @retry.retry_method
-    def put(self, data, copies=2, num_retries=None, request_id=None, classes=[]):
+    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
@@ -1170,6 +1202,8 @@ class KeepClient(object):
           be written.
         """
 
           be written.
         """
 
+        classes = classes or self._default_classes
+
         if not isinstance(data, bytes):
             data = data.encode()
 
         if not isinstance(data, bytes):
             data = data.encode()
 
@@ -1187,12 +1221,11 @@ class KeepClient(object):
                              arvados.util.new_request_id()),
             'X-Keep-Desired-Replicas': str(copies),
         }
                              arvados.util.new_request_id()),
             'X-Keep-Desired-Replicas': str(copies),
         }
-        if len(classes) > 0:
-            headers['X-Keep-Storage-Classes'] = ', '.join(classes)
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
-        done = 0
+        done_copies = 0
+        done_classes = []
         for tries_left in loop:
             try:
                 sorted_roots = self.map_new_services(
         for tries_left in loop:
             try:
                 sorted_roots = self.map_new_services(
@@ -1204,20 +1237,39 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
                 loop.save_result(error)
                 continue
 
+            pending_classes = []
+            if done_classes is not None:
+                pending_classes = list(set(classes) - set(done_classes))
             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
-                                                        copies=copies - done,
+                                                        copies=copies - done_copies,
                                                         max_service_replicas=self.max_replicas_per_service,
                                                         timeout=self.current_timeout(num_retries - tries_left),
                                                         max_service_replicas=self.max_replicas_per_service,
                                                         timeout=self.current_timeout(num_retries - tries_left),
-                                                        classes=classes)
+                                                        classes=pending_classes)
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
                 if ks.finished():
                     continue
                 writer_pool.add_task(ks, service_root)
             writer_pool.join()
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
                 if ks.finished():
                     continue
                 writer_pool.add_task(ks, service_root)
             writer_pool.join()
-            done += writer_pool.done()
-            loop.save_result((done >= copies, writer_pool.total_task_nr))
+            pool_copies, pool_classes = writer_pool.done()
+            done_copies += pool_copies
+            if (done_classes is not None) and (pool_classes is not None):
+                done_classes += pool_classes
+                loop.save_result(
+                    (done_copies >= copies and set(done_classes) == set(classes),
+                    writer_pool.total_task_nr))
+            else:
+                # Old keepstore contacted without storage classes support:
+                # success is determined only by successful copies.
+                #
+                # Disable storage classes tracking from this point forward.
+                if not self._storage_classes_unsupported_warning:
+                    self._storage_classes_unsupported_warning = True
+                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
+                done_classes = None
+                loop.save_result(
+                    (done_copies >= copies, writer_pool.total_task_nr))
 
         if loop.success():
             return writer_pool.response()
 
         if loop.success():
             return writer_pool.response()
@@ -1231,9 +1283,9 @@ class KeepClient(object):
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                 "failed to write {} after {} (wanted {} copies but wrote {})".format(
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                 "failed to write {} after {} (wanted {} copies but wrote {})".format(
-                    data_hash, loop.attempts_str(), copies, writer_pool.done()), service_errors, label="service")
+                    data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
 
 
-    def local_store_put(self, data, copies=1, num_retries=None):
+    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
         """A stub for put().
 
         This method is used in place of the real put() method when
         """A stub for put().
 
         This method is used in place of the real put() method when