18890: when ARVADOS_API_HOST_INSECURE is set, also disable certificate
[arvados.git] / sdk / python / arvados / keep.py
index bc43b849c3a01dd661c4ba080d83f65f597adde6..1a83eae944c59f8dde5e3a7c63de8bbe9c62a9c9 100644 (file)
@@ -4,6 +4,7 @@
 
 from __future__ import absolute_import
 from __future__ import division
+import copy
 from future import standard_library
 from future.utils import native_str
 standard_library.install_aliases()
@@ -375,6 +376,7 @@ class KeepClient(object):
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                     else:
                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     if method == "HEAD":
@@ -446,7 +448,9 @@ class KeepClient(object):
                 return None
             return self._result['body']
 
-        def put(self, hash_s, body, timeout=None):
+        def put(self, hash_s, body, timeout=None, headers={}):
+            put_headers = copy.copy(self.put_headers)
+            put_headers.update(headers)
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             curl = self._get_user_agent()
@@ -470,11 +474,12 @@ class KeepClient(object):
                     curl.setopt(pycurl.INFILESIZE, len(body))
                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
                     curl.setopt(pycurl.HTTPHEADER, [
-                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
+                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                     else:
                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     self._setcurltimeouts(curl, timeout)
@@ -559,18 +564,30 @@ class KeepClient(object):
 
 
     class KeepWriterQueue(queue.Queue):
-        def __init__(self, copies):
+        def __init__(self, copies, classes=[]):
             queue.Queue.__init__(self) # Old-style superclass
             self.wanted_copies = copies
+            self.wanted_storage_classes = classes
             self.successful_copies = 0
+            self.confirmed_storage_classes = {}
             self.response = None
-            self.successful_copies_lock = threading.Lock()
-            self.pending_tries = copies
+            self.storage_classes_tracking = True
+            self.queue_data_lock = threading.RLock()
+            self.pending_tries = max(copies, len(classes))
             self.pending_tries_notification = threading.Condition()
 
-        def write_success(self, response, replicas_nr):
-            with self.successful_copies_lock:
+        def write_success(self, response, replicas_nr, classes_confirmed):
+            with self.queue_data_lock:
                 self.successful_copies += replicas_nr
+                if classes_confirmed is None:
+                    self.storage_classes_tracking = False
+                elif self.storage_classes_tracking:
+                    for st_class, st_copies in classes_confirmed.items():
+                        try:
+                            self.confirmed_storage_classes[st_class] += st_copies
+                        except KeyError:
+                            self.confirmed_storage_classes[st_class] = st_copies
+                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
@@ -581,13 +598,31 @@ class KeepClient(object):
                 self.pending_tries_notification.notify()
 
         def pending_copies(self):
-            with self.successful_copies_lock:
+            with self.queue_data_lock:
                 return self.wanted_copies - self.successful_copies
 
+        def satisfied_classes(self):
+            with self.queue_data_lock:
+                if not self.storage_classes_tracking:
+                    # Storage class tracking has been disabled; return
+                    # None so the outer loop knows not to expect classes.
+                    return None
+            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
+
+        def pending_classes(self):
+            with self.queue_data_lock:
+                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
+                    return []
+                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
+                for st_class, st_copies in self.confirmed_storage_classes.items():
+                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
+                        unsatisfied_classes.remove(st_class)
+                return unsatisfied_classes
+
         def get_next_task(self):
             with self.pending_tries_notification:
                 while True:
-                    if self.pending_copies() < 1:
+                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                         # This notify_all() is unnecessary --
                         # write_success() already called notify_all()
                         # when pending<1 became true, so it's not
@@ -614,16 +649,15 @@ class KeepClient(object):
 
 
     class KeepWriterThreadPool(object):
-        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
+        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
             self.total_task_nr = 0
-            self.wanted_copies = copies
             if (not max_service_replicas) or (max_service_replicas >= copies):
                 num_threads = 1
             else:
                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
             _logger.debug("Pool max threads is %d", num_threads)
             self.workers = []
-            self.queue = KeepClient.KeepWriterQueue(copies)
+            self.queue = KeepClient.KeepWriterQueue(copies, classes)
             # Create workers
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
@@ -634,7 +668,7 @@ class KeepClient(object):
             self.total_task_nr += 1
 
         def done(self):
-            return self.queue.successful_copies
+            return self.queue.successful_copies, self.queue.satisfied_classes()
 
         def join(self):
             # Start workers
@@ -648,7 +682,7 @@ class KeepClient(object):
 
 
     class KeepWriterThread(threading.Thread):
-        TaskFailed = RuntimeError()
+        class TaskFailed(RuntimeError): pass
 
         def __init__(self, queue, data, data_hash, timeout=None):
             super(KeepClient.KeepWriterThread, self).__init__()
@@ -665,29 +699,35 @@ class KeepClient(object):
                 except queue.Empty:
                     return
                 try:
-                    locator, copies = self.do_task(service, service_root)
+                    locator, copies, classes = self.do_task(service, service_root)
                 except Exception as e:
-                    if e is not self.TaskFailed:
+                    if not isinstance(e, self.TaskFailed):
                         _logger.exception("Exception in KeepWriterThread")
                     self.queue.write_fail(service)
                 else:
-                    self.queue.write_success(locator, copies)
+                    self.queue.write_success(locator, copies, classes)
                 finally:
                     self.queue.task_done()
 
         def do_task(self, service, service_root):
+            classes = self.queue.pending_classes()
+            headers = {}
+            if len(classes) > 0:
+                classes.sort()
+                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
             success = bool(service.put(self.data_hash,
                                         self.data,
-                                        timeout=self.timeout))
+                                        timeout=self.timeout,
+                                        headers=headers))
             result = service.last_result()
 
             if not success:
