def test_KeepLongBinaryRWTest(self):
blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
- for i in range(0,23):
+ for i in range(0, 23):
blob_data = blob_data + blob_data
blob_locator = self.keep_client.put(blob_data)
self.assertRegex(
def check_errors_from_last_retry(self, verb, exc_class):
    """Check that a failed request reports the errors of its last attempt.

    verb: name of the KeepClient method to call (looked up with getattr).
    exc_class: exception type the call is expected to raise.

    The mocked responses are six 500s followed by two 502s, and the call
    is made with num_retries=3. The raised exception is expected to expose
    only the two 502 statuses through request_errors() and to say
    "after 4 attempts" in its message.
    """
    api_client = self.mock_keep_services(count=2)
    req_mock = tutil.mock_keep_responses(
        "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
    with req_mock, tutil.skip_sleep, \
            self.assertRaises(exc_class) as err_check:
        keep_client = arvados.KeepClient(api_client=api_client)
        getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                   num_retries=3)
    # The checks below sit outside the `with` block: statements placed after
    # the raising call inside an assertRaises context would never execute.
    last_statuses = [
        getattr(error, 'status_code', None)
        for error in err_check.exception.request_errors().values()]
    self.assertEqual([502, 502], last_statuses)
    self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')
def test_get_error_reflects_last_retry(self):
    """Run the last-retry error check for 'get', expecting KeepReadError."""
    self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
# First response was not cached because it was from a HEAD request.
self.assertNotEqual(head_resp, get_resp)
@tutil.skip_sleep
class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Tests for storage-class handling in KeepClient.put().

    Every test replaces the Keep service responses with
    tutil.mock_keep_responses() and then checks one of: the
    X-Keep-Storage-Classes request header that was sent, the number of
    requests made, or that KeepWriteError is raised.
    """

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        # Locator that put(self.data) is expected to return (asserted in
        # test_successful_storage_classes_put_requests).
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def test_multiple_default_storage_classes_req_header(self):
        """put() without `classes` requests every class marked Default.

        The mocked config marks 'foo' and 'bar' as Default and 'baz' as
        not; the request header is expected to read 'bar, foo'.
        """
        api_mock = self.api_client_mock()
        api_mock.config.return_value = {
            'StorageClasses': {
                'foo': {'Default': True},
                'bar': {'Default': True},
                'baz': {'Default': False},
            }
        }
        api_client = self.mock_keep_services(api_mock=api_mock, count=2)
        keep_client = arvados.KeepClient(api_client=api_client)
        resp_hdr = {
            'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
            'x-keep-replicas-stored': 1,
        }
        with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
            keep_client.put(self.data, copies=1)
            req_hdr = mock.responses[0]
            self.assertIn(
                'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))

    def test_storage_classes_req_header(self):
        """The request header names the requested classes.

        An empty list or None is expected to fall back to the single
        'default' class provided by the mocked config.
        """
        self.assertEqual(
            self.api_client.config()['StorageClasses'],
            {'default': {'Default': True}})
        cases = [
            # requested, expected
            (['foo'], 'X-Keep-Storage-Classes: foo'),
            (['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'),
            ([], 'X-Keep-Storage-Classes: default'),
            (None, 'X-Keep-Storage-Classes: default'),
        ]
        for req_classes, expected_header in cases:
            # subTest lets the remaining rows run even if one row fails.
            with self.subTest(req_classes=req_classes):
                # The mocked server confirms whatever was requested, or
                # 'default' when nothing (None or []) was requested.
                if req_classes:
                    confirmed_hdr = ', '.join(
                        "{}=1".format(cls) for cls in req_classes)
                else:
                    confirmed_hdr = 'default=1'
                headers = {
                    'x-keep-replicas-stored': 1,
                    'x-keep-storage-classes-confirmed': confirmed_hdr,
                }
                with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
                    self.keep_client.put(self.data, copies=1, classes=req_classes)
                    req_hdr = mock.responses[0]
                    self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))

    def test_partial_storage_classes_put(self):
        """A retry only asks for the classes that are still unconfirmed.

        The first mocked response (200) confirms 'foo' only and the second
        is a 503, so put() is expected to raise KeepWriteError after a
        second request that asks for 'bar' alone.
        """
        headers = {
            'x-keep-replicas-stored': 1,
            'x-keep-storage-classes-confirmed': 'foo=1'}
        with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
            # 1st request, both classes pending
            req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
            self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
            # 2nd try, 'foo' class already satisfied
            req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
            self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)

    def test_successful_storage_classes_put_requests(self):
        """put() returns the locator after the expected number of requests."""
        cases = [
            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
            (1, ['foo'], 1, 'foo=1', 1),
            (1, ['foo'], 2, 'foo=2', 1),
            (2, ['foo'], 2, 'foo=2', 1),
            (2, ['foo'], 1, 'foo=1', 2),
            (1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1),
            (1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1),
            (2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1),
            (2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2),
            (1, ['foo', 'bar'], 1, None, 1),
            (1, ['foo'], 1, None, 1),
            (2, ['foo'], 2, None, 1),
            (2, ['foo'], 1, None, 2),
        ]
        for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
            case_desc = (
                'wanted_copies={}, wanted_classes="{}", '
                'confirmed_copies={}, confirmed_classes="{}", '
                'expected_requests={}'
            ).format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
            headers = {'x-keep-replicas-stored': c_copies}
            # A None entry means the response carries no confirmation header.
            if c_classes is not None:
                headers['x-keep-storage-classes-confirmed'] = c_classes
            # subTest lets the remaining rows run even if one row fails.
            with self.subTest(case_desc), \
                    tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
                self.assertEqual(
                    self.locator,
                    self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
                    case_desc)
                self.assertEqual(e_reqs, mock.call_count, case_desc)

    def test_failed_storage_classes_put_requests(self):
        """put() raises KeepWriteError when the mocked responses fall short."""
        cases = [
            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
            (1, ['foo'], 1, 'bar=1', 200),
            (1, ['foo'], 1, None, 503),
            (2, ['foo'], 1, 'bar=1, foo=0', 200),
            (3, ['foo'], 1, 'bar=1, foo=1', 200),
            (3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200),
        ]
        for w_copies, w_classes, c_copies, c_classes, return_code in cases:
            case_desc = (
                'wanted_copies={}, wanted_classes="{}", '
                'confirmed_copies={}, confirmed_classes="{}"'
            ).format(w_copies, ', '.join(w_classes), c_copies, c_classes)
            headers = {'x-keep-replicas-stored': c_copies}
            # A None entry means the response carries no confirmation header.
            if c_classes is not None:
                headers['x-keep-storage-classes-confirmed'] = c_classes
            # subTest lets the remaining rows run even if one row fails.
            with self.subTest(case_desc), \
                    tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
                with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
                    self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
kc = self.keepClient()
loc = kc.put(self.DATA, copies=1, num_retries=0)
self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
- self.server.setdelays(response=self.TIMEOUT_TIME)
+ # Note the actual delay must be 1s longer than the low speed
+ # limit interval in order for curl to detect it reliably.
+ self.server.setdelays(response=self.TIMEOUT_TIME+1)
with self.assertTakesGreater(self.TIMEOUT_TIME):
with self.assertRaises(arvados.errors.KeepReadError):
kc.get(loc, num_retries=0)
kc = self.keepClient()
loc = kc.put(self.DATA, copies=1, num_retries=0)
self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
- self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
+ # Note the actual delay must be 1s longer than the low speed
+ # limit interval in order for curl to detect it reliably.
+ self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
with self.assertTakesGreater(self.TIMEOUT_TIME):
with self.assertRaises(arvados.errors.KeepReadError) as e:
kc.get(loc, num_retries=0)
def check_exception(self, error_class=None, *args, **kwargs):
    """Assert that run_method() raises, and return the assertRaises context.

    error_class: exception type expected; self.DEFAULT_EXCEPTION is used
        when this is None.
    *args, **kwargs: forwarded unchanged to self.run_method().

    Returns the context manager produced by assertRaises, so the caller
    can inspect the caught exception through its `exception` attribute.
    """
    if error_class is None:
        error_class = self.DEFAULT_EXCEPTION
    with self.assertRaises(error_class) as err:
        self.run_method(*args, **kwargs)
    return err
def test_immediate_success(self):
with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
def test_error_after_retries_exhausted(self):
    """The call fails when its retries run out before the 200 response.

    The patched responses are 500, 500, 200 and the call is made with
    num_retries=1; the error message is expected to report 2 attempts.
    """
    with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
        err = self.check_exception(num_retries=1)
    self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')
def test_num_retries_instance_fallback(self):
self.client_kwargs['num_retries'] = 3
self._result = {}
self._result['headers'] = {}
self._result['headers']['x-keep-replicas-stored'] = str(replicas)
+ self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
self._result['body'] = 'foobar'
- def put(self, data_hash, data, timeout):
+ def put(self, data_hash, data, timeout, headers):
time.sleep(self.delay)
if self.will_raise is not None:
raise self.will_raise
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
self.pool.add_task(ks, None)
self.pool.join()
- self.assertEqual(self.pool.done(), self.copies)
+ self.assertEqual(self.pool.done(), (self.copies, []))
def test_only_write_enough_on_partial_success(self):
for i in range(5):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
self.pool.add_task(ks, None)
self.pool.join()
- self.assertEqual(self.pool.done(), self.copies)
+ self.assertEqual(self.pool.done(), (self.copies, []))
def test_only_write_enough_when_some_crash(self):
for i in range(5):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
self.pool.add_task(ks, None)
self.pool.join()
- self.assertEqual(self.pool.done(), self.copies)
+ self.assertEqual(self.pool.done(), (self.copies, []))
def test_fail_when_too_many_crash(self):
for i in range(self.copies+1):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
self.pool.add_task(ks, None)
self.pool.join()
- self.assertEqual(self.pool.done(), self.copies-1)
+ self.assertEqual(self.pool.done(), (self.copies-1, []))
@tutil.skip_sleep
return "abc"
elif r == "insecure":
return False
+ elif r == "config":
+ return lambda: {}
else:
raise arvados.errors.KeepReadError()
keep_client = arvados.KeepClient(api_client=ApiMock(),