Merge branch '6833-test-token-expiry' closes #6833
[arvados.git] / sdk / python / arvados / keep.py
index ec9f6f6422e4736133ada98e971213da6c014e9b..4fa8a4f68399b9eda6345430f1e1bec1ea4fa474 100644 (file)
@@ -148,6 +148,8 @@ class KeepBlockCache(object):
         self._cache_lock = threading.Lock()
 
     class CacheSlot(object):
+        __slots__ = ("locator", "ready", "content")
+
         def __init__(self, locator):
             self.locator = locator
             self.ready = threading.Event()
@@ -210,14 +212,31 @@ class KeepBlockCache(object):
                 self._cache.insert(0, n)
                 return n, True
 
+
+class Counter(object):
+    def __init__(self, v=0):
+        self._lk = threading.Lock()
+        self._val = v
+
+    def add(self, v):
+        with self._lk:
+            self._val += v
+
+    def get(self):
+        with self._lk:
+            return self._val
+
+
 class KeepClient(object):
 
     # Default Keep server connection timeout:  2 seconds
-    # Default Keep server read timeout:      300 seconds
+    # Default Keep server read timeout:       256 seconds
+    # Default Keep server bandwidth minimum:  32768 bytes per second
     # Default Keep proxy connection timeout:  20 seconds
-    # Default Keep proxy read timeout:       300 seconds
-    DEFAULT_TIMEOUT = (2, 300)
-    DEFAULT_PROXY_TIMEOUT = (20, 300)
+    # Default Keep proxy read timeout:        256 seconds
+    # Default Keep proxy bandwidth minimum:   32768 bytes per second
+    DEFAULT_TIMEOUT = (2, 256, 32768)
+    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
 
     class ThreadLimiter(object):
         """Limit the number of threads writing to Keep at once.
@@ -290,7 +309,6 @@ class KeepClient(object):
             with self._done_lock:
                 return self._done
 
-
     class KeepService(object):
         """Make requests to a single Keep service, and track results.
 
@@ -309,7 +327,9 @@ class KeepClient(object):
             arvados.errors.HttpError,
         )
 
-        def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
+        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
+                     upload_counter=None,
+                     download_counter=None, **headers):
             self.root = root
             self._user_agent_pool = user_agent_pool
             self._result = {'error': None}
@@ -318,6 +338,8 @@ class KeepClient(object):
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
+            self.upload_counter = upload_counter
+            self.download_counter = download_counter
 
         def usable(self):
             """Is it worth attempting a request?"""
@@ -403,11 +425,13 @@ class KeepClient(object):
                 _logger.debug("Request fail: GET %s => %s: %s",
                               url, type(self._result['error']), str(self._result['error']))
                 return None
-            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
+            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                          self._result['status_code'],
                          len(self._result['body']),
                          t.msecs,
                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+            if self.download_counter:
+                self.download_counter.add(len(self._result['body']))
             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
             if resp_md5 != locator.md5sum:
                 _logger.warning("Checksum fail: md5(%s) = %s",
@@ -422,36 +446,37 @@ class KeepClient(object):
             _logger.debug("Request: PUT %s", url)
             curl = self._get_user_agent()
             try:
-                self._headers = {}
-                body_reader = cStringIO.StringIO(body)
-                response_body = cStringIO.StringIO()
-                curl.setopt(pycurl.NOSIGNAL, 1)
-                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
-                curl.setopt(pycurl.URL, url.encode('utf-8'))
-                # Using UPLOAD tells cURL to wait for a "go ahead" from the
-                # Keep server (in the form of a HTTP/1.1 "100 Continue"
-                # response) instead of sending the request body immediately.
-                # This allows the server to reject the request if the request
-                # is invalid or the server is read-only, without waiting for
-                # the client to send the entire block.
-                curl.setopt(pycurl.UPLOAD, True)
-                curl.setopt(pycurl.INFILESIZE, len(body))
-                curl.setopt(pycurl.READFUNCTION, body_reader.read)
-                curl.setopt(pycurl.HTTPHEADER, [
-                    '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
-                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
-                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
-                self._setcurltimeouts(curl, timeout)
-                try:
-                    curl.perform()
-                except Exception as e:
-                    raise arvados.errors.HttpError(0, str(e))
-                self._result = {
-                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
-                    'body': response_body.getvalue(),
-                    'headers': self._headers,
-                    'error': False,
-                }
+                with timer.Timer() as t:
+                    self._headers = {}
+                    body_reader = cStringIO.StringIO(body)
+                    response_body = cStringIO.StringIO()
+                    curl.setopt(pycurl.NOSIGNAL, 1)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.URL, url.encode('utf-8'))
+                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
+                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
+                    # response) instead of sending the request body immediately.
+                    # This allows the server to reject the request if the request
+                    # is invalid or the server is read-only, without waiting for
+                    # the client to send the entire block.
+                    curl.setopt(pycurl.UPLOAD, True)
+                    curl.setopt(pycurl.INFILESIZE, len(body))
+                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
+                    curl.setopt(pycurl.HTTPHEADER, [
+                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    self._setcurltimeouts(curl, timeout)
+                    try:
+                        curl.perform()
+                    except Exception as e:
+                        raise arvados.errors.HttpError(0, str(e))
+                    self._result = {
+                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+                        'body': response_body.getvalue(),
+                        'headers': self._headers,
+                        'error': False,
+                    }
                 ok = retry.check_http_response_success(self._result['status_code'])
                 if not ok:
                     self._result['error'] = arvados.errors.HttpError(
@@ -472,17 +497,30 @@ class KeepClient(object):
                 _logger.debug("Request fail: PUT %s => %s: %s",
                               url, type(self._result['error']), str(self._result['error']))
                 return False
+            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
+                         self._result['status_code'],
+                         len(body),
+                         t.msecs,
+                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+            if self.upload_counter:
+                self.upload_counter.add(len(body))
             return True
 
         def _setcurltimeouts(self, curl, timeouts):
             if not timeouts:
                 return
             elif isinstance(timeouts, tuple):
-                conn_t, xfer_t = timeouts
+                if len(timeouts) == 2:
+                    conn_t, xfer_t = timeouts
+                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
+                else:
+                    conn_t, xfer_t, bandwidth_bps = timeouts
             else:
                 conn_t, xfer_t = (timeouts, timeouts)
+                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
-            curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
             header_line = header_line.decode('iso-8859-1')
@@ -586,20 +624,22 @@ class KeepClient(object):
 
         :timeout:
           The initial timeout (in seconds) for HTTP requests to Keep
-          non-proxy servers.  A tuple of two floats is interpreted as
-          (connection_timeout, read_timeout): see
-          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
-          Because timeouts are often a result of transient server load, the
-          actual connection timeout will be increased by a factor of two on
-          each retry.
-          Default: (2, 300).
+          non-proxy servers.  A tuple of three floats is interpreted as
+          (connection_timeout, read_timeout, minimum_bandwidth). A connection
+          will be aborted if the average traffic rate falls below
+          minimum_bandwidth bytes per second over an interval of read_timeout
+          seconds. Because timeouts are often a result of transient server
+          load, the actual connection timeout will be increased by a factor
+          of two on each retry.
+          Default: (2, 256, 32768).
 
         :proxy_timeout:
           The initial timeout (in seconds) for HTTP requests to
-          Keep proxies. A tuple of two floats is interpreted as
-          (connection_timeout, read_timeout). The behavior described
-          above for adjusting connection timeouts on retry also applies.
-          Default: (20, 300).
+          Keep proxies. A tuple of three floats is interpreted as
+          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
+          described above for adjusting connection timeouts on retry also
+          applies.
+          Default: (20, 256, 32768).
 
         :api_token:
           If you're not using an API client, but only talking
@@ -639,6 +679,12 @@ class KeepClient(object):
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
         self._user_agent_pool = Queue.LifoQueue()
+        self.upload_counter = Counter()
+        self.download_counter = Counter()
+        self.put_counter = Counter()
+        self.get_counter = Counter()
+        self.hits_counter = Counter()
+        self.misses_counter = Counter()
 
         if local_store:
             self.local_store = local_store
@@ -686,8 +732,10 @@ class KeepClient(object):
         # TODO(twp): the timeout should be a property of a
         # KeepService, not a KeepClient. See #4488.
         t = self.proxy_timeout if self.using_proxy else self.timeout
-        return (t[0] * (1 << attempt_number), t[1])
-
+        if len(t) == 2:
+            return (t[0] * (1 << attempt_number), t[1])
+        else:
+            return (t[0] * (1 << attempt_number), t[1], t[2])
     def _any_nondisk_services(self, service_list):
         return any(ks.get('service_type', 'disk') != 'disk'
                    for ks in service_list)
@@ -790,7 +838,10 @@ class KeepClient(object):
         for root in local_roots:
             if root not in roots_map:
                 roots_map[root] = self.KeepService(
-                    root, self._user_agent_pool, **headers)
+                    root, self._user_agent_pool,
+                    upload_counter=self.upload_counter,
+                    download_counter=self.download_counter,
+                    **headers)
         return local_roots
 
     @staticmethod
@@ -842,12 +893,18 @@ class KeepClient(object):
         """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
+
+        self.get_counter.add(1)
+
         locator = KeepLocator(loc_s)
         slot, first = self.block_cache.reserve_cache(locator.md5sum)
         if not first:
+            self.hits_counter.add(1)
             v = slot.get()
             return v
 
+        self.misses_counter.add(1)
+
         # If the locator has hints specifying a prefix (indicating a
         # remote keepproxy) or the UUID of a local gateway service,
         # read data from the indicated service(s) instead of the usual
@@ -862,7 +919,9 @@ class KeepClient(object):
                                    )])
         # Map root URLs to their KeepService objects.
         roots_map = {
-            root: self.KeepService(root, self._user_agent_pool)
+            root: self.KeepService(root, self._user_agent_pool,
+                                   upload_counter=self.upload_counter,
+                                   download_counter=self.download_counter)
             for root in hint_roots
         }
 
@@ -945,6 +1004,8 @@ class KeepClient(object):
         elif not isinstance(data, str):
             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
 
+        self.put_counter.add(1)
+
         data_hash = hashlib.md5(data).hexdigest()
         loc_s = data_hash + '+' + str(len(data))
         if copies < 1: