def test_KeepLongBinaryRWTest(self):
blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
- for i in range(0,23):
+ for i in range(0, 23):
blob_data = blob_data + blob_data
blob_locator = self.keep_client.put(blob_data)
self.assertRegex(
# First response was not cached because it was from a HEAD request.
self.assertNotEqual(head_resp, get_resp)
+# Tests for how KeepClient.put() handles storage classes, run against
+# mocked Keep services: which X-Keep-Storage-Classes request header is
+# sent, and how the x-keep-replicas-stored and
+# x-keep-storage-classes-confirmed response headers lead to a returned
+# locator, an additional request, or a KeepWriteError.
+@tutil.skip_sleep
+class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
+ # Shared fixture: a KeepClient built on an API client with two mocked
+ # Keep services, a small payload, and the locator string that the
+ # tests hand to tutil.mock_keep_responses().
+ def setUp(self):
+ self.api_client = self.mock_keep_services(count=2)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.data = b'xyzzy'
+ self.locator = '1271ed5ef305aadabc605b1609e24c52'
+
+ # With two classes flagged Default in the mocked cluster config and no
+ # classes passed to put(), the first request is expected to carry
+ # 'X-Keep-Storage-Classes: bar, foo' ('baz' is configured as non-default).
+ def test_multiple_default_storage_classes_req_header(self):
+ api_mock = self.api_client_mock()
+ api_mock.config.return_value = {
+ 'StorageClasses': {
+ 'foo': { 'Default': True },
+ 'bar': { 'Default': True },
+ 'baz': { 'Default': False }
+ }
+ }
+ # This test uses its own client so the custom config above is the one in effect.
+ api_client = self.mock_keep_services(api_mock=api_mock, count=2)
+ keep_client = arvados.KeepClient(api_client=api_client)
+ resp_hdr = {
+ 'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
+ 'x-keep-replicas-stored': 1
+ }
+ with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
+ keep_client.put(self.data, copies=1)
+ req_hdr = mock.responses[0]
+ self.assertIn(
+ 'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
+
+ # Table-driven check of the request header for explicit class lists, and
+ # for an empty list or None, which are expected to fall back to 'default'.
+ def test_storage_classes_req_header(self):
+ # Precondition: the fixture's mocked config exposes a single default class.
+ self.assertEqual(
+ self.api_client.config()['StorageClasses'],
+ {'default': {'Default': True}})
+ cases = [
+ # requested, expected
+ [['foo'], 'X-Keep-Storage-Classes: foo'],
+ [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
+ [[], 'X-Keep-Storage-Classes: default'],
+ [None, 'X-Keep-Storage-Classes: default'],
+ ]
+ for req_classes, expected_header in cases:
+ headers = {'x-keep-replicas-stored': 1}
+ # Have the mocked response confirm one replica in each class that
+ # put() is expected to ask for.
+ if req_classes is None or len(req_classes) == 0:
+ confirmed_hdr = 'default=1'
+ elif len(req_classes) > 0:
+ confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
+ headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
+ with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
+ self.keep_client.put(self.data, copies=1, classes=req_classes)
+ req_hdr = mock.responses[0]
+ self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
+
+ # The mocked services answer 200 then 503, and the response headers
+ # confirm only 'foo'. put() is expected to raise KeepWriteError, and the
+ # second request is expected to ask only for the still-missing 'bar'.
+ def test_partial_storage_classes_put(self):
+ headers = {
+ 'x-keep-replicas-stored': 1,
+ 'x-keep-storage-classes-confirmed': 'foo=1'}
+ with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
+ # 1st request, both classes pending
+ req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
+ self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
+ # 2nd try, 'foo' class already satisfied
+ req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
+ self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
+
+ # Table-driven: every case must return the locator, and the number of
+ # requests made (mock.call_count) must equal expected_requests.
+ def test_successful_storage_classes_put_requests(self):
+ cases = [
+ # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
+ [ 1, ['foo'], 1, 'foo=1', 1],
+ [ 1, ['foo'], 2, 'foo=2', 1],
+ [ 2, ['foo'], 2, 'foo=2', 1],
+ [ 2, ['foo'], 1, 'foo=1', 2],
+ [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
+ [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
+ [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
+ [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
+ [ 1, ['foo', 'bar'], 1, None, 1],
+ [ 1, ['foo'], 1, None, 1],
+ [ 2, ['foo'], 2, None, 1],
+ [ 2, ['foo'], 1, None, 2],
+ ]
+ for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
+ headers = {'x-keep-replicas-stored': c_copies}
+ # confirmed_classes of None means the mocked response carries no
+ # x-keep-storage-classes-confirmed header at all.
+ if c_classes is not None:
+ headers.update({'x-keep-storage-classes-confirmed': c_classes})
+ with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
+ # case_desc is passed as the assertion message to identify the failing row.
+ case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
+ self.assertEqual(self.locator,
+ self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
+ case_desc)
+ self.assertEqual(e_reqs, mock.call_count, case_desc)
+
+ # Table-driven: every case must raise KeepWriteError. Both mocked
+ # responses in a case use that case's return_code and headers.
+ def test_failed_storage_classes_put_requests(self):
+ cases = [
+ # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
+ [ 1, ['foo'], 1, 'bar=1', 200],
+ [ 1, ['foo'], 1, None, 503],
+ [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
+ [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
+ [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
+ ]
+ for w_copies, w_classes, c_copies, c_classes, return_code in cases:
+ headers = {'x-keep-replicas-stored': c_copies}
+ # As above, None omits the confirmed-classes response header.
+ if c_classes is not None:
+ headers.update({'x-keep-storage-classes-confirmed': c_classes})
+ with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
+ case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
+ with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
+ self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
self.keep_client.head(self.locator)
self.assertAutomaticRequestId(mock.responses[0])
+ # The message of the exception raised by a failed head/get/put must
+ # contain the request ID: the caller-supplied self.test_id when
+ # request_id= is passed, otherwise something matching req-[a-z0-9]{20}.
+ # Every mocked response in this test is a 400 with an empty body.
+ def test_request_id_in_exception(self):
+ # head() with an explicit request ID.
+ with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
+ with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
+ self.keep_client.head(self.locator, request_id=self.test_id)
+
+ # get() without a request ID.
+ with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
+ with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
+ self.keep_client.get(self.locator)
+
+ # put() with an explicit request ID.
+ with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
+ with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
+ self.keep_client.put(self.data, request_id=self.test_id)
+
+ # put() without a request ID.
+ with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
+ with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
+ self.keep_client.put(self.data)
+
def assertAutomaticRequestId(self, resp):
hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
if x.startswith('X-Request-Id: ')][0]
kc = self.keepClient()
loc = kc.put(self.DATA, copies=1, num_retries=0)
self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
- self.server.setdelays(response=self.TIMEOUT_TIME)
+ # Note the actual delay must be 1s longer than the low speed
+ # limit interval in order for curl to detect it reliably.
+ self.server.setdelays(response=self.TIMEOUT_TIME+1)
with self.assertTakesGreater(self.TIMEOUT_TIME):
with self.assertRaises(arvados.errors.KeepReadError):
kc.get(loc, num_retries=0)
kc = self.keepClient()
loc = kc.put(self.DATA, copies=1, num_retries=0)
self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
- self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
+ # Note the actual delay must be 1s longer than the low speed
+ # limit interval in order for curl to detect it reliably.
+ self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
with self.assertTakesGreater(self.TIMEOUT_TIME):
with self.assertRaises(arvados.errors.KeepReadError) as e:
kc.get(loc, num_retries=0)
self._result = {}
self._result['headers'] = {}
self._result['headers']['x-keep-replicas-stored'] = str(replicas)
+ self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
self._result['body'] = 'foobar'
- def put(self, data_hash, data, timeout):
+ def put(self, data_hash, data, timeout, headers):
time.sleep(self.delay)
if self.will_raise is not None:
raise self.will_raise
def last_result(self):
if self.will_succeed:
return self._result
+ else:
+ return {"status_code": 500, "body": "didn't succeed"}
def finished(self):
return False
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
self.pool.add_task(ks, None)
self.pool.join()
- self.assertEqual(self.pool.done(), self.copies)
+ self.assertEqual(self.pool.done(), (self.copies, []))
def test_only_write_enough_on_partial_success(self):
for i in range(5):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
self.pool.add_task(ks, None)
self.pool.join()
- self.assertEqual(self.pool.done(), self.copies)
+ self.assertEqual(self.pool.done(), (self.copies, []))
def test_only_write_enough_when_some_crash(self):
for i in range(5):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
self.pool.add_task(ks, None)
self.pool.join()
- self.assertEqual(self.pool.done(), self.copies)
+ self.assertEqual(self.pool.done(), (self.copies, []))
def test_fail_when_too_many_crash(self):
for i in range(self.copies+1):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
self.pool.add_task(ks, None)
self.pool.join()
- self.assertEqual(self.pool.done(), self.copies-1)
+ self.assertEqual(self.pool.done(), (self.copies-1, []))
@tutil.skip_sleep
return "abc"
elif r == "insecure":
return False
+ elif r == "config":
+ return lambda: {}
else:
raise arvados.errors.KeepReadError()
keep_client = arvados.KeepClient(api_client=ApiMock(),