1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
30 from . import arvados_testutil as tutil
31 from . import keepstub
32 from . import run_test_server
34 from .arvados_testutil import make_block_cache
36 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
37 class KeepTestCase(run_test_server.TestCaseWithServers):
44 super(KeepTestCase, cls).setUpClass()
45 run_test_server.authorize_with("admin")
46 cls.api_client = arvados.api('v1')
47 cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
48 proxy='', local_store='',
49 block_cache=make_block_cache(cls.disk_cache))
51 def test_KeepBasicRWTest(self):
52 self.assertEqual(0, self.keep_client.upload_counter.get())
53 foo_locator = self.keep_client.put('foo')
56 '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
57 'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
59 # 6 bytes because uploaded 2 copies
60 self.assertEqual(6, self.keep_client.upload_counter.get())
62 self.assertEqual(0, self.keep_client.download_counter.get())
63 self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
65 'wrong content from Keep.get(md5("foo"))')
66 self.assertEqual(3, self.keep_client.download_counter.get())
68 def test_KeepBinaryRWTest(self):
69 blob_str = b'\xff\xfe\xf7\x00\x01\x02'
70 blob_locator = self.keep_client.put(blob_str)
73 '^7fc7c53b45e53926ba52821140fef396\+6',
74 ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
75 self.assertEqual(self.keep_client.get(blob_locator),
77 'wrong content from Keep.get(md5(<binarydata>))')
79 def test_KeepLongBinaryRWTest(self):
80 blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
81 for i in range(0, 23):
82 blob_data = blob_data + blob_data
83 blob_locator = self.keep_client.put(blob_data)
86 '^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
87 ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
88 self.assertEqual(self.keep_client.get(blob_locator),
90 'wrong content from Keep.get(md5(<binarydata>))')
92 @unittest.skip("unreliable test - please fix and close #8752")
93 def test_KeepSingleCopyRWTest(self):
94 blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
95 blob_locator = self.keep_client.put(blob_data, copies=1)
98 '^c902006bc98a3eb4a3663b65ab4a6fab\+8',
99 ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
100 self.assertEqual(self.keep_client.get(blob_locator),
102 'wrong content from Keep.get(md5(<binarydata>))')
104 def test_KeepEmptyCollectionTest(self):
105 blob_locator = self.keep_client.put('', copies=1)
108 '^d41d8cd98f00b204e9800998ecf8427e\+0',
109 ('wrong locator from Keep.put(""): ' + blob_locator))
111 def test_unicode_must_be_ascii(self):
112 # If unicode type, must only consist of valid ASCII
113 foo_locator = self.keep_client.put(u'foo')
116 '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
117 'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
119 if sys.version_info < (3, 0):
120 with self.assertRaises(UnicodeEncodeError):
121 # Error if it is not ASCII
122 self.keep_client.put(u'\xe2')
124 with self.assertRaises(AttributeError):
125 # Must be bytes or have an encode() method
126 self.keep_client.put({})
128 def test_KeepHeadTest(self):
129 locator = self.keep_client.put('test_head')
132 '^b9a772c7049325feb7130fff1f8333e9\+9',
133 'wrong md5 hash from Keep.put for "test_head": ' + locator)
134 self.assertEqual(True, self.keep_client.head(locator))
135 self.assertEqual(self.keep_client.get(locator),
137 'wrong content from Keep.get for "test_head"')
139 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
140 class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
143 KEEP_SERVER = {'blob_signing': True}
145 def test_KeepBasicRWTest(self):
146 run_test_server.authorize_with('active')
147 keep_client = arvados.KeepClient(block_cache=make_block_cache(self.disk_cache))
148 foo_locator = keep_client.put('foo')
151 r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
152 'invalid locator from Keep.put("foo"): ' + foo_locator)
153 self.assertEqual(keep_client.get(foo_locator),
155 'wrong content from Keep.get(md5("foo"))')
157 # GET with an unsigned locator => NotFound
158 bar_locator = keep_client.put('bar')
159 unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
162 r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
163 'invalid locator from Keep.put("bar"): ' + bar_locator)
164 self.assertRaises(arvados.errors.NotFoundError,
166 unsigned_bar_locator)
168 # GET from a different user => NotFound
169 run_test_server.authorize_with('spectator')
170 self.assertRaises(arvados.errors.NotFoundError,
174 # Unauthenticated GET for a signed locator => NotFound
175 # Unauthenticated GET for an unsigned locator => NotFound
176 keep_client.api_token = ''
177 self.assertRaises(arvados.errors.NotFoundError,
180 self.assertRaises(arvados.errors.NotFoundError,
182 unsigned_bar_locator)
184 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
185 class KeepProxyTestCase(run_test_server.TestCaseWithServers):
189 KEEP_PROXY_SERVER = {}
193 super(KeepProxyTestCase, cls).setUpClass()
194 run_test_server.authorize_with('active')
195 cls.api_client = arvados.api('v1')
198 super(KeepProxyTestCase, self).tearDown()
200 def test_KeepProxyTest1(self):
201 # Will use ARVADOS_KEEP_SERVICES environment variable that
202 # is set by setUpClass().
203 keep_client = arvados.KeepClient(api_client=self.api_client,
204 local_store='', block_cache=make_block_cache(self.disk_cache))
205 baz_locator = keep_client.put('baz')
208 '^73feffa4b7f6bb68e44cf984c85f6e88\+3',
209 'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
210 self.assertEqual(keep_client.get(baz_locator),
212 'wrong content from Keep.get(md5("baz"))')
213 self.assertTrue(keep_client.using_proxy)
215 def test_KeepProxyTestMultipleURIs(self):
216 # Test using ARVADOS_KEEP_SERVICES env var overriding any
217 # existing proxy setting and setting multiple proxies
218 arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
219 keep_client = arvados.KeepClient(api_client=self.api_client,
221 block_cache=make_block_cache(self.disk_cache))
222 uris = [x['_service_root'] for x in keep_client._keep_services]
223 self.assertEqual(uris, ['http://10.0.0.1/',
224 'https://foo.example.org:1234/'])
226 def test_KeepProxyTestInvalidURI(self):
227 arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
228 with self.assertRaises(arvados.errors.ArgumentError):
229 keep_client = arvados.KeepClient(api_client=self.api_client,
231 block_cache=make_block_cache(self.disk_cache))
233 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
234 class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
def get_service_roots(self, api_client):
    """Return the parsed Keep service roots for an all-zero locator.

    Builds a KeepClient around ``api_client``, asks it for the weighted
    service roots of the 32-character locator ``'000...0'``, sorts the
    resulting URL strings, and parses each with urllib.parse.urlparse.
    """
    keep_client = arvados.KeepClient(
        api_client=api_client,
        block_cache=make_block_cache(self.disk_cache))
    zero_locator = arvados.KeepLocator('0' * 32)
    root_urls = sorted(keep_client.weighted_service_roots(zero_locator))
    return list(map(urllib.parse.urlparse, root_urls))
242 def test_ssl_flag_respected_in_roots(self):
243 for ssl_flag in [False, True]:
244 services = self.get_service_roots(self.mock_keep_services(
245 service_ssl_flag=ssl_flag))
247 ('https' if ssl_flag else 'http'), services[0].scheme)
def test_correct_ports_with_ipv6_addresses(self):
    """A proxy service on an IPv6 host parses to the right hostname and port."""
    api_client = self.mock_keep_services(
        service_type='proxy',
        service_host='100::1',
        service_port=10,
        count=1,
    )
    parsed_roots = self.get_service_roots(api_client)
    first_root = parsed_roots[0]
    self.assertEqual('100::1', first_root.hostname)
    self.assertEqual(10, first_root.port)
255 def test_recognize_proxy_services_in_controller_response(self):
256 keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
257 service_type='proxy', service_host='localhost', service_port=9, count=1),
258 block_cache=make_block_cache(self.disk_cache))
260 # this will fail, but it ensures we get the service
262 keep_client.put('baz2')
265 self.assertTrue(keep_client.using_proxy)
267 def test_insecure_disables_tls_verify(self):
# Issues the same mocked get() twice -- once with api_client.insecure = True,
# once with it False -- and reads back the SSL_VERIFYPEER / SSL_VERIFYHOST
# options recorded on the first mocked response each time.
# NOTE(review): the statements that consume the getopt() values below are not
# visible in this excerpt; confirm the expected values against the full file.
268 api_client = self.mock_keep_services(count=1)
# NOTE(review): force_timeout is not referenced by any visible line of this
# test (both mocks answer b'foo', 200) -- looks like a leftover; confirm and
# remove if unused.
269 force_timeout = socket.timeout("timed out")
271 api_client.insecure = True
272 with tutil.mock_keep_responses(b'foo', 200) as mock:
273 keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
274 keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
276 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
279 mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
282 api_client.insecure = False
283 with tutil.mock_keep_responses(b'foo', 200) as mock:
284 keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
285 keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
286 # getopt()==None here means we didn't change the
287 # default. If we were using real pycurl instead of a mock,
288 # it would return the default value 1.
290 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
293 mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
def test_refresh_signature(self):
    """Check refresh_signature() against a mocked Keep response.

    First confirms that the locator carried in the mocked
    X-Keep-Locator response header is what refresh_signature() returns
    for a remote (+R) locator. Then replaces _get_or_head with a
    MagicMock to confirm the call is made with method "HEAD" and an
    X-Keep-Signature header.
    """
    digest = '6f5902ac237024bdd0c176cb93063dc4+11'
    signature = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
    local_loc = ''.join((digest, '+A', signature))
    remote_loc = ''.join((digest, '+R', signature))
    api_client = self.mock_keep_services(count=1)
    response_headers = {'X-Keep-Locator': local_loc}
    with tutil.mock_keep_responses('', 200, **response_headers):
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=make_block_cache(self.disk_cache))
        # The translated (local) locator is what the caller gets back.
        refreshed = keep_client.refresh_signature(remote_loc)
        self.assertEqual(local_loc, refreshed)
        # Swap in a recorder to inspect the method and headers used.
        keep_client._get_or_head = mock.MagicMock()
        keep_client.refresh_signature(remote_loc)
        first_call = keep_client._get_or_head.call_args_list[0]
        call_args, call_kwargs = first_call
        self.assertIn(remote_loc, call_args)
        self.assertEqual("HEAD", call_kwargs['method'])
        self.assertIn('X-Keep-Signature', call_kwargs['headers'])
315 # test_*_timeout verify that KeepClient instructs pycurl to use
316 # the appropriate connection and read timeouts. They don't care
317 # whether pycurl actually exhibits the expected timeout behavior
318 # -- those tests are in the KeepClientTimeout test class.
320 def test_get_timeout(self):
321 api_client = self.mock_keep_services(count=1)
322 force_timeout = socket.timeout("timed out")
323 with tutil.mock_keep_responses(force_timeout, 0) as mock:
324 keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
325 with self.assertRaises(arvados.errors.KeepReadError):
326 keep_client.get('ffffffffffffffffffffffffffffffff')
328 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
329 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
331 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
332 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
334 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
335 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
337 def test_put_timeout(self):
338 api_client = self.mock_keep_services(count=1)
339 force_timeout = socket.timeout("timed out")
340 with tutil.mock_keep_responses(force_timeout, 0) as mock:
341 keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
342 with self.assertRaises(arvados.errors.KeepWriteError):
343 keep_client.put(b'foo')
345 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
346 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
348 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
349 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
351 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
352 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
354 def test_head_timeout(self):
355 api_client = self.mock_keep_services(count=1)
356 force_timeout = socket.timeout("timed out")
357 with tutil.mock_keep_responses(force_timeout, 0) as mock:
358 keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
359 with self.assertRaises(arvados.errors.KeepReadError):
360 keep_client.head('ffffffffffffffffffffffffffffffff')
362 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
363 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
365 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
368 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
371 def test_proxy_get_timeout(self):
372 api_client = self.mock_keep_services(service_type='proxy', count=1)
373 force_timeout = socket.timeout("timed out")
374 with tutil.mock_keep_responses(force_timeout, 0) as mock:
375 keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
376 with self.assertRaises(arvados.errors.KeepReadError):
377 keep_client.get('ffffffffffffffffffffffffffffffff')
379 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
380 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
382 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
383 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
385 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
386 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
388 def test_proxy_head_timeout(self):
389 api_client = self.mock_keep_services(service_type='proxy', count=1)
390 force_timeout = socket.timeout("timed out")
391 with tutil.mock_keep_responses(force_timeout, 0) as mock:
392 keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
393 with self.assertRaises(arvados.errors.KeepReadError):
394 keep_client.head('ffffffffffffffffffffffffffffffff')
396 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
397 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
399 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
402 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
405 def test_proxy_put_timeout(self):
# Forces a socket timeout on a put() through a proxy-type service, expects
# KeepWriteError, then pairs the pycurl options recorded on the first mocked
# response with the values in KeepClient.DEFAULT_PROXY_TIMEOUT.
# NOTE(review): the call that consumes each getopt()/int() pair below is not
# visible in this excerpt.
406 api_client = self.mock_keep_services(service_type='proxy', count=1)
407 force_timeout = socket.timeout("timed out")
408 with tutil.mock_keep_responses(force_timeout, 0) as mock:
# NOTE(review): every sibling test_*_timeout passes
# block_cache=make_block_cache(self.disk_cache) here; this one does not, so
# the disk_cache parameterization does not affect it -- confirm whether the
# omission is intentional.
409 keep_client = arvados.KeepClient(api_client=api_client)
410 with self.assertRaises(arvados.errors.KeepWriteError):
411 keep_client.put('foo')
413 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
414 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
416 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
417 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
419 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
420 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
def check_no_services_error(self, verb, exc_class):
    """Assert that ``verb`` fails cleanly when the service lookup errors out.

    The mocked API client raises ApiError from
    keep_services().accessible().execute. Calling the KeepClient method
    named by ``verb`` must then raise ``exc_class``, and that exception
    must report zero per-request errors.
    """
    failing_api = mock.MagicMock(name='api_client')
    lookup = failing_api.keep_services().accessible().execute
    lookup.side_effect = arvados.errors.ApiError
    keep_client = arvados.KeepClient(
        api_client=failing_api,
        block_cache=make_block_cache(self.disk_cache))
    with self.assertRaises(exc_class) as err_check:
        operation = getattr(keep_client, verb)
        operation('d41d8cd98f00b204e9800998ecf8427e+0')
    request_errors = err_check.exception.request_errors()
    self.assertEqual(0, len(request_errors))
def test_get_error_with_no_services(self):
    """Run the no-services check for get(), expecting KeepReadError."""
    verb, expected_exc = 'get', arvados.errors.KeepReadError
    self.check_no_services_error(verb, expected_exc)
def test_head_error_with_no_services(self):
    """Run the no-services check for head(), expecting KeepReadError."""
    verb, expected_exc = 'head', arvados.errors.KeepReadError
    self.check_no_services_error(verb, expected_exc)
def test_put_error_with_no_services(self):
    """Run the no-services check for put(), expecting KeepWriteError."""
    verb, expected_exc = 'put', arvados.errors.KeepWriteError
    self.check_no_services_error(verb, expected_exc)
