+
+
+class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
+
+ class FakeKeepService(object):
+ def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
+ self.delay = delay
+ self.will_succeed = will_succeed
+ self.will_raise = will_raise
+ self._result = {}
+ self._result['headers'] = {}
+ self._result['headers']['x-keep-replicas-stored'] = str(replicas)
+ self._result['body'] = 'foobar'
+
+ def put(self, data_hash, data, timeout):
+ time.sleep(self.delay)
+ if self.will_raise is not None:
+ raise self.will_raise
+ return self.will_succeed
+
+ def last_result(self):
+ if self.will_succeed:
+ return self._result
+
+ def finished(self):
+ return False
+
+ def setUp(self):
+ self.copies = 3
+ self.pool = arvados.KeepClient.KeepWriterThreadPool(
+ data = 'foo',
+ data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
+ max_service_replicas = self.copies,
+ copies = self.copies
+ )
+
+ def test_only_write_enough_on_success(self):
+ for i in range(10):
+ ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+ self.pool.add_task(ks, None)
+ self.pool.join()
+ self.assertEqual(self.pool.done(), self.copies)
+
+ def test_only_write_enough_on_partial_success(self):
+ for i in range(5):
+ ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
+ self.pool.add_task(ks, None)
+ ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+ self.pool.add_task(ks, None)
+ self.pool.join()
+ self.assertEqual(self.pool.done(), self.copies)
+
+ def test_only_write_enough_when_some_crash(self):
+ for i in range(5):
+ ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
+ self.pool.add_task(ks, None)
+ ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+ self.pool.add_task(ks, None)
+ self.pool.join()
+ self.assertEqual(self.pool.done(), self.copies)
+
+ def test_fail_when_too_many_crash(self):
+ for i in range(self.copies+1):
+ ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
+ self.pool.add_task(ks, None)
+ for i in range(self.copies-1):
+ ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+ self.pool.add_task(ks, None)
+ self.pool.join()
+ self.assertEqual(self.pool.done(), self.copies-1)
+
+
+@tutil.skip_sleep
+class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
+ # Test put()s that need two distinct servers to succeed, possibly
+ # requiring multiple passes through the retry loop.
+
+ def setUp(self):
+ self.api_client = self.mock_keep_services(count=2)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client)
+
+ def test_success_after_exception(self):
+ with tutil.mock_keep_responses(
+ 'acbd18db4cc2f85cedef654fccc4a4d8+3',
+ Exception('mock err'), 200, 200) as req_mock:
+ self.keep_client.put('foo', num_retries=1, copies=2)
+ self.assertEqual(3, req_mock.call_count)
+
+ def test_success_after_retryable_error(self):
+ with tutil.mock_keep_responses(
+ 'acbd18db4cc2f85cedef654fccc4a4d8+3',
+ 500, 200, 200) as req_mock:
+ self.keep_client.put('foo', num_retries=1, copies=2)
+ self.assertEqual(3, req_mock.call_count)
+
+ def test_fail_after_final_error(self):
+ # First retry loop gets a 200 (can't achieve replication by
+ # storing again on that server) and a 400 (can't retry that
+ # server at all), so we shouldn't try a third request.
+ with tutil.mock_keep_responses(
+ 'acbd18db4cc2f85cedef654fccc4a4d8+3',
+ 200, 400, 200) as req_mock:
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ self.keep_client.put('foo', num_retries=1, copies=2)
+ self.assertEqual(2, req_mock.call_count)