Closes #7235. Instead of setting KeepService's pycurl.TIMEOUT_MS, set pycurl.LOW_SPEE...
[arvados.git] / sdk / python / arvados / keep.py
index ec9f6f6422e4736133ada98e971213da6c014e9b..e01fec412bc4e9cb83d47930b850b1a101e68438 100644 (file)
@@ -213,11 +213,13 @@ class KeepBlockCache(object):
 class KeepClient(object):
 
     # Default Keep server connection timeout:  2 seconds
-    # Default Keep server read timeout:      300 seconds
+    # Default Keep server read timeout:       64 seconds
+    # Default Keep server bandwidth minimum:  32768 bytes per second
     # Default Keep proxy connection timeout:  20 seconds
-    # Default Keep proxy read timeout:       300 seconds
-    DEFAULT_TIMEOUT = (2, 300)
-    DEFAULT_PROXY_TIMEOUT = (20, 300)
+    # Default Keep proxy read timeout:        64 seconds
+    # Default Keep proxy bandwidth minimum:   32768 bytes per second
+    DEFAULT_TIMEOUT = (2, 64, 32768)
+    DEFAULT_PROXY_TIMEOUT = (20, 64, 32768)
 
     class ThreadLimiter(object):
         """Limit the number of threads writing to Keep at once.
@@ -478,11 +480,17 @@ class KeepClient(object):
             if not timeouts:
                 return
             elif isinstance(timeouts, tuple):
-                conn_t, xfer_t = timeouts
+                if len(timeouts) == 2:
+                    conn_t, xfer_t = timeouts
+                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
+                else:
+                    conn_t, xfer_t, bandwidth_bps = timeouts
             else:
                 conn_t, xfer_t = (timeouts, timeouts)
+                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
-            curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
             header_line = header_line.decode('iso-8859-1')
@@ -586,20 +594,22 @@ class KeepClient(object):
 
         :timeout:
           The initial timeout (in seconds) for HTTP requests to Keep
-          non-proxy servers.  A tuple of two floats is interpreted as
-          (connection_timeout, read_timeout): see
-          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
-          Because timeouts are often a result of transient server load, the
-          actual connection timeout will be increased by a factor of two on
-          each retry.
-          Default: (2, 300).
+          non-proxy servers.  A tuple of three floats is interpreted as
+          (connection_timeout, read_timeout, minimum_bandwidth). A connection
+          will be aborted if the average traffic rate falls below
+          minimum_bandwidth bytes per second over an interval of read_timeout
+          seconds. Because timeouts are often a result of transient server
+          load, the actual connection timeout will be increased by a factor
+          of two on each retry.
+          Default: (2, 64, 32768).
 
         :proxy_timeout:
           The initial timeout (in seconds) for HTTP requests to
-          Keep proxies. A tuple of two floats is interpreted as
-          (connection_timeout, read_timeout). The behavior described
-          above for adjusting connection timeouts on retry also applies.
-          Default: (20, 300).
+          Keep proxies. A tuple of three floats is interpreted as
+          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
+          described above for adjusting connection timeouts on retry also
+          applies.
+          Default: (20, 64, 32768).
 
         :api_token:
           If you're not using an API client, but only talking
@@ -686,8 +696,10 @@ class KeepClient(object):
         # TODO(twp): the timeout should be a property of a
         # KeepService, not a KeepClient. See #4488.
         t = self.proxy_timeout if self.using_proxy else self.timeout
-        return (t[0] * (1 << attempt_number), t[1])
-
+        if len(t) == 2:
+            return (t[0] * (1 << attempt_number), t[1])
+        else:
+            return (t[0] * (1 << attempt_number), t[1], t[2])
     def _any_nondisk_services(self, service_list):
         return any(ks.get('service_type', 'disk') != 'disk'
                    for ks in service_list)