5305: Add heuristics to choose name when collection is referenced by PDH instead...
[arvados.git] / sdk / python / arvados / keep.py
index 4c288f94d60488aea4102e691ac16403a9de4b9f..71dc7ce7af863f60fb9e741b7bbc1231c22b146b 100644 (file)
@@ -1,6 +1,4 @@
 import gflags
 import gflags
-import httplib
-import httplib2
 import logging
 import os
 import pprint
 import logging
 import os
 import pprint
@@ -21,6 +19,7 @@ import timer
 import datetime
 import ssl
 import socket
 import datetime
 import ssl
 import socket
+import requests
 
 import arvados
 import arvados.config as config
 
 import arvados
 import arvados.config as config
@@ -210,6 +209,14 @@ class KeepBlockCache(object):
             self._cache_lock.release()
 
 class KeepClient(object):
             self._cache_lock.release()
 
 class KeepClient(object):
+
+    # Default Keep server connection timeout:  2 seconds
+    # Default Keep server read timeout:      300 seconds
+    # Default Keep proxy connection timeout:  20 seconds
+    # Default Keep proxy read timeout:       300 seconds
+    DEFAULT_TIMEOUT = (2, 300)
+    DEFAULT_PROXY_TIMEOUT = (20, 300)
+
     class ThreadLimiter(object):
         """
         Limit the number of threads running at a given time to
     class ThreadLimiter(object):
         """
         Limit the number of threads running at a given time to
@@ -269,7 +276,7 @@ class KeepClient(object):
 
     class KeepService(object):
         # Make requests to a single Keep service, and track results.
 
     class KeepService(object):
         # Make requests to a single Keep service, and track results.
-        HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
+        HTTP_ERRORS = (requests.exceptions.RequestException,
                        socket.error, ssl.SSLError)
 
         def __init__(self, root, **headers):
                        socket.error, ssl.SSLError)
 
         def __init__(self, root, **headers):
@@ -288,19 +295,19 @@ class KeepClient(object):
 
         def last_status(self):
             try:
 
         def last_status(self):
             try:
-                return int(self.last_result[0].status)
-            except (AttributeError, IndexError, ValueError, TypeError):
+                return self.last_result.status_code
+            except AttributeError:
                 return None
 
                 return None
 
-        def get(self, http, locator):
-            # http is an httplib2.Http object.
+        def get(self, locator, timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
             _logger.debug("Request: GET %s", url)
             try:
                 with timer.Timer() as t:
             # locator is a KeepLocator object.
             url = self.root + str(locator)
             _logger.debug("Request: GET %s", url)
             try:
                 with timer.Timer() as t:
-                    result = http.request(url.encode('utf-8'), 'GET',
-                                          headers=self.get_headers)
+                    result = requests.get(url.encode('utf-8'),
+                                          headers=self.get_headers,
+                                          timeout=timeout)
             except self.HTTP_ERRORS as e:
                 _logger.debug("Request fail: GET %s => %s: %s",
                               url, type(e), str(e))
             except self.HTTP_ERRORS as e:
                 _logger.debug("Request fail: GET %s => %s: %s",
                               url, type(e), str(e))
@@ -308,7 +315,7 @@ class KeepClient(object):
             else:
                 self.last_result = result
                 self.success_flag = retry.check_http_response_success(result)
             else:
                 self.last_result = result
                 self.success_flag = retry.check_http_response_success(result)
-                content = result[1]
+                content = result.content
                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                              self.last_status(), len(content), t.msecs,
                              (len(content)/(1024.0*1024))/t.secs)
                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                              self.last_status(), len(content), t.msecs,
                              (len(content)/(1024.0*1024))/t.secs)
@@ -320,12 +327,14 @@ class KeepClient(object):
                                     url, resp_md5)
             return None
 
                                     url, resp_md5)
             return None
 
-        def put(self, http, hash_s, body):
+        def put(self, hash_s, body, timeout=None):
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             try:
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             try:
-                result = http.request(url.encode('utf-8'), 'PUT',
-                                      headers=self.put_headers, body=body)
+                result = requests.put(url.encode('utf-8'),
+                                      data=body,
+                                      headers=self.put_headers,
+                                      timeout=timeout)
             except self.HTTP_ERRORS as e:
                 _logger.debug("Request fail: PUT %s => %s: %s",
                               url, type(e), str(e))
             except self.HTTP_ERRORS as e:
                 _logger.debug("Request fail: PUT %s => %s: %s",
                               url, type(e), str(e))
@@ -366,12 +375,13 @@ class KeepClient(object):
                           str(threading.current_thread()),
                           self.args['data_hash'],
                           self.args['service_root'])
                           str(threading.current_thread()),
                           self.args['data_hash'],
                           self.args['service_root'])
-            h = httplib2.Http(timeout=self.args.get('timeout', None))
             self._success = bool(self.service.put(
             self._success = bool(self.service.put(
-                    h, self.args['data_hash'], self.args['data']))
+                self.args['data_hash'],
+                self.args['data'],
+                timeout=self.args.get('timeout', None)))
             status = self.service.last_status()
             if self._success:
             status = self.service.last_status()
             if self._success:
-                resp, body = self.service.last_result
+                result = self.service.last_result
                 _logger.debug("KeepWriterThread %s succeeded %s %s",
                               str(threading.current_thread()),
                               self.args['data_hash'],
                 _logger.debug("KeepWriterThread %s succeeded %s %s",
                               str(threading.current_thread()),
                               self.args['data_hash'],
@@ -381,17 +391,18 @@ class KeepClient(object):
                 # we're talking to a proxy or other backend that
                 # stores to multiple copies for us.
                 try:
                 # we're talking to a proxy or other backend that
                 # stores to multiple copies for us.
                 try:
-                    replicas_stored = int(resp['x-keep-replicas-stored'])
+                    replicas_stored = int(result.headers['x-keep-replicas-stored'])
                 except (KeyError, ValueError):
                     replicas_stored = 1
                 except (KeyError, ValueError):
                     replicas_stored = 1
-                limiter.save_response(body.strip(), replicas_stored)
+                limiter.save_response(result.content.strip(), replicas_stored)
             elif status is not None:
                 _logger.debug("Request fail: PUT %s => %s %s",
                               self.args['data_hash'], status,
             elif status is not None:
                 _logger.debug("Request fail: PUT %s => %s %s",
                               self.args['data_hash'], status,
-                              self.service.last_result[1])
+                              self.service.last_result.content)
 
 
 
 
-    def __init__(self, api_client=None, proxy=None, timeout=300,
+    def __init__(self, api_client=None, proxy=None,
+                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                  api_token=None, local_store=None, block_cache=None,
                  num_retries=0):
         """Initialize a new KeepClient.
                  api_token=None, local_store=None, block_cache=None,
                  num_retries=0):
         """Initialize a new KeepClient.
@@ -404,8 +415,14 @@ class KeepClient(object):
           Keep proxy.  Otherwise, KeepClient will fall back to the setting
           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
           ensure KeepClient does not use a proxy, pass in an empty string.
           Keep proxy.  Otherwise, KeepClient will fall back to the setting
           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
           ensure KeepClient does not use a proxy, pass in an empty string.
-        * timeout: The timeout for all HTTP requests, in seconds.  Default
-          300.
+        * timeout: The timeout (in seconds) for HTTP requests to Keep
+          non-proxy servers.  A tuple of two floats is interpreted as
+          (connection_timeout, read_timeout): see
+          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
+          Default: (2, 300).
+        * proxy_timeout: The timeout (in seconds) for HTTP requests to
+          Keep proxies. A tuple of two floats is interpreted as
+          (connection_timeout, read_timeout). Default: (20, 300).
         * api_token: If you're not using an API client, but only talking
           directly to a Keep proxy, this parameter specifies an API token
           to authenticate Keep requests.  It is an error to specify both
         * api_token: If you're not using an API client, but only talking
           directly to a Keep proxy, this parameter specifies an API token
           to authenticate Keep requests.  It is an error to specify both
@@ -436,13 +453,14 @@ class KeepClient(object):
             local_store = os.environ.get('KEEP_LOCAL_STORE')
 
         self.block_cache = block_cache if block_cache else KeepBlockCache()
             local_store = os.environ.get('KEEP_LOCAL_STORE')
 
         self.block_cache = block_cache if block_cache else KeepBlockCache()
+        self.timeout = timeout
+        self.proxy_timeout = proxy_timeout
 
         if local_store:
             self.local_store = local_store
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
 
         if local_store:
             self.local_store = local_store
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
-            self.timeout = timeout
             self.num_retries = num_retries
             if proxy:
                 if not proxy.endswith('/'):
             self.num_retries = num_retries
             if proxy:
                 if not proxy.endswith('/'):
@@ -465,6 +483,15 @@ class KeepClient(object):
                 self.using_proxy = None
                 self._static_services_list = False
 
                 self.using_proxy = None
                 self._static_services_list = False
 
+    def current_timeout(self):
+        """Return the appropriate timeout to use for this client: the proxy
+        timeout setting if the backend service is currently a proxy,
+        the regular timeout setting otherwise.
+        """
+        # TODO(twp): the timeout should be a property of a
+        # KeepService, not a KeepClient. See #4488.
+        return self.proxy_timeout if self.using_proxy else self.timeout
+
     def build_services_list(self, force_rebuild=False):
         if (self._static_services_list or
               (self._keep_services and not force_rebuild)):
     def build_services_list(self, force_rebuild=False):
         if (self._static_services_list or
               (self._keep_services and not force_rebuild)):
@@ -490,16 +517,16 @@ class KeepClient(object):
                     r['service_port'])
             _logger.debug(str(self._keep_services))
 
                     r['service_port'])
             _logger.debug(str(self._keep_services))
 
-    def _service_weight(self, hash, service_uuid):
+    def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
         block with a known hash.
 
         The weight is md5(h + u) where u is the last 15 characters of
         the service endpoint's UUID.
         """
         """Compute the weight of a Keep service endpoint for a data
         block with a known hash.
 
         The weight is md5(h + u) where u is the last 15 characters of
         the service endpoint's UUID.
         """
-        return hashlib.md5(hash + service_uuid[-15:]).hexdigest()
+        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
 
 
-    def weighted_service_roots(self, hash, force_rebuild=False):
+    def weighted_service_roots(self, data_hash, force_rebuild=False):
         """Return an array of Keep service endpoints, in the order in
         which they should be probed when reading or writing data with
         the given hash.
         """Return an array of Keep service endpoints, in the order in
         which they should be probed when reading or writing data with
         the given hash.
@@ -507,14 +534,14 @@ class KeepClient(object):
         self.build_services_list(force_rebuild)
 
         # Sort the available services by weight (heaviest first) for
         self.build_services_list(force_rebuild)
 
         # Sort the available services by weight (heaviest first) for
-        # this hash, and return their service_roots (base URIs) in
-        # that order.
+        # this data_hash, and return their service_roots (base URIs)
+        # in that order.
         sorted_roots = [
             svc['_service_root'] for svc in sorted(
                 self._keep_services,
                 reverse=True,
         sorted_roots = [
             svc['_service_root'] for svc in sorted(
                 self._keep_services,
                 reverse=True,
-                key=lambda svc: self._service_weight(hash, svc['uuid']))]
-        _logger.debug(hash + ': ' + str(sorted_roots))
+                key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
+        _logger.debug(data_hash + ': ' + str(sorted_roots))
         return sorted_roots
 
     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
         return sorted_roots
 
     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
@@ -604,9 +631,8 @@ class KeepClient(object):
             services_to_try = [roots_map[root]
                                for root in (local_roots + hint_roots)
                                if roots_map[root].usable()]
             services_to_try = [roots_map[root]
                                for root in (local_roots + hint_roots)
                                if roots_map[root].usable()]
-            http = httplib2.Http(timeout=self.timeout)
             for keep_service in services_to_try:
             for keep_service in services_to_try:
-                blob = keep_service.get(http, locator)
+                blob = keep_service.get(locator, timeout=self.current_timeout())
                 if blob is not None:
                     break
             loop.save_result((blob, len(services_to_try)))
                 if blob is not None:
                     break
             loop.save_result((blob, len(services_to_try)))
@@ -617,17 +643,27 @@ class KeepClient(object):
         if loop.success():
             return blob
 
         if loop.success():
             return blob
 
-        # No servers fulfilled the request.  Count how many responded
-        # "not found;" if the ratio is high enough (currently 75%), report
-        # Not Found; otherwise a generic error.
+        try:
+            all_roots = local_roots + hint_roots
+        except NameError:
+            # We never successfully fetched local_roots.
+            all_roots = hint_roots
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
-        not_founds = sum(1 for ks in roots_map.values()
-                         if ks.last_status() in set([403, 404, 410]))
-        if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
-            raise arvados.errors.NotFoundError(loc_s)
+        not_founds = sum(1 for key in all_roots
+                         if roots_map[key].last_status() in {403, 404, 410})
+        service_errors = ((key, roots_map[key].last_result)
+                          for key in all_roots)
+        if not roots_map:
+            raise arvados.errors.KeepReadError(
+                "failed to read {}: no Keep services available ({})".format(
+                    loc_s, loop.last_result()))
+        elif not_founds == len(all_roots):
+            raise arvados.errors.NotFoundError(
+                "{} not found".format(loc_s), service_errors)
         else:
         else:
-            raise arvados.errors.KeepReadError(loc_s)
+            raise arvados.errors.KeepReadError(
+                "failed to read {}".format(loc_s), service_errors)
 
     @retry.retry_method
     def put(self, data, copies=2, num_retries=None):
 
     @retry.retry_method
     def put(self, data, copies=2, num_retries=None):
@@ -648,6 +684,12 @@ class KeepClient(object):
           exponential backoff.  The default value is set when the
           KeepClient is initialized.
         """
           exponential backoff.  The default value is set when the
           KeepClient is initialized.
         """
+
+        if isinstance(data, unicode):
+            data = data.encode("ascii")
+        elif not isinstance(data, str):
+            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
+
         data_hash = hashlib.md5(data).hexdigest()
         if copies < 1:
             return data_hash
         data_hash = hashlib.md5(data).hexdigest()
         if copies < 1:
             return data_hash
@@ -679,7 +721,7 @@ class KeepClient(object):
                     data_hash=data_hash,
                     service_root=service_root,
                     thread_limiter=thread_limiter,
                     data_hash=data_hash,
                     service_root=service_root,
                     thread_limiter=thread_limiter,
-                    timeout=self.timeout)
+                    timeout=self.current_timeout())
                 t.start()
                 threads.append(t)
             for t in threads:
                 t.start()
                 threads.append(t)
             for t in threads:
@@ -688,14 +730,30 @@ class KeepClient(object):
 
         if loop.success():
             return thread_limiter.response()
 
         if loop.success():
             return thread_limiter.response()
-        raise arvados.errors.KeepWriteError(
-            "Write fail for %s: wanted %d but wrote %d" %
-            (data_hash, copies, thread_limiter.done()))
-
-    # Local storage methods need no-op num_retries arguments to keep
-    # integration tests happy.  With better isolation they could
-    # probably be removed again.
-    def local_store_put(self, data, num_retries=0):
+        if not roots_map:
+            raise arvados.errors.KeepWriteError(
+                "failed to write {}: no Keep services available ({})".format(
+                    data_hash, loop.last_result()))
+        else:
+            service_errors = ((key, roots_map[key].last_result)
+                              for key in local_roots
+                              if not roots_map[key].success_flag)
+            raise arvados.errors.KeepWriteError(
+                "failed to write {} (wanted {} copies but wrote {})".format(
+                    data_hash, copies, thread_limiter.done()), service_errors)
+
+    def local_store_put(self, data, copies=1, num_retries=None):
+        """A stub for put().
+
+        This method is used in place of the real put() method when
+        using local storage (see constructor's local_store argument).
+
+        copies and num_retries arguments are ignored: they are here
+        only for the sake of offering the same call signature as
+        put().
+
+        Data stored this way can be retrieved via local_store_get().
+        """
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
@@ -704,7 +762,8 @@ class KeepClient(object):
                   os.path.join(self.local_store, md5))
         return locator
 
                   os.path.join(self.local_store, md5))
         return locator
 
-    def local_store_get(self, loc_s, num_retries=0):
+    def local_store_get(self, loc_s, num_retries=None):
+        """Companion to local_store_put()."""
         try:
             locator = KeepLocator(loc_s)
         except ValueError:
         try:
             locator = KeepLocator(loc_s)
         except ValueError: