Merge branch 'master' into 7253-datamanager-test-errors
[arvados.git] / sdk / python / tests / test_keep_client.py
index 90468924a668b09e9116084adf20581878a382b2..2b380f066641ec25ac980b6f9183df94b728bb5d 100644 (file)
@@ -29,14 +29,21 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
                                              proxy='', local_store='')
 
     def test_KeepBasicRWTest(self):
+        self.assertEqual(0, self.keep_client.upload_counter.get())
         foo_locator = self.keep_client.put('foo')
         self.assertRegexpMatches(
             foo_locator,
             '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
             'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
+
+        # 6 bytes: the 3-byte block 'foo' was uploaded as 2 copies
+        self.assertEqual(6, self.keep_client.upload_counter.get())
+
+        self.assertEqual(0, self.keep_client.download_counter.get())
         self.assertEqual(self.keep_client.get(foo_locator),
                          'foo',
                          'wrong content from Keep.get(md5("foo"))')
+        self.assertEqual(3, self.keep_client.download_counter.get())
 
     def test_KeepBinaryRWTest(self):
         blob_str = '\xff\xfe\xf7\x00\x01\x02'
@@ -287,8 +294,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
             self.assertEqual(
-                mock.responses[0].getopt(pycurl.TIMEOUT_MS),
-                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
+                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
 
     def test_put_timeout(self):
         api_client = self.mock_keep_services(count=1)
@@ -301,8 +311,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
             self.assertEqual(
-                mock.responses[0].getopt(pycurl.TIMEOUT_MS),
-                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]*1000))
+                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
 
     def test_proxy_get_timeout(self):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
@@ -315,8 +328,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
             self.assertEqual(
-                mock.responses[0].getopt(pycurl.TIMEOUT_MS),
-                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
+                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
 
     def test_proxy_put_timeout(self):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
@@ -329,8 +345,11 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
             self.assertEqual(
-                mock.responses[0].getopt(pycurl.TIMEOUT_MS),
-                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
+                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+            self.assertEqual(
+                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
+                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
 
     def check_no_services_error(self, verb, exc_class):
         api_client = mock.MagicMock(name='api_client')
@@ -368,7 +387,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
 
     def test_put_error_does_not_include_successful_puts(self):
         data = 'partial failure test'
-        data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+        data_loc = tutil.str_keep_locator(data)
         api_client = self.mock_keep_services(count=3)
         with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
@@ -378,7 +397,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
 
     def test_proxy_put_with_no_writable_services(self):
         data = 'test with no writable services'
-        data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+        data_loc = tutil.str_keep_locator(data)
         api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
         with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
@@ -387,6 +406,36 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
         self.assertEqual(0, len(exc_check.exception.request_errors()))
 
+    def test_oddball_service_get(self):
+        body = 'oddball service get'
+        api_client = self.mock_keep_services(service_type='fancynewblobstore')
+        with tutil.mock_keep_responses(body, 200):
+            keep_client = arvados.KeepClient(api_client=api_client)
+            actual = keep_client.get(tutil.str_keep_locator(body))
+        self.assertEqual(body, actual)
+
+    def test_oddball_service_put(self):
+        body = 'oddball service put'
+        pdh = tutil.str_keep_locator(body)
+        api_client = self.mock_keep_services(service_type='fancynewblobstore')
+        with tutil.mock_keep_responses(pdh, 200):
+            keep_client = arvados.KeepClient(api_client=api_client)
+            actual = keep_client.put(body, copies=1)
+        self.assertEqual(pdh, actual)
+
+    def test_oddball_service_writer_count(self):
+        body = 'oddball service writer count'
+        pdh = tutil.str_keep_locator(body)
+        api_client = self.mock_keep_services(service_type='fancynewblobstore',
+                                             count=4)
+        headers = {'x-keep-replicas-stored': 3}
+        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
+                                       **headers) as req_mock:
+            keep_client = arvados.KeepClient(api_client=api_client)
+            actual = keep_client.put(body, copies=2)
+        self.assertEqual(pdh, actual)
+        self.assertEqual(1, req_mock.call_count)
+
 
 @tutil.skip_sleep
 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
@@ -518,7 +567,7 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
     def check_64_zeros_error_order(self, verb, exc_class):
         data = '0' * 64
         if verb == 'get':
-            data = hashlib.md5(data).hexdigest() + '+1234'
+            data = tutil.str_keep_locator(data)
         # Arbitrary port number:
         aport = random.randint(1024,65535)
         api_client = self.mock_keep_services(service_port=aport, count=self.services)
@@ -540,7 +589,12 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
 
 
 class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
-    DATA = 'x' * 2**10
+    # BANDWIDTH_LOW_LIM (bytes per second) must be less than len(DATA)
+    # so that we can transfer 1s worth of data and still trigger
+    # bandwidth errors before running out of data.
+    DATA = 'x'*2**11
+    BANDWIDTH_LOW_LIM = 1024
+    TIMEOUT_TIME = 1.0
 
     class assertTakesBetween(unittest.TestCase):
         def __init__(self, tmin, tmax):
@@ -551,8 +605,22 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
             self.t0 = time.time()
 
         def __exit__(self, *args, **kwargs):
-            self.assertGreater(time.time() - self.t0, self.tmin)
-            self.assertLess(time.time() - self.t0, self.tmax)
+            # Round times to milliseconds, like CURL. Otherwise, we
+            # fail when CURL reaches a 1s timeout at 0.9998s.
+            delta = round(time.time() - self.t0, 3)
+            self.assertGreaterEqual(delta, self.tmin)
+            self.assertLessEqual(delta, self.tmax)
+
+    class assertTakesGreater(unittest.TestCase):
+        def __init__(self, tmin):
+            self.tmin = tmin
+
+        def __enter__(self):
+            self.t0 = time.time()
+
+        def __exit__(self, *args, **kwargs):
+            delta = round(time.time() - self.t0, 3)
+            self.assertGreaterEqual(delta, self.tmin)
 
     def setUp(self):
         sock = socket.socket()
@@ -572,7 +640,7 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
     def tearDown(self):
         self.server.shutdown()
 
-    def keepClient(self, timeouts=(0.1, 1.0)):
+    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
         return arvados.KeepClient(
             api_client=self.api_client,
             timeout=timeouts)
@@ -587,39 +655,89 @@ class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
         )
         with self.assertTakesBetween(0.1, 0.5):
             with self.assertRaises(arvados.errors.KeepWriteError):
-                self.keepClient((0.1, 1)).put(self.DATA, copies=1, num_retries=0)
+                self.keepClient().put(self.DATA, copies=1, num_retries=0)
+
+    def test_low_bandwidth_no_delays_success(self):
+        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
+        kc = self.keepClient()
+        loc = kc.put(self.DATA, copies=1, num_retries=0)
+        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
+
+    def test_too_low_bandwidth_no_delays_failure(self):
+        # Bandwidth below BANDWIDTH_LOW_LIM must make both get and put fail.
+        kc = self.keepClient()
+        loc = kc.put(self.DATA, copies=1, num_retries=0)
+        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepReadError) as e:
+                kc.get(loc, num_retries=0)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                kc.put(self.DATA, copies=1, num_retries=0)
+
+    def test_low_bandwidth_with_server_response_delay_failure(self):
+        kc = self.keepClient()
+        loc = kc.put(self.DATA, copies=1, num_retries=0)
+        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
+        self.server.setdelays(response=self.TIMEOUT_TIME)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepReadError) as e:
+                kc.get(loc, num_retries=0)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                kc.put(self.DATA, copies=1, num_retries=0)
+
+    def test_low_bandwidth_with_server_mid_delay_failure(self):
+        kc = self.keepClient()
+        loc = kc.put(self.DATA, copies=1, num_retries=0)
+        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
+        self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepReadError) as e:
+                kc.get(loc, num_retries=0)
+        with self.assertTakesGreater(self.TIMEOUT_TIME):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                kc.put(self.DATA, copies=1, num_retries=0)
 
     def test_timeout_slow_request(self):