-                if result.get('status_code', None):
+                if result.get('status_code'):
                     _logger.debug("Request fail: PUT %s => %s %s",
                                   self.data_hash,
-                                  result['status_code'],
-                                  result['body'])
-                raise self.TaskFailed
+                                  result.get('status_code'),
+                                  result.get('body'))
+                raise self.TaskFailed()
 
             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                           str(threading.current_thread()),
@@ -699,7 +739,18 @@ class KeepClient(object):
             except (KeyError, ValueError):
                 replicas_stored = 1
 
-            return result['body'].strip(), replicas_stored
+            classes_confirmed = {}
+            try:
+                scch = result['headers']['x-keep-storage-classes-confirmed']
+                for confirmation in scch.replace(' ', '').split(','):
+                    if '=' in confirmation:
+                        stored_class, stored_copies = confirmation.split('=')[:2]
+                        classes_confirmed[stored_class] = int(stored_copies)
+            except (KeyError, ValueError):
+                # Storage classes confirmed header missing or corrupt
+                classes_confirmed = None
+
+            return result['body'].strip(), replicas_stored, classes_confirmed
 
 
     def __init__(self, api_client=None, proxy=None,
@@ -792,6 +843,8 @@ class KeepClient(object):
         self.get_counter = Counter()
         self.hits_counter = Counter()
         self.misses_counter = Counter()
+        self._storage_classes_unsupported_warning = False
+        self._default_classes = []
 
         if local_store:
             self.local_store = local_store
@@ -832,6 +885,12 @@ class KeepClient(object):
                 self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
+                try:
+                    self._default_classes = [
+                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
+                except KeyError:
+                    # We're talking to an old cluster
+                    pass
 
     def current_timeout(self, attempt_number):
         """Return the appropriate timeout to use for this client.
@@ -1023,6 +1082,13 @@ class KeepClient(object):
 
         self.get_counter.add(1)
 
+        request_id = (request_id or
+                      (hasattr(self, 'api_client') and self.api_client.request_id) or
+                      arvados.util.new_request_id())
+        if headers is None:
+            headers = {}
+        headers['X-Request-Id'] = request_id
+
         slot = None
         blob = None
         try:
@@ -1039,12 +1105,6 @@ class KeepClient(object):
 
             self.misses_counter.add(1)
 
-            if headers is None:
-                headers = {}
-            headers['X-Request-Id'] = (request_id or
-                                        (hasattr(self, 'api_client') and self.api_client.request_id) or
-                                        arvados.util.new_request_id())
-
             # If the locator has hints specifying a prefix (indicating a
             # remote keepproxy) or the UUID of a local gateway service,
             # read data from the indicated service(s) instead of the usual
@@ -1114,17 +1174,17 @@ class KeepClient(object):
                           for key in sorted_roots)
         if not roots_map:
             raise arvados.errors.KeepReadError(
-                "failed to read {}: no Keep services available ({})".format(
-                    loc_s, loop.last_result()))
+                "[{}] failed to read {}: no Keep services available ({})".format(
+                    request_id, loc_s, loop.last_result()))
         elif not_founds == len(sorted_roots):
             raise arvados.errors.NotFoundError(
-                "{} not found".format(loc_s), service_errors)
+                "[{}] {} not found".format(request_id, loc_s), service_errors)
         else:
             raise arvados.errors.KeepReadError(
-                "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
+                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
 
     @retry.retry_method
-    def put(self, data, copies=2, num_retries=None, request_id=None):
+    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
@@ -1141,8 +1201,12 @@ class KeepClient(object):
           *each* Keep server if it returns temporary failures, with
           exponential backoff.  The default value is set when the
           KeepClient is initialized.
+        * classes: An optional list of storage class names where copies should
+          be written.
         """
 
+        classes = classes or self._default_classes
+
         if not isinstance(data, bytes):
             data = data.encode()
 
@@ -1154,16 +1218,18 @@ class KeepClient(object):
             return loc_s
         locator = KeepLocator(loc_s)
 
+        request_id = (request_id or
+                      (hasattr(self, 'api_client') and self.api_client.request_id) or
+                      arvados.util.new_request_id())
         headers = {
-            'X-Request-Id': (request_id or
-                             (hasattr(self, 'api_client') and self.api_client.request_id) or
-                             arvados.util.new_request_id()),
+            'X-Request-Id': request_id,
             'X-Keep-Desired-Replicas': str(copies),
         }
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
-        done = 0
+        done_copies = 0
+        done_classes = []
         for tries_left in loop:
             try:
                 sorted_roots = self.map_new_services(
@@ -1175,35 +1241,55 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
+            pending_classes = []
+            if done_classes is not None:
+                pending_classes = list(set(classes) - set(done_classes))
             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
-                                                        copies=copies - done,
+                                                        copies=copies - done_copies,
                                                         max_service_replicas=self.max_replicas_per_service,
-                                                        timeout=self.current_timeout(num_retries - tries_left))
+                                                        timeout=self.current_timeout(num_retries - tries_left),
+                                                        classes=pending_classes)
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
                 if ks.finished():
                     continue
                 writer_pool.add_task(ks, service_root)
             writer_pool.join()
-            done += writer_pool.done()
-            loop.save_result((done >= copies, writer_pool.total_task_nr))
+            pool_copies, pool_classes = writer_pool.done()
+            done_copies += pool_copies
+            if (done_classes is not None) and (pool_classes is not None):
+                done_classes += pool_classes
+                loop.save_result(
+                    (done_copies >= copies and set(done_classes) == set(classes),
+                    writer_pool.total_task_nr))
+            else:
+                # Old keepstore contacted without storage classes support:
+                # success is determined only by successful copies.
+                #
+                # Disable storage classes tracking from this point forward.
+                if not self._storage_classes_unsupported_warning:
+                    self._storage_classes_unsupported_warning = True
+                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
+                done_classes = None
+                loop.save_result(
+                    (done_copies >= copies, writer_pool.total_task_nr))
 
         if loop.success():
             return writer_pool.response()
         if not roots_map:
             raise arvados.errors.KeepWriteError(
-                "failed to write {}: no Keep services available ({})".format(
-                    data_hash, loop.last_result()))
+                "[{}] failed to write {}: no Keep services available ({})".format(
+                    request_id, data_hash, loop.last_result()))
         else:
             service_errors = ((key, roots_map[key].last_result()['error'])
                               for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
-                "failed to write {} after {} (wanted {} copies but wrote {})".format(
-                    data_hash, loop.attempts_str(), copies, writer_pool.done()), service_errors, label="service")
+                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
+                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
 
-    def local_store_put(self, data, copies=1, num_retries=None):
+    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
         """A stub for put().
 
         This method is used in place of the real put() method when