Merge branch '17246-salt-install-improvements'
[arvados.git] / sdk / python / arvados / keep.py
index 71e101cf4c5073d40e78f73c0bf46a9ff231f937..bd0e5dc1e686befa94e815396bb5d5a208ca9c1d 100644 (file)
@@ -375,9 +375,11 @@ class KeepClient(object):
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                    else:
+                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
-                    self._setcurltimeouts(curl, timeout)
+                    self._setcurltimeouts(curl, timeout, method=="HEAD")
 
                     try:
                         curl.perform()
 
                     try:
                         curl.perform()
@@ -421,6 +423,10 @@ class KeepClient(object):
                 _logger.info("HEAD %s: %s bytes",
                          self._result['status_code'],
                          self._result.get('content-length'))
                 _logger.info("HEAD %s: %s bytes",
                          self._result['status_code'],
                          self._result.get('content-length'))
+                if self._result['headers'].get('x-keep-locator'):
+                    # This is a response to a remote block copy request, return
+                    # the local copy block locator.
+                    return self._result['headers'].get('x-keep-locator')
                 return True
 
             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                 return True
 
             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
@@ -469,6 +475,8 @@ class KeepClient(object):
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                    else:
+                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     self._setcurltimeouts(curl, timeout)
                     try:
                         curl.perform()
                     self._setcurltimeouts(curl, timeout)
                     try:
                         curl.perform()
@@ -512,7 +520,7 @@ class KeepClient(object):
                 self.upload_counter.add(len(body))
             return True
 
                 self.upload_counter.add(len(body))
             return True
 
-        def _setcurltimeouts(self, curl, timeouts):
+        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
             if not timeouts:
                 return
             elif isinstance(timeouts, tuple):
             if not timeouts:
                 return
             elif isinstance(timeouts, tuple):
@@ -525,8 +533,9 @@ class KeepClient(object):
                 conn_t, xfer_t = (timeouts, timeouts)
                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
                 conn_t, xfer_t = (timeouts, timeouts)
                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
-            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
-            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
+            if not ignore_bandwidth:
+                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
             if isinstance(header_line, bytes):
 
         def _headerfunction(self, header_line):
             if isinstance(header_line, bytes):
@@ -639,7 +648,7 @@ class KeepClient(object):
 
 
     class KeepWriterThread(threading.Thread):
 
 
     class KeepWriterThread(threading.Thread):
-        TaskFailed = RuntimeError()
+        class TaskFailed(RuntimeError): pass
 
         def __init__(self, queue, data, data_hash, timeout=None):
             super(KeepClient.KeepWriterThread, self).__init__()
 
         def __init__(self, queue, data, data_hash, timeout=None):
             super(KeepClient.KeepWriterThread, self).__init__()
@@ -658,7 +667,7 @@ class KeepClient(object):
                 try:
                     locator, copies = self.do_task(service, service_root)
                 except Exception as e:
                 try:
                     locator, copies = self.do_task(service, service_root)
                 except Exception as e:
-                    if e is not self.TaskFailed:
+                    if not isinstance(e, self.TaskFailed):
                         _logger.exception("Exception in KeepWriterThread")
                     self.queue.write_fail(service)
                 else:
                         _logger.exception("Exception in KeepWriterThread")
                     self.queue.write_fail(service)
                 else:
@@ -678,7 +687,7 @@ class KeepClient(object):
                                   self.data_hash,
                                   result['status_code'],
                                   result['body'])
                                   self.data_hash,
                                   result['status_code'],
                                   result['body'])
-                raise self.TaskFailed
+                raise self.TaskFailed()
 
             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                           str(threading.current_thread()),
 
             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                           str(threading.current_thread()),
@@ -786,6 +795,7 @@ class KeepClient(object):
 
         if local_store:
             self.local_store = local_store
 
         if local_store:
             self.local_store = local_store
+            self.head = self.local_store_head
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
@@ -975,6 +985,11 @@ class KeepClient(object):
         else:
             return None
 
         else:
             return None
 
+    def refresh_signature(self, loc):
+        """Ask Keep to get the remote block and return its local signature"""
+        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
+        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
+
     @retry.retry_method
     def head(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="HEAD", **kwargs)
     @retry.retry_method
     def head(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="HEAD", **kwargs)
@@ -983,7 +998,7 @@ class KeepClient(object):
     def get(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="GET", **kwargs)
 
     def get(self, loc_s, **kwargs):
         return self._get_or_head(loc_s, method="GET", **kwargs)
 
-    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None):
+    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -1024,11 +1039,11 @@ class KeepClient(object):
 
             self.misses_counter.add(1)
 
 
             self.misses_counter.add(1)
 
-            headers = {
-                'X-Request-Id': (request_id or
-                                 (hasattr(self, 'api_client') and self.api_client.request_id) or
-                                 arvados.util.new_request_id()),
-            }
+            if headers is None:
+                headers = {}
+            headers['X-Request-Id'] = (request_id or
+                                        (hasattr(self, 'api_client') and self.api_client.request_id) or
+                                        arvados.util.new_request_id())
 
             # If the locator has hints specifying a prefix (indicating a
             # remote keepproxy) or the UUID of a local gateway service,
 
             # If the locator has hints specifying a prefix (indicating a
             # remote keepproxy) or the UUID of a local gateway service,
@@ -1085,10 +1100,7 @@ class KeepClient(object):
 
             # Always cache the result, then return it if we succeeded.
             if loop.success():
 
             # Always cache the result, then return it if we succeeded.
             if loop.success():
-                if method == "HEAD":
-                    return True
-                else:
-                    return blob
+                return blob
         finally:
             if slot is not None:
                 slot.set(blob)
         finally:
             if slot is not None:
                 slot.set(blob)
@@ -1109,7 +1121,7 @@ class KeepClient(object):
                 "{} not found".format(loc_s), service_errors)
         else:
             raise arvados.errors.KeepReadError(
                 "{} not found".format(loc_s), service_errors)
         else:
             raise arvados.errors.KeepReadError(
-                "failed to read {}".format(loc_s), service_errors, label="service")
+                "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
 
     @retry.retry_method
     def put(self, data, copies=2, num_retries=None, request_id=None):
 
     @retry.retry_method
     def put(self, data, copies=2, num_retries=None, request_id=None):
@@ -1188,8 +1200,8 @@ class KeepClient(object):
                               for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                               for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
-                "failed to write {} (wanted {} copies but wrote {})".format(
-                    data_hash, copies, writer_pool.done()), service_errors, label="service")
+                "failed to write {} after {} (wanted {} copies but wrote {})".format(
+                    data_hash, loop.attempts_str(), copies, writer_pool.done()), service_errors, label="service")
 
     def local_store_put(self, data, copies=1, num_retries=None):
         """A stub for put().
 
     def local_store_put(self, data, copies=1, num_retries=None):
         """A stub for put().
@@ -1223,5 +1235,17 @@ class KeepClient(object):
         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
             return f.read()
 
         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
             return f.read()
 
+    def local_store_head(self, loc_s, num_retries=None):
+        """Companion to local_store_put()."""
+        try:
+            locator = KeepLocator(loc_s)
+        except ValueError:
+            raise arvados.errors.NotFoundError(
+                "Invalid data locator: '%s'" % loc_s)
+        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
+            return True
+        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
+            return True
+
     def is_cached(self, locator):
         return self.block_cache.reserve_cache(expect_hash)
     def is_cached(self, locator):
         return self.block_cache.reserve_cache(expect_hash)