-        self.server.setdelays(request=0.2)
-        self._test_200ms()
+        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+        self.server.setdelays(request=.2)
+        self._test_connect_timeout_under_200ms(loc)
+        self.server.setdelays(request=2)
+        self._test_response_timeout_under_2s(loc)
 
     def test_timeout_slow_response(self):
-        self.server.setdelays(response=0.2)
-        self._test_200ms()
+        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+        self.server.setdelays(response=.2)
+        self._test_connect_timeout_under_200ms(loc)
+        self.server.setdelays(response=2)
+        self._test_response_timeout_under_2s(loc)
 
     def test_timeout_slow_response_body(self):
-        self.server.setdelays(response_body=0.2)
-        self._test_200ms()
-
-    def _test_200ms(self):
-        """Connect should be t<100ms, request should be 200ms <= t < 300ms"""
+        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
+        self.server.setdelays(response_body=.2)
+        self._test_connect_timeout_under_200ms(loc)
+        self.server.setdelays(response_body=2)
+        self._test_response_timeout_under_2s(loc)
 
+    def _test_connect_timeout_under_200ms(self, loc):
         # Allow 100ms to connect, then 1s for response. Everything
         # should work, and everything should take at least 200ms to
         # return.
-        kc = self.keepClient((.1, 1))
+        kc = self.keepClient(timeouts=(.1, 1))
         with self.assertTakesBetween(.2, .3):
-            loc = kc.put(self.DATA, copies=1, num_retries=0)
+            kc.put(self.DATA, copies=1, num_retries=0)
         with self.assertTakesBetween(.2, .3):
             self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
 
-        # Allow 1s to connect, then 100ms for response. Nothing should
-        # work, and everything should take at least 100ms to return.
-        kc = self.keepClient((1, .1))
-        with self.assertTakesBetween(.1, .2):
+    def _test_response_timeout_under_2s(self, loc):
+        # Allow 10s to connect, then 1s for response. Nothing should
+        # work, and each call should fail after at least 1s (under 2s).
+        kc = self.keepClient(timeouts=(10, 1))
+        with self.assertTakesBetween(1, 1.9):
             with self.assertRaises(arvados.errors.KeepReadError):
                 kc.get(loc, num_retries=0)
-        with self.assertTakesBetween(.1, .2):
+        with self.assertTakesBetween(1, 1.9):
             with self.assertRaises(arvados.errors.KeepWriteError):
                 kc.put(self.DATA, copies=1, num_retries=0)