440 def check_errors_from_last_retry(self, verb, exc_class):
441 api_client = self.mock_keep_services(count=2)
442 req_mock = tutil.mock_keep_responses(
443 "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
444 with req_mock, tutil.skip_sleep, \
445 self.assertRaises(exc_class) as err_check:
446 keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
447 getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
449 self.assertEqual([502, 502], [
450 getattr(error, 'status_code', None)
451 for error in err_check.exception.request_errors().values()])
452 self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')
def test_get_error_reflects_last_retry(self):
    """Run the last-retry error check for get(), expecting KeepReadError."""
    verb, expected_exc = 'get', arvados.errors.KeepReadError
    self.check_errors_from_last_retry(verb, expected_exc)
def test_head_error_reflects_last_retry(self):
    """Run the last-retry error check for head(), expecting KeepReadError."""
    verb, expected_exc = 'head', arvados.errors.KeepReadError
    self.check_errors_from_last_retry(verb, expected_exc)
def test_put_error_reflects_last_retry(self):
    """Run the last-retry error check for put(), expecting KeepWriteError."""
    verb, expected_exc = 'put', arvados.errors.KeepWriteError
    self.check_errors_from_last_retry(verb, expected_exc)
def test_put_error_does_not_include_successful_puts(self):
    """A partially failed put() reports only the failed requests.

    Three services are mocked with responses 200, 500, 500. put() must
    raise KeepWriteError, and the exception's request_errors() must hold
    exactly the two failures.
    """
    data = 'partial failure test'
    data_loc = tutil.str_keep_locator(data)
    api_client = self.mock_keep_services(count=3)
    # The response mock is never inspected here, so it is not bound to a name.
    with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
            self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=make_block_cache(self.disk_cache))
        keep_client.put(data)
    self.assertEqual(2, len(exc_check.exception.request_errors()))
def test_proxy_put_with_no_writable_services(self):
    """put() via a read-only proxy fails before any request is made.

    The only mocked service is a proxy flagged read_only. put() must
    raise KeepWriteError whose message contains
    "no Keep services available" and whose request_errors() is empty.
    """
    data = 'test with no writable services'
    data_loc = tutil.str_keep_locator(data)
    api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
    # The response mock is never inspected here, so it is not bound to a name.
    with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
            self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=make_block_cache(self.disk_cache))
        keep_client.put(data)
    # assertIn shows the actual message on failure; the previous
    # assertEqual(True, ... in ...) only reported "True != False".
    self.assertIn("no Keep services available", str(exc_check.exception))
    self.assertEqual(0, len(exc_check.exception.request_errors()))
def test_oddball_service_get(self):
    """get() works against a service whose type is 'fancynewblobstore'."""
    payload = b'oddball service get'
    api_client = self.mock_keep_services(service_type='fancynewblobstore')
    with tutil.mock_keep_responses(payload, 200):
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=make_block_cache(self.disk_cache))
        locator = tutil.str_keep_locator(payload)
        fetched = keep_client.get(locator)
    self.assertEqual(payload, fetched)
def test_oddball_service_put(self):
    """put() works against a service whose type is 'fancynewblobstore'."""
    payload = b'oddball service put'
    expected_locator = tutil.str_keep_locator(payload)
    api_client = self.mock_keep_services(service_type='fancynewblobstore')
    with tutil.mock_keep_responses(expected_locator, 200):
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=make_block_cache(self.disk_cache))
        returned_locator = keep_client.put(payload, copies=1)
    self.assertEqual(expected_locator, returned_locator)
501 def test_oddball_service_writer_count(self):
502 body = b'oddball service writer count'
503 pdh = tutil.str_keep_locator(body)
504 api_client = self.mock_keep_services(service_type='fancynewblobstore',
506 headers = {'x-keep-replicas-stored': 3}
507 with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
508 **headers) as req_mock:
509 keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
510 actual = keep_client.put(body, copies=2)
511 self.assertEqual(pdh, actual)
512 self.assertEqual(1, req_mock.call_count)
515 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
516 class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
520 self.api_client = self.mock_keep_services(count=2)
521 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
523 self.locator = '1271ed5ef305aadabc605b1609e24c52'
@mock.patch('arvados.KeepClient.KeepService.get')
def test_get_request_cache(self, get_mock):
    """Two get() calls for the same locator reach KeepService.get once."""
    with tutil.mock_keep_responses(self.data, 200, 200):
        for _attempt in (1, 2):
            self.keep_client.get(self.locator)
    # The second get() is served from the cache, so only one request is made.
    get_mock.assert_called_once()
@mock.patch('arvados.KeepClient.KeepService.get')
def test_head_request_cache(self, get_mock):
    """Two head() calls for the same locator reach KeepService.get twice."""
    with tutil.mock_keep_responses(self.data, 200, 200):
        for _attempt in (1, 2):
            self.keep_client.head(self.locator)
    # HEAD results are not cached, so they cannot be mistaken for GET results.
    expected_calls = 2
    self.assertEqual(expected_calls, get_mock.call_count)
