18941: Add --threads option to arv-get
[arvados.git] / sdk / python / arvados / keep.py
index 9dfe0436dec9bdf22eb71ad9bfe2e8a201ee3ab6..94104586deb46a4c24c05c41e683a50b28c69d1d 100644 (file)
@@ -376,6 +376,7 @@ class KeepClient(object):
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                     else:
                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     if method == "HEAD":
@@ -478,6 +479,7 @@ class KeepClient(object):
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
+                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                     else:
                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                     self._setcurltimeouts(curl, timeout)
@@ -720,11 +722,11 @@ class KeepClient(object):
             result = service.last_result()
 
             if not success:
-                if result.get('status_code', None):
+                if result.get('status_code'):
                     _logger.debug("Request fail: PUT %s => %s %s",
                                   self.data_hash,
-                                  result['status_code'],
-                                  result['body'])
+                                  result.get('status_code'),
+                                  result.get('body'))
                 raise self.TaskFailed()
 
             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
@@ -1034,14 +1036,19 @@ class KeepClient(object):
         else:
             return None
 
-    def get_from_cache(self, loc):
+    def get_from_cache(self, loc_s):
         """Fetch a block only if is in the cache, otherwise return None."""
-        slot = self.block_cache.get(loc)
+        locator = KeepLocator(loc_s)
+        slot = self.block_cache.get(locator.md5sum)
         if slot is not None and slot.ready.is_set():
             return slot.get()
         else:
             return None
 
+    def has_cache_slot(self, loc_s):
+        locator = KeepLocator(loc_s)
+        return self.block_cache.get(locator.md5sum) is not None
+
     def refresh_signature(self, loc):
         """Ask Keep to get the remote block and return its local signature"""
         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
@@ -1080,6 +1087,13 @@ class KeepClient(object):
 
         self.get_counter.add(1)
 
+        request_id = (request_id or
+                      (hasattr(self, 'api_client') and self.api_client.request_id) or
+                      arvados.util.new_request_id())
+        if headers is None:
+            headers = {}
+        headers['X-Request-Id'] = request_id
+
         slot = None
         blob = None
         try:
@@ -1096,12 +1110,6 @@ class KeepClient(object):
 
             self.misses_counter.add(1)
 
-            if headers is None:
-                headers = {}
-            headers['X-Request-Id'] = (request_id or
-                                        (hasattr(self, 'api_client') and self.api_client.request_id) or
-                                        arvados.util.new_request_id())
-
             # If the locator has hints specifying a prefix (indicating a
             # remote keepproxy) or the UUID of a local gateway service,
             # read data from the indicated service(s) instead of the usual
@@ -1171,14 +1179,14 @@ class KeepClient(object):
                           for key in sorted_roots)
         if not roots_map:
             raise arvados.errors.KeepReadError(
-                "failed to read {}: no Keep services available ({})".format(
-                    loc_s, loop.last_result()))
+                "[{}] failed to read {}: no Keep services available ({})".format(
+                    request_id, loc_s, loop.last_result()))
         elif not_founds == len(sorted_roots):
             raise arvados.errors.NotFoundError(
-                "{} not found".format(loc_s), service_errors)
+                "[{}] {} not found".format(request_id, loc_s), service_errors)
         else:
             raise arvados.errors.KeepReadError(
-                "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
+                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
 
     @retry.retry_method
     def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
@@ -1215,10 +1223,11 @@ class KeepClient(object):
             return loc_s
         locator = KeepLocator(loc_s)
 
+        request_id = (request_id or
+                      (hasattr(self, 'api_client') and self.api_client.request_id) or
+                      arvados.util.new_request_id())
         headers = {
-            'X-Request-Id': (request_id or
-                             (hasattr(self, 'api_client') and self.api_client.request_id) or
-                             arvados.util.new_request_id()),
+            'X-Request-Id': request_id,
             'X-Keep-Desired-Replicas': str(copies),
         }
         roots_map = {}
@@ -1275,15 +1284,15 @@ class KeepClient(object):
             return writer_pool.response()
         if not roots_map:
             raise arvados.errors.KeepWriteError(
-                "failed to write {}: no Keep services available ({})".format(
-                    data_hash, loop.last_result()))
+                "[{}] failed to write {}: no Keep services available ({})".format(
+                    request_id, data_hash, loop.last_result()))
         else:
             service_errors = ((key, roots_map[key].last_result()['error'])
                               for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
-                "failed to write {} after {} (wanted {} copies but wrote {})".format(
-                    data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
+                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
+                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
 
     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
         """A stub for put().
@@ -1328,6 +1337,3 @@ class KeepClient(object):
             return True
         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
             return True
-
-    def is_cached(self, locator):
-        return self.block_cache.reserve_cache(expect_hash)