Merge branch '16419-ssl-cert-file' refs #16419
[arvados.git] / sdk / python / arvados / keep.py
index 9618ee5ce9ac5551931093b24695514379e1376f..bc43b849c3a01dd661c4ba080d83f65f597adde6 100644 (file)
@@ -375,9 +375,11 @@ class KeepClient(object):
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                    else:
+                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
-                    self._setcurltimeouts(curl, timeout)
+                    self._setcurltimeouts(curl, timeout, method=="HEAD")
 
                     try:
                         curl.perform()
@@ -473,6 +475,8 @@ class KeepClient(object):
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                    else:
+                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     self._setcurltimeouts(curl, timeout)
                     try:
                         curl.perform()
@@ -516,7 +520,7 @@ class KeepClient(object):
                 self.upload_counter.add(len(body))
             return True
 
-        def _setcurltimeouts(self, curl, timeouts):
+        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
             if not timeouts:
                 return
             elif isinstance(timeouts, tuple):
@@ -529,8 +533,9 @@ class KeepClient(object):
                 conn_t, xfer_t = (timeouts, timeouts)
                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
-            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
-            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
+            if not ignore_bandwidth:
+                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
             if isinstance(header_line, bytes):
@@ -790,6 +795,7 @@ class KeepClient(object):
 
         if local_store:
             self.local_store = local_store
+            self.head = self.local_store_head
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
@@ -1094,10 +1100,7 @@ class KeepClient(object):
 
             # Always cache the result, then return it if we succeeded.
             if loop.success():
-                if method == "HEAD":
-                    return blob or True
-                else:
-                    return blob
+                return blob
         finally:
             if slot is not None:
                 slot.set(blob)
@@ -1118,7 +1121,7 @@ class KeepClient(object):
                 "{} not found".format(loc_s), service_errors)
         else:
             raise arvados.errors.KeepReadError(
-                "failed to read {}".format(loc_s), service_errors, label="service")
+                "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
 
     @retry.retry_method
     def put(self, data, copies=2, num_retries=None, request_id=None):
@@ -1197,8 +1200,8 @@ class KeepClient(object):
                               for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
-                "failed to write {} (wanted {} copies but wrote {})".format(
-                    data_hash, copies, writer_pool.done()), service_errors, label="service")
+                "failed to write {} after {} (wanted {} copies but wrote {})".format(
+                    data_hash, loop.attempts_str(), copies, writer_pool.done()), service_errors, label="service")
 
     def local_store_put(self, data, copies=1, num_retries=None):
         """A stub for put().
@@ -1232,5 +1235,17 @@ class KeepClient(object):
         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
             return f.read()
 
+    def local_store_head(self, loc_s, num_retries=None):
+        """Companion to local_store_put()."""
+        try:
+            locator = KeepLocator(loc_s)
+        except ValueError:
+            raise arvados.errors.NotFoundError(
+                "Invalid data locator: '%s'" % loc_s)
+        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
+            return True
+        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
+            return True
+
     def is_cached(self, locator):
         return self.block_cache.reserve_cache(expect_hash)