541 @mock.patch('arvados.KeepClient.KeepService.get')
542 def test_head_and_then_get_return_different_responses(self, get_mock):
545 get_mock.side_effect = [b'first response', b'second response']
546 with tutil.mock_keep_responses(self.data, 200, 200):
547 head_resp = self.keep_client.head(self.locator)
548 get_resp = self.keep_client.get(self.locator)
549 self.assertEqual(b'first response', head_resp)
550 # First response was not cached because it was from a HEAD request.
551 self.assertNotEqual(head_resp, get_resp)
554 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
555 class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
559 self.api_client = self.mock_keep_services(count=2)
560 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
562 self.locator = '1271ed5ef305aadabc605b1609e24c52'
564 def test_multiple_default_storage_classes_req_header(self):
565 api_mock = self.api_client_mock()
566 api_mock.config.return_value = {
568 'foo': { 'Default': True },
569 'bar': { 'Default': True },
570 'baz': { 'Default': False }
573 api_client = self.mock_keep_services(api_mock=api_mock, count=2)
574 keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
576 'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
577 'x-keep-replicas-stored': 1
579 with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
580 keep_client.put(self.data, copies=1)
581 req_hdr = mock.responses[0]
583 'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
585 def test_storage_classes_req_header(self):
587 self.api_client.config()['StorageClasses'],
588 {'default': {'Default': True}})
590 # requested, expected
591 [['foo'], 'X-Keep-Storage-Classes: foo'],
592 [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
593 [[], 'X-Keep-Storage-Classes: default'],
594 [None, 'X-Keep-Storage-Classes: default'],
596 for req_classes, expected_header in cases:
597 headers = {'x-keep-replicas-stored': 1}
598 if req_classes is None or len(req_classes) == 0:
599 confirmed_hdr = 'default=1'
600 elif len(req_classes) > 0:
601 confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
602 headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
603 with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
604 self.keep_client.put(self.data, copies=1, classes=req_classes)
605 req_hdr = mock.responses[0]
606 self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
608 def test_partial_storage_classes_put(self):
610 'x-keep-replicas-stored': 1,
611 'x-keep-storage-classes-confirmed': 'foo=1'}
612 with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
613 with self.assertRaises(arvados.errors.KeepWriteError):
614 self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
615 # 1st request, both classes pending
616 req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
617 self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
618 # 2nd try, 'foo' class already satisfied
619 req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
620 self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
622 def test_successful_storage_classes_put_requests(self):
624 # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
625 [ 1, ['foo'], 1, 'foo=1', 1],
626 [ 1, ['foo'], 2, 'foo=2', 1],
627 [ 2, ['foo'], 2, 'foo=2', 1],
628 [ 2, ['foo'], 1, 'foo=1', 2],
629 [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
630 [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
631 [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
632 [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
633 [ 1, ['foo', 'bar'], 1, None, 1],
634 [ 1, ['foo'], 1, None, 1],
635 [ 2, ['foo'], 2, None, 1],
636 [ 2, ['foo'], 1, None, 2],
638 for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
639 headers = {'x-keep-replicas-stored': c_copies}
640 if c_classes is not None:
641 headers.update({'x-keep-storage-classes-confirmed': c_classes})
642 with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
643 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
644 self.assertEqual(self.locator,
645 self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
647 self.assertEqual(e_reqs, mock.call_count, case_desc)
649 def test_failed_storage_classes_put_requests(self):
651 # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
652 [ 1, ['foo'], 1, 'bar=1', 200],
653 [ 1, ['foo'], 1, None, 503],
654 [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
655 [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
656 [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
658 for w_copies, w_classes, c_copies, c_classes, return_code in cases:
659 headers = {'x-keep-replicas-stored': c_copies}
660 if c_classes is not None:
661 headers.update({'x-keep-storage-classes-confirmed': c_classes})
662 with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
663 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
664 with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
665 self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
668 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
669 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
673 self.api_client = self.mock_keep_services(count=2)
674 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
676 self.locator = '1271ed5ef305aadabc605b1609e24c52'
677 self.test_id = arvados.util.new_request_id()
678 self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
679 # If we don't set request_id to None explicitly here, it will
680 # return <MagicMock name='api_client_mock.request_id'
682 self.api_client.request_id = None
def test_default_to_api_client_request_id(self):
    """put/get/head send the api_client's request_id when none is passed."""
    self.api_client.request_id = self.test_id

    with tutil.mock_keep_responses(self.locator, 200, 200) as put_mock:
        self.keep_client.put(self.data)
    put_responses = put_mock.responses
    self.assertEqual(2, len(put_responses))
    for put_resp in put_responses:
        self.assertProvidedRequestId(put_resp)

    with tutil.mock_keep_responses(self.data, 200) as get_mock:
        self.keep_client.get(self.locator)
    self.assertProvidedRequestId(get_mock.responses[0])

    with tutil.mock_keep_responses(b'', 200) as head_mock:
        self.keep_client.head(self.locator)
    self.assertProvidedRequestId(head_mock.responses[0])
def test_explicit_request_id(self):
    """put/get/head send the request_id given as a keyword argument."""
    wanted_id = self.test_id

    with tutil.mock_keep_responses(self.locator, 200, 200) as put_mock:
        self.keep_client.put(self.data, request_id=wanted_id)
    put_responses = put_mock.responses
    self.assertEqual(2, len(put_responses))
    for put_resp in put_responses:
        self.assertProvidedRequestId(put_resp)

    with tutil.mock_keep_responses(self.data, 200) as get_mock:
        self.keep_client.get(self.locator, request_id=wanted_id)
    self.assertProvidedRequestId(get_mock.responses[0])

    with tutil.mock_keep_responses(b'', 200) as head_mock:
        self.keep_client.head(self.locator, request_id=wanted_id)
    self.assertProvidedRequestId(head_mock.responses[0])
def test_automatic_request_id(self):
    """put/get/head generate a request ID when none is supplied anywhere."""
    with tutil.mock_keep_responses(self.locator, 200, 200) as put_mock:
        self.keep_client.put(self.data)
    put_responses = put_mock.responses
    self.assertEqual(2, len(put_responses))
    for put_resp in put_responses:
        self.assertAutomaticRequestId(put_resp)

    with tutil.mock_keep_responses(self.data, 200) as get_mock:
        self.keep_client.get(self.locator)
    self.assertAutomaticRequestId(get_mock.responses[0])

    with tutil.mock_keep_responses(b'', 200) as head_mock:
        self.keep_client.head(self.locator)
    self.assertAutomaticRequestId(head_mock.responses[0])
def test_request_id_in_exception(self):
    """Keep errors must mention the request ID that was used.

    With every mocked response returning 400, head/get raise
    KeepReadError and put raises KeepWriteError. The message must match
    the explicit request_id when one is passed, and the generated
    ``req-`` pattern otherwise.
    """
    # The response mocks are never inspected, so they are not bound to a
    # name; binding them to `mock` would also shadow the imported module.
    with tutil.mock_keep_responses(b'', 400, 400, 400):
        with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
            self.keep_client.head(self.locator, request_id=self.test_id)

    with tutil.mock_keep_responses(b'', 400, 400, 400):
        with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
            self.keep_client.get(self.locator)

    with tutil.mock_keep_responses(b'', 400, 400, 400):
        with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
            self.keep_client.put(self.data, request_id=self.test_id)

    with tutil.mock_keep_responses(b'', 400, 400, 400):
        with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
            self.keep_client.put(self.data)
def assertAutomaticRequestId(self, resp):
    """Assert that ``resp`` carried an auto-generated X-Request-Id header.

    ``resp`` is a mocked response whose ``getopt(pycurl.HTTPHEADER)``
    yields the request header strings. The first X-Request-Id header
    must differ from ``self.test_id`` and match ``req-`` followed by 20
    lowercase alphanumerics. A missing header is reported as an
    assertion failure rather than an IndexError.
    """
    prefix = 'X-Request-Id: '
    hdr = next(
        (x for x in resp.getopt(pycurl.HTTPHEADER) if x.startswith(prefix)),
        None)
    self.assertIsNotNone(hdr, 'request has no X-Request-Id header')
    self.assertNotEqual(hdr, prefix + self.test_id)
    self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
def assertProvidedRequestId(self, resp):
    """Assert that ``resp`` was sent with ``self.test_id`` as its X-Request-Id."""
    expected_header = 'X-Request-Id: ' + self.test_id
    sent_headers = resp.getopt(pycurl.HTTPHEADER)
    self.assertIn(expected_header, sent_headers)
759 #@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
760 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
764 # expected_order[i] is the probe order for
765 # hash=md5(sprintf("%064x",i)) where there are 16 services
766 # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
767 # the first probe for the block consisting of 64 "0"
768 # characters is the service whose uuid is
769 # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
771 self.expected_order = [
772 list('3eab2d5fc9681074'),
773 list('097dba52e648f1c3'),
774 list('c5b4e023f8a7d691'),
775 list('9d81c02e76a3bf54'),
778 "{:064x}".format(x).encode()
779 for x in range(len(self.expected_order))]
781 hashlib.md5(self.blocks[x]).hexdigest()
782 for x in range(len(self.expected_order))]
783 self.api_client = self.mock_keep_services(count=self.services)
784 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
786 def test_weighted_service_roots_against_reference_set(self):
787 # Confirm weighted_service_roots() returns the correct order
788 for i, hash in enumerate(self.hashes):
789 roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
791 re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
793 self.assertEqual(self.expected_order[i], got_order)
def test_get_probe_order_against_reference_set(self):
    """Run the shared probe-order check using get() with one retry."""
    def fetch_block(i):
        return self.keep_client.get(self.hashes[i], num_retries=1)

    self._test_probe_order_against_reference_set(fetch_block)
def test_head_probe_order_against_reference_set(self):
    """Run the shared probe-order check using head() with one retry."""
    def head_block(i):
        return self.keep_client.head(self.hashes[i], num_retries=1)

    self._test_probe_order_against_reference_set(head_block)
def test_put_probe_order_against_reference_set(self):
    """Run the shared probe-order check using put() with one retry."""
    def store_block(i):
        # copies=1 keeps the test from being sensitive to races
        # between writer threads.
        return self.keep_client.put(self.blocks[i], num_retries=1, copies=1)

    self._test_probe_order_against_reference_set(store_block)
809 def _test_probe_order_against_reference_set(self, op):
810 for i in range(len(self.blocks)):
811 with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
812 self.assertRaises(arvados.errors.KeepRequestError):
815 re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
816 for resp in mock.responses]
817 self.assertEqual(self.expected_order[i]*2, got_order)
def test_put_probe_order_multiple_copies(self):
    # For copies=2 and copies=3, every mocked response is a 500, so
    # put() must raise KeepWriteError; the recorded request order is
    # then compared against the reference probe order.
    for copies in range(2, 4):
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as req_mock, \
                 self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in req_mock.responses]
            # With T threads racing to make requests, the position
            # of a given server in the sequence of HTTP requests
            # (got_order) cannot be more than T-1 positions
            # earlier than that server's position in the reference
            # probe sequence (expected_order).
            #
            # Loop invariant: we have accounted for +pos+ expected
            # probes, either by seeing them in +got_order+ or by
            # putting them in +pending+ in the hope of seeing them
            # later. As long as +len(pending)<T+, we haven't
            # started a request too early.
            pending = []
            for pos, expected in enumerate(self.expected_order[i]*3):
                got = got_order[pos-len(pending)]
                while got in pending:
                    del pending[pending.index(got)]
                    got = got_order[pos-len(pending)]
                if got != expected:
                    pending.append(expected)
                    self.assertLess(
                        len(pending), copies,
                        "pending={}, with copies={}, got {}, expected {}".format(
                            pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
def test_probe_waste_adding_one_server(self):
    # Measure how far the originally-first service moves down the
    # probe list when 1..11 services are added to an initial 12.
    hashes = [
        hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
    initial_services = 12
    self.api_client = self.mock_keep_services(count=initial_services)
    self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
    probes_before = [
        self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
        for block_hash in hashes]
    for added_services in range(1, 12):
        api_client = self.mock_keep_services(count=initial_services+added_services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
        total_penalty = 0
        for hash_index in range(len(hashes)):
            probe_after = keep_client.weighted_service_roots(
                arvados.KeepLocator(hashes[hash_index]))
            # Penalty: new position of the service that used to be
            # probed first for this block.
            penalty = probe_after.index(probes_before[hash_index][0])
            self.assertLessEqual(penalty, added_services)
            total_penalty += penalty
        # Average penalty per block should not exceed
        # N(added)/N(orig) by more than 20%, and should get closer
        # to the ideal as we add data points.
        expect_penalty = (
            added_services *
            len(hashes) / initial_services)
        max_penalty = (
            expect_penalty *
            (120 - added_services)/100)
        min_penalty = (
            expect_penalty * 8/10)
        self.assertTrue(
            min_penalty <= total_penalty <= max_penalty,
            "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                initial_services,
                added_services,
                len(hashes),
                total_penalty,
                min_penalty,
                max_penalty))
def check_64_zeros_error_order(self, verb, exc_class):
    """Check that a failed request reports its errors in probe order.

    Calls the KeepClient method named ``verb`` while every pycurl
    request returns a 500, expects ``exc_class`` to be raised, and
    compares the host/port of each entry in the exception's
    ``request_errors()`` against the expected service sequence.
    """
    data = b'0' * 64
    if verb == 'get':
        # Reads take a locator rather than the raw block data.
        data = tutil.str_keep_locator(data)
    # Arbitrary port number:
    aport = random.randint(1024,65535)
    api_client = self.mock_keep_services(service_port=aport, count=self.services)
    keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
    with mock.patch('pycurl.Curl') as curl_mock, \
         self.assertRaises(exc_class) as err_check:
        curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
        getattr(keep_client, verb)(data)
    urls = [urllib.parse.urlparse(url)
            for url in err_check.exception.request_errors()]
    self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                     [(url.hostname, url.port) for url in urls])
def test_get_error_shows_probe_order(self):
    # A failing get() must raise KeepReadError listing the probed
    # services in order.
    verb, expected_error = 'get', arvados.errors.KeepReadError
    self.check_64_zeros_error_order(verb, expected_error)
def test_put_error_shows_probe_order(self):
    # A failing put() must raise KeepWriteError listing the probed
    # services in order.
    verb, expected_error = 'put', arvados.errors.KeepWriteError
    self.check_64_zeros_error_order(verb, expected_error)
914 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
915 class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
918 # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
919 # 1s worth of data and then trigger bandwidth errors before running out of data.
922 BANDWIDTH_LOW_LIM = 1024
class assertTakesBetween(unittest.TestCase):
    """Context manager asserting its body takes tmin..tmax seconds."""

    def __init__(self, tmin, tmax):
        # TestCase.__init__ is not called; only the comparison
        # assertions inherited from TestCase are used.
        self.tmin = tmin
        self.tmax = tmax

    def __enter__(self):
        self.t0 = time.time()

    def __exit__(self, *args, **kwargs):
        # Round times to milliseconds, like CURL. Otherwise, we
        # fail when CURL reaches a 1s timeout at 0.9998s.
        delta = round(time.time() - self.t0, 3)
        self.assertGreaterEqual(delta, self.tmin)
        self.assertLessEqual(delta, self.tmax)
class assertTakesGreater(unittest.TestCase):
    """Context manager asserting its body takes at least tmin seconds."""

    def __init__(self, tmin):
        # TestCase.__init__ is not called; only the comparison
        # assertion inherited from TestCase is used.
        self.tmin = tmin

    def __enter__(self):
        self.t0 = time.time()

    def __exit__(self, *args, **kwargs):
        # Rounded to milliseconds before comparing.
        delta = round(time.time() - self.t0, 3)
        self.assertGreaterEqual(delta, self.tmin)
def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
    """Return a KeepClient on self.api_client using ``timeouts``.

    ``timeouts`` is passed through unchanged as the client's
    ``timeout`` argument; the block cache is built from
    ``self.disk_cache``.
    """
    cache = make_block_cache(self.disk_cache)
    return arvados.KeepClient(
        api_client=self.api_client,
        timeout=timeouts,
        block_cache=cache)
# Points the mocked Keep service at 240.0.0.0 and expects put() to
# raise KeepWriteError after between 0.1 and 0.5 seconds.
# NOTE(review): the embedded numbering skips 961 and 963, so this
# listing appears to omit lines of the mock_keep_services() call
# (at least its closing parenthesis) -- confirm against the full file.
956 def test_timeout_slow_connect(self):
957 # Can't simulate TCP delays with our own socket. Leave our
958 # stub server running uselessly, and try to connect to an
959 # unroutable IP address instead.
960 self.api_client = self.mock_keep_services(
962 service_host='240.0.0.0',
# Single write attempt: one copy, no retries.
964 with self.assertTakesBetween(0.1, 0.5):
965 with self.assertRaises(arvados.errors.KeepWriteError):
966 self.keepClient().put(self.DATA, copies=1, num_retries=0)
def test_low_bandwidth_no_delays_success(self):
    # With the server at twice BANDWIDTH_LOW_LIM, a put followed by
    # a get must round-trip the data.
    self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
    client = self.keepClient()
    locator = client.put(self.DATA, copies=1, num_retries=0)
    fetched = client.get(locator, num_retries=0)
    self.assertEqual(self.DATA, fetched)
def test_too_low_bandwidth_no_delays_failure(self):
    # Check that lessening bandwidth corresponds to failing
    client = self.keepClient()
    locator = client.put(self.DATA, copies=1, num_retries=0)
    # Drop the server to half of BANDWIDTH_LOW_LIM: both the read and
    # the write must fail, each after more than TIMEOUT_TIME.
    self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
    with self.assertTakesGreater(self.TIMEOUT_TIME), \
         self.assertRaises(arvados.errors.KeepReadError):
        client.get(locator, num_retries=0)
    with self.assertTakesGreater(self.TIMEOUT_TIME), \
         self.assertRaises(arvados.errors.KeepWriteError):
        client.put(self.DATA, copies=1, num_retries=0)
def test_low_bandwidth_with_server_response_delay_failure(self):
    # Store a block first, then slow the server's responses and
    # check that get/put fail and head takes longer than TIMEOUT_TIME.
    client = self.keepClient()
    locator = client.put(self.DATA, copies=1, num_retries=0)
    self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
    # Note the actual delay must be 1s longer than the low speed
    # limit interval in order for curl to detect it reliably.
    self.server.setdelays(response=self.TIMEOUT_TIME+1)
    with self.assertTakesGreater(self.TIMEOUT_TIME), \
         self.assertRaises(arvados.errors.KeepReadError):
        client.get(locator, num_retries=0)
    with self.assertTakesGreater(self.TIMEOUT_TIME), \
         self.assertRaises(arvados.errors.KeepWriteError):
        client.put(self.DATA, copies=1, num_retries=0)
    with self.assertTakesGreater(self.TIMEOUT_TIME):
        client.head(locator, num_retries=0)
def test_low_bandwidth_with_server_mid_delay_failure(self):
    # Store a block first, then make the server pause mid-transfer
    # and check that both get and put fail after TIMEOUT_TIME.
    client = self.keepClient()
    locator = client.put(self.DATA, copies=1, num_retries=0)
    self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
    # Note the actual delay must be 1s longer than the low speed
    # limit interval in order for curl to detect it reliably.
    stall = self.TIMEOUT_TIME+1
    self.server.setdelays(mid_write=stall, mid_read=stall)
    with self.assertTakesGreater(self.TIMEOUT_TIME), \
         self.assertRaises(arvados.errors.KeepReadError):
        client.get(locator, num_retries=0)
    with self.assertTakesGreater(self.TIMEOUT_TIME), \
         self.assertRaises(arvados.errors.KeepWriteError):
        client.put(self.DATA, copies=1, num_retries=0)
def test_timeout_slow_request(self):
    # Delay the server's request handling by 0.2s, then by 2s, and
    # run the matching timing check after each change.
    locator = self.keepClient().put(self.DATA, copies=1, num_retries=0)
    for delay, check in ((.2, self._test_connect_timeout_under_200ms),
                         (2, self._test_response_timeout_under_2s)):
        self.server.setdelays(request=delay)
        check(locator)
def test_timeout_slow_response(self):
    # Delay the server's response by 0.2s, then by 2s, and run the
    # matching timing check after each change.
    locator = self.keepClient().put(self.DATA, copies=1, num_retries=0)
    for delay, check in ((.2, self._test_connect_timeout_under_200ms),
                         (2, self._test_response_timeout_under_2s)):
        self.server.setdelays(response=delay)
        check(locator)
def test_timeout_slow_response_body(self):
    # Delay the server's response body by 0.2s, then by 2s, and run
    # the matching timing check after each change.
    locator = self.keepClient().put(self.DATA, copies=1, num_retries=0)
    for delay, check in ((.2, self._test_connect_timeout_under_200ms),
                         (2, self._test_response_timeout_under_2s)):
        self.server.setdelays(response_body=delay)
        check(locator)
def _test_connect_timeout_under_200ms(self, loc):
    """Check that put and get of ``loc`` succeed in 0.2-0.3 seconds."""
    # Allow 100ms to connect, then 1s for response. Everything
    # should work, and everything should take at least 200ms to
    # return.
    client = self.keepClient(timeouts=(.1, 1))
    with self.assertTakesBetween(.2, .3):
        client.put(self.DATA, copies=1, num_retries=0)
    with self.assertTakesBetween(.2, .3):
        fetched = client.get(loc, num_retries=0)
        self.assertEqual(self.DATA, fetched)
def _test_response_timeout_under_2s(self, loc):
    """Check that get of ``loc`` and put both fail within 1-9 seconds."""
    # Allow 10s to connect, then 1s for response. Nothing should
    # work, and everything should take at least 1s to return.
    client = self.keepClient(timeouts=(10, 1))
    with self.assertTakesBetween(1, 9), \
         self.assertRaises(arvados.errors.KeepReadError):
        client.get(loc, num_retries=0)
    with self.assertTakesBetween(1, 9), \
         self.assertRaises(arvados.errors.KeepWriteError):
        client.put(self.DATA, copies=1, num_retries=0)
1058 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
1059 class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
def mock_disks_and_gateways(self, disks=3, gateways=1):
    """Set up a mocked API client with disk and gateway services.

    Populates ``self.gateways`` (one service record per gateway),
    ``self.gateway_roots`` (the https root URL of each gateway),
    ``self.api_client`` and ``self.keepClient``.
    """
    self.gateways = [{
            'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
            'owner_uuid': 'zzzzz-tpzed-000000000000000',
            'service_host': 'gatewayhost{}'.format(i),
            'service_port': 12345,
            'service_ssl_flag': True,
            'service_type': 'gateway:test',
    } for i in range(gateways)]
    self.gateway_roots = [
        "https://{service_host}:{service_port}/".format(**gw)
        for gw in self.gateways]
    self.api_client = self.mock_keep_services(
        count=disks, additional_services=self.gateways)
    self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
@mock.patch('pycurl.Curl')
def test_get_with_gateway_hint_first(self, MockCurl):
    # A +K@<gateway uuid> hint must send the request to that
    # gateway's root URL.
    fake_curl = tutil.FakeCurl.make(
        code=200, body='foo', headers={'Content-Length': 3})
    MockCurl.return_value = fake_curl
    self.mock_disks_and_gateways()
    locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
    self.assertEqual(b'foo', self.keepClient.get(locator))
    requested_url = MockCurl.return_value.getopt(pycurl.URL).decode()
    self.assertEqual(self.gateway_roots[0]+locator, requested_url)
    self.assertEqual(True, self.keepClient.head(locator))
# Every mocked curl response is a 404, so get() is expected to raise
# NotFoundError; the recorded request URLs are then checked to show
# gateways were tried first (in hint order), followed by disk services.
# NOTE(review): the embedded numbering skips 1091-1093, 1096, 1109 and
# 1111, so the definitions of `gateways`, `disks` and `mocks` and the
# assertion wrapping the last URL check are not visible here --
# confirm against the full file.
1089 @mock.patch('pycurl.Curl')
1090 def test_get_with_gateway_hints_in_order(self, MockCurl):
1094 tutil.FakeCurl.make(code=404, body='')
1095 for _ in range(gateways+disks)
1097 MockCurl.side_effect = tutil.queue_with(mocks)
1098 self.mock_disks_and_gateways(gateways=gateways, disks=disks)
# Locator carries one +K@<uuid> hint per gateway.
1099 locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
1100 ['K@'+gw['uuid'] for gw in self.gateways])
1101 with self.assertRaises(arvados.errors.NotFoundError):
1102 self.keepClient.get(locator)
1103 # Gateways are tried first, in the order given.
1104 for i, root in enumerate(self.gateway_roots):
1105 self.assertEqual(root+locator,
1106 mocks[i].getopt(pycurl.URL).decode())
1107 # Disk services are tried next.
1108 for i in range(gateways, gateways+disks):
1110 mocks[i].getopt(pycurl.URL).decode(),
# Every mocked curl response is a 404, so head() is expected to raise
# NotFoundError; the recorded request URLs are then checked to show
# gateways were tried first (in hint order), followed by disk services.
# NOTE(review): the embedded numbering skips 1115-1117, 1120, 1133 and
# 1135, so the definitions of `gateways`, `disks` and `mocks` and the
# assertion wrapping the last URL check are not visible here --
# confirm against the full file.
1113 @mock.patch('pycurl.Curl')
1114 def test_head_with_gateway_hints_in_order(self, MockCurl):
1118 tutil.FakeCurl.make(code=404, body=b'')
1119 for _ in range(gateways+disks)
1121 MockCurl.side_effect = tutil.queue_with(mocks)
1122 self.mock_disks_and_gateways(gateways=gateways, disks=disks)
# Locator carries one +K@<uuid> hint per gateway.
1123 locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
1124 ['K@'+gw['uuid'] for gw in self.gateways])
1125 with self.assertRaises(arvados.errors.NotFoundError):
1126 self.keepClient.head(locator)
1127 # Gateways are tried first, in the order given.
1128 for i, root in enumerate(self.gateway_roots):
1129 self.assertEqual(root+locator,
1130 mocks[i].getopt(pycurl.URL).decode())
1131 # Disk services are tried next.
1132 for i in range(gateways, gateways+disks):
1134 mocks[i].getopt(pycurl.URL).decode(),
@mock.patch('pycurl.Curl')
def test_get_with_remote_proxy_hint(self, MockCurl):
    # A +K@xyzzy hint must route get() to keep.xyzzy.arvadosapi.com.
    fake_curl = tutil.FakeCurl.make(
        code=200, body=b'foo', headers={'Content-Length': 3})
    MockCurl.return_value = fake_curl
    self.mock_disks_and_gateways()
    locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
    self.assertEqual(b'foo', self.keepClient.get(locator))
    requested_url = MockCurl.return_value.getopt(pycurl.URL).decode()
    self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                     requested_url)
@mock.patch('pycurl.Curl')
def test_head_with_remote_proxy_hint(self, MockCurl):
    # A +K@xyzzy hint must route head() to keep.xyzzy.arvadosapi.com.
    fake_curl = tutil.FakeCurl.make(
        code=200, body=b'foo', headers={'Content-Length': 3})
    MockCurl.return_value = fake_curl
    self.mock_disks_and_gateways()
    locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
    self.assertEqual(True, self.keepClient.head(locator))
    requested_url = MockCurl.return_value.getopt(pycurl.URL).decode()
    self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                     requested_url)
1157 class KeepClientRetryTestMixin(object):
1160 # Testing with a local Keep store won't exercise the retry behavior.
1161 # Instead, our strategy is:
1162 # * Create a client with one proxy specified (pointed at a black
1163 # hole), so there's no need to instantiate an API client, and
1164 # all HTTP requests come from one place.
1165 # * Mock httplib's request method to provide simulated responses.
1166 # This lets us test the retry logic extensively without relying on any
1167 # supporting servers, and prevents side effects in case something hiccups.
1168 # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and run_method().
1171 # Test classes must define TEST_PATCHER to a method that mocks
1172 # out appropriate methods in the client.
1174 PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
1175 TEST_DATA = b'testdata'
1176 TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
1179 self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}
def new_client(self, **caller_kwargs):
    """Return a KeepClient built from self.client_kwargs.

    ``caller_kwargs`` override the stored defaults; ``block_cache`` is
    always replaced with a cache built from ``self.disk_cache``.
    """
    merged = dict(self.client_kwargs, **caller_kwargs)
    merged['block_cache'] = make_block_cache(self.disk_cache)
    return arvados.KeepClient(**merged)
def run_method(self, *args, **kwargs):
    """Placeholder for the operation under test; always raises."""
    message = "test subclasses must define run_method"
    raise NotImplementedError(message)
def check_success(self, expected=None, *args, **kwargs):
    """Assert that run_method(*args, **kwargs) returns ``expected``.

    When ``expected`` is None, ``self.DEFAULT_EXPECT`` is used.
    """
    want = self.DEFAULT_EXPECT if expected is None else expected
    self.assertEqual(want, self.run_method(*args, **kwargs))
def check_exception(self, error_class=None, *args, **kwargs):
    """Assert that run_method(*args, **kwargs) raises ``error_class``.

    When ``error_class`` is None, ``self.DEFAULT_EXCEPTION`` is used.
    Returns the assertRaises context so callers can inspect
    ``.exception``.
    """
    if error_class is None:
        error_class = self.DEFAULT_EXCEPTION
    with self.assertRaises(error_class) as err:
        self.run_method(*args, **kwargs)
    return err
def test_immediate_success(self):
    # A single 200 response: the default call must succeed.
    patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 200)
    with patcher:
        self.check_success()
def test_retry_then_success(self):
    # A 500 followed by a 200 must succeed when retries are allowed.
    patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200)
    with patcher:
        self.check_success(num_retries=3)
def test_exception_then_success(self):
    # An exception followed by a 200 must succeed when retries are
    # allowed.
    first_failure = Exception('mock err')
    with self.TEST_PATCHER(self.DEFAULT_EXPECT, first_failure, 200):
        self.check_success(num_retries=3)
def test_no_default_retry(self):
    # Without num_retries, a 500 followed by a 200 must still fail.
    patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200)
    with patcher:
        self.check_exception()
def test_no_retry_after_permanent_error(self):
    # A 403 must fail even though retries are allowed and a 200 is
    # queued behind it.
    patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200)
    with patcher:
        self.check_exception(num_retries=3)
def test_error_after_retries_exhausted(self):
    # Two 500s with num_retries=1 must fail, and the error message
    # must report two attempts.
    patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200)
    with patcher:
        err = self.check_exception(num_retries=1)
    message = str(err.exception)
    self.assertRegex(message, r'failed to .* after 2 attempts')
def test_num_retries_instance_fallback(self):
    # num_retries given at client construction must apply when the
    # call itself passes none.
    self.client_kwargs['num_retries'] = 3
    patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200)
    with patcher:
        self.check_success()
1234 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
1235 class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
1236 DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
1237 DEFAULT_EXCEPTION = arvados.errors.KeepReadError
1238 HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
1239 TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
               *args, **kwargs):
    """Call get() on a fresh client, forwarding extra arguments."""
    return self.new_client().get(locator, *args, **kwargs)
def test_specific_exception_when_not_found(self):
    # A 404 must surface as NotFoundError even with retries allowed.
    responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200)
    with responses:
        self.check_exception(arvados.errors.NotFoundError, num_retries=3)
def test_general_exception_with_mixed_errors(self):
    # get should raise a NotFoundError if no server returns the block,
    # and a high threshold of servers report that it's not found.
    # This test rigs up 50/50 disagreement between two servers, and
    # checks that it does not become a NotFoundError.
    keep = self.new_client()
    with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500), \
         self.assertRaises(arvados.errors.KeepReadError) as raised:
        keep.get(self.HINTED_LOCATOR)
    self.assertNotIsInstance(
        raised.exception, arvados.errors.NotFoundError,
        "mixed errors raised NotFoundError")
def test_hint_server_can_succeed_without_retries(self):
    # With a hinted locator, a 404 then a 200 must succeed without
    # num_retries being set.
    responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500)
    with responses:
        self.check_success(locator=self.HINTED_LOCATOR)
def test_try_next_server_after_timeout(self):
    # The first response is a socket timeout; the second returns the
    # expected data, so the call must succeed.
    responses = [
        (socket.timeout("timed out"), 200),
        (self.DEFAULT_EXPECT, 200),
    ]
    with tutil.mock_keep_responses(*responses):
        self.check_success(locator=self.HINTED_LOCATOR)
# Expects check_success() to pass for the hinted locator when the last
# mocked response is (DEFAULT_EXPECT, 200).
# NOTE(review): the embedded numbering skips 1274, so an earlier
# response argument to mock_keep_responses() (presumably the
# wrong-checksum one the test name refers to) is not visible here --
# confirm against the full file.
1272 def test_retry_data_with_wrong_checksum(self):
1273 with tutil.mock_keep_responses(
1275 (self.DEFAULT_EXPECT, 200)):
1276 self.check_success(locator=self.HINTED_LOCATOR)
1279 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
1280 class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
1281 DEFAULT_EXPECT = True
1282 DEFAULT_EXCEPTION = arvados.errors.KeepReadError
1283 HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
1284 TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
               *args, **kwargs):
    """Call head() on a fresh client, forwarding extra arguments."""
    return self.new_client().head(locator, *args, **kwargs)
def test_specific_exception_when_not_found(self):
    # A 404 must surface as NotFoundError even with retries allowed.
    responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200)
    with responses:
        self.check_exception(arvados.errors.NotFoundError, num_retries=3)
def test_general_exception_with_mixed_errors(self):
    # head should raise a NotFoundError if no server returns the block,
    # and a high threshold of servers report that it's not found.
    # This test rigs up 50/50 disagreement between two servers, and
    # checks that it does not become a NotFoundError.
    keep = self.new_client()
    with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500), \
         self.assertRaises(arvados.errors.KeepReadError) as raised:
        keep.head(self.HINTED_LOCATOR)
    self.assertNotIsInstance(
        raised.exception, arvados.errors.NotFoundError,
        "mixed errors raised NotFoundError")
def test_hint_server_can_succeed_without_retries(self):
    # With a hinted locator, a 404 then a 200 must succeed without
    # num_retries being set.
    responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500)
    with responses:
        self.check_success(locator=self.HINTED_LOCATOR)
def test_try_next_server_after_timeout(self):
    # The first response is a socket timeout; the second is a 200,
    # so the call must succeed.
    responses = [
        (socket.timeout("timed out"), 200),
        (self.DEFAULT_EXPECT, 200),
    ]
    with tutil.mock_keep_responses(*responses):
        self.check_success(locator=self.HINTED_LOCATOR)
1318 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
1319 class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
1320 DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
1321 DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
1322 TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
               copies=1, *args, **kwargs):
    """Call put() on a fresh client, forwarding extra arguments."""
    client = self.new_client()
    return client.put(data, copies, *args, **kwargs)
def test_do_not_send_multiple_copies_to_same_server(self):
    # Only one 200 response is available, so asking for two copies
    # must fail even with retries allowed.
    responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200)
    with responses:
        self.check_exception(copies=2, num_retries=3)
1333 class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
class FakeKeepService(object):
    """Stand-in Keep service whose put() outcome is fixed up front."""

    def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
        # delay: seconds put() sleeps before returning or raising.
        # will_succeed: value put() returns.
        # will_raise: exception put() raises instead, if not None.
        # replicas: count reported in the success result headers.
        self.delay = delay
        self.will_succeed = will_succeed
        self.will_raise = will_raise
        self._result = {}
        self._result['headers'] = {}
        self._result['headers']['x-keep-replicas-stored'] = str(replicas)
        self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
        self._result['body'] = 'foobar'

    def put(self, data_hash, data, timeout, headers):
        """Sleep for the configured delay, then raise or return."""
        time.sleep(self.delay)
        if self.will_raise is not None:
            raise self.will_raise
        return self.will_succeed

    def last_result(self):
        """Return the canned success result, or a 500 failure result."""
        if self.will_succeed:
            return self._result
        else:
            return {"status_code": 500, "body": "didn't succeed"}
1363 self.pool = arvados.KeepClient.KeepWriterThreadPool(
1365 data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1366 max_service_replicas = self.copies,
1367 copies = self.copies
# Queues FakeKeepService tasks that all report success (with delays of
# i/10 seconds) and checks that pool.done() equals (self.copies, []).
# NOTE(review): the embedded numbering skips 1371 and 1374, so the loop
# header that defines `i` and a statement before the final assertion
# are not visible here -- confirm against the full file.
1370 def test_only_write_enough_on_success(self):
1372 ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1373 self.pool.add_task(ks, None)
1375 self.assertEqual(self.pool.done(), (self.copies, []))
# Queues pairs of failing and succeeding FakeKeepService tasks and
# checks that pool.done() equals (self.copies, []).
# NOTE(review): the embedded numbering skips 1378 and 1383, so the loop
# header that defines `i` and a statement before the final assertion
# are not visible here -- confirm against the full file.
1377 def test_only_write_enough_on_partial_success(self):
# This service's put() returns False.
1379 ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
1380 self.pool.add_task(ks, None)
# This service's put() returns True.
1381 ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1382 self.pool.add_task(ks, None)
1384 self.assertEqual(self.pool.done(), (self.copies, []))
# Queues pairs of raising and succeeding FakeKeepService tasks and
# checks that pool.done() equals (self.copies, []).
# NOTE(review): the embedded numbering skips 1387 and 1392, so the loop
# header that defines `i` and a statement before the final assertion
# are not visible here -- confirm against the full file.
1386 def test_only_write_enough_when_some_crash(self):
# This service's put() raises Exception.
1388 ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
1389 self.pool.add_task(ks, None)
# This service's put() returns True.
1390 ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1391 self.pool.add_task(ks, None)
1393 self.assertEqual(self.pool.done(), (self.copies, []))
# Queues copies+1 raising services and only copies-1 succeeding ones,
# then checks that pool.done() equals (self.copies-1, []).
# NOTE(review): the embedded numbering skips 1402, so a statement
# before the final assertion is not visible here -- confirm against
# the full file.
1395 def test_fail_when_too_many_crash(self):
1396 for i in range(self.copies+1):
1397 ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
1398 self.pool.add_task(ks, None)
# One fewer successful service than the number of copies requested.
1399 for i in range(self.copies-1):
1400 ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1401 self.pool.add_task(ks, None)
1403 self.assertEqual(self.pool.done(), (self.copies-1, []))
1407 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
1408 class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
1411 # Test put()s that need two distinct servers to succeed, possibly
1412 # requiring multiple passes through the retry loop.
1415 self.api_client = self.mock_keep_services(count=2)
1416 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
def test_success_after_exception(self):
    # The first request raises and the next two return 200; storing
    # two copies must take exactly three requests.
    locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
    with tutil.mock_keep_responses(
            locator, Exception('mock err'), 200, 200) as req_mock:
        self.keep_client.put('foo', num_retries=1, copies=2)
    self.assertEqual(3, req_mock.call_count)
def test_success_after_retryable_error(self):
    # The first request gets a 500 and the next two return 200;
    # storing two copies must take exactly three requests.
    locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
    with tutil.mock_keep_responses(locator, 500, 200, 200) as req_mock:
        self.keep_client.put('foo', num_retries=1, copies=2)
    self.assertEqual(3, req_mock.call_count)
def test_fail_after_final_error(self):
    # First retry loop gets a 200 (can't achieve replication by
    # storing again on that server) and a 400 (can't retry that
    # server at all), so we shouldn't try a third request.
    locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
    with tutil.mock_keep_responses(locator, 200, 400, 200) as req_mock, \
         self.assertRaises(arvados.errors.KeepWriteError):
        self.keep_client.put('foo', num_retries=1, copies=2)
    self.assertEqual(2, req_mock.call_count)
1443 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
1444 class KeepClientAPIErrorTest(unittest.TestCase):
# Builds a KeepClient around an ApiMock whose fallback attribute access
# raises KeepReadError, then expects two consecutive get() calls for
# the same locator to each raise KeepReadError.
# NOTE(review): the embedded numbering skips 1451 and 1453-1456, so the
# bodies of the "api_token" and "insecure" branches (and any further
# branches before the raise) are not visible here -- confirm against
# the full file.
1447 def test_api_fail(self):
1448 class ApiMock(object):
1449 def __getattr__(self, r):
1450 if r == "api_token":
1452 elif r == "insecure":
1457 raise arvados.errors.KeepReadError()
1458 keep_client = arvados.KeepClient(api_client=ApiMock(),
1459 proxy='', local_store='',
1460 block_cache=make_block_cache(self.disk_cache))
1462 # The bug this is testing for is that if an API (not
1463 # keepstore) exception is thrown as part of a get(), the next
1464 # attempt to get that same block will result in a deadlock.
1465 # This is why there are two get()s in a row. Unfortunately,
1466 # the failure mode for this test is that the test suite
1467 # deadlocks, there isn't a good way to avoid that without
1468 # adding a special case that has no use except for this test.
1470 with self.assertRaises(arvados.errors.KeepReadError):
1471 keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1472 with self.assertRaises(arvados.errors.KeepReadError):
1473 keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")