1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
21 from unittest import mock
22 from unittest.mock import patch
29 from . import arvados_testutil as tutil
30 from . import keepstub
31 from . import run_test_server
33 from .arvados_testutil import DiskCacheBase
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    # Shared DiskCacheBase helper used to build the block cache for the
    # class-wide KeepClient; torn down again in tearDownClass().
    block_cache_test = None
    # NOTE(review): the `@classmethod` / `def setUpClass(cls):` lines appear
    # to be missing from this chunk — the statements below are the
    # class-setup body (admin auth, API client, shared KeepClient).
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.block_cache_test = DiskCacheBase()
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='',
                                             block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))
53 def tearDownClass(cls):
54 super(KeepTestCase, cls).setUpClass()
55 cls.block_cache_test.tearDown()
    def test_KeepBasicRWTest(self):
        # Round-trip 'foo' and check the upload/download byte counters.
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        # NOTE(review): the `self.assertRegex(foo_locator,` opener line for
        # the two argument lines below is missing from this chunk.
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())
        self.assertEqual(0, self.keep_client.download_counter.get())
        # NOTE(review): the expected-content argument line is missing
        # between the next two lines.
        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
            'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        # Round-trip a short binary blob.
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        # NOTE(review): assertRegex opener missing before these argument lines.
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
            'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # Doubling 8 bytes 23 times yields a 64 MiB block, matching the
        # +67108864 size hint asserted below.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        # NOTE(review): assertRegex opener missing before these argument lines.
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
            'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        # NOTE(review): assertRegex opener missing before these argument lines.
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
            'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        # The empty block hashes to the well-known md5 d41d8... with size 0.
        blob_locator = self.keep_client.put('', copies=1)
        # NOTE(review): assertRegex opener missing before these argument lines.
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))
117 def test_KeepPutDataType(self):
118 with self.assertRaises(AttributeError):
119 # Must be bytes or have an encode() method
120 self.keep_client.put({})
    def test_KeepHeadTest(self):
        # head() should report existence; get() should still return content.
        locator = self.keep_client.put('test_head')
        # NOTE(review): assertRegex opener missing before these argument lines.
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        # NOTE(review): the expected-content argument line is missing below.
        self.assertEqual(self.keep_client.get(locator),
            'wrong content from Keep.get for "test_head"')
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    # Run the Keep server with blob signing enabled so that unsigned or
    # foreign locators are rejected.
    KEEP_SERVER = {'blob_signing': True}

    # NOTE(review): the `def tearDown(self):` line appears to be missing
    # above this statement in this chunk.
        DiskCacheBase.tearDown(self)

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
        foo_locator = keep_client.put('foo')
        # NOTE(review): assertRegex opener missing before these argument lines.
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        self.assertEqual(keep_client.get(foo_locator),
            'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => bad request
        bar_locator = keep_client.put('bar')
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        # NOTE(review): assertRegex opener missing before these argument lines.
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        # NOTE(review): the callable argument line (keep_client.get) appears
        # to be missing from this assertRaises call.
        self.assertRaises(arvados.errors.KeepReadError,
            unsigned_bar_locator)

        # GET from a different user => bad request
        run_test_server.authorize_with('spectator')
        # NOTE(review): the remaining argument lines of this assertRaises
        # call are missing from this chunk.
        self.assertRaises(arvados.errors.KeepReadError,

        # Unauthenticated GET for a signed locator => bad request
        # Unauthenticated GET for an unsigned locator => bad request
        keep_client.api_token = ''
        # NOTE(review): argument lines are missing from the first
        # assertRaises call below.
        self.assertRaises(arvados.errors.KeepReadError,
        self.assertRaises(arvados.errors.KeepReadError,
            unsigned_bar_locator)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    KEEP_PROXY_SERVER = {}

    # NOTE(review): the `def setUpClass(cls):` line appears to be missing
    # above this class-setup body.
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    # NOTE(review): the `def tearDown(self):` line appears to be missing
    # above these two statements.
        super(KeepProxyTestCase, self).tearDown()
        DiskCacheBase.tearDown(self)

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='', block_cache=self.make_block_cache(self.disk_cache))
        baz_locator = keep_client.put('baz')
        # NOTE(review): assertRegex opener missing before these argument lines.
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
            'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        # NOTE(review): an argument line (likely `local_store='',`) appears
        # to be missing from this KeepClient(...) call.
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         block_cache=self.make_block_cache(self.disk_cache))
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        # A bare hostname (no scheme) must be rejected as a proxy URI.
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            keep_client = arvados.KeepClient(api_client=self.api_client,
                                             block_cache=self.make_block_cache(self.disk_cache))
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # NOTE(review): the `def tearDown(self):` line appears to be missing
    # above this statement.
        DiskCacheBase.tearDown(self)
238 def get_service_roots(self, api_client):
239 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
240 services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
241 return [urllib.parse.urlparse(url) for url in sorted(services)]
    def test_ssl_flag_respected_in_roots(self):
        # Service roots must use https exactly when the service says so.
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            # NOTE(review): the `self.assertEqual(` opener line is missing
            # before this argument line.
                ('https' if ssl_flag else 'http'), services[0].scheme)
250 def test_correct_ports_with_ipv6_addresses(self):
251 service = self.get_service_roots(self.mock_keep_services(
252 service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
253 self.assertEqual('100::1', service.hostname)
254 self.assertEqual(10, service.port)
    def test_recognize_proxy_services_in_controller_response(self):
        keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
            service_type='proxy', service_host='localhost', service_port=9, count=1),
            block_cache=self.make_block_cache(self.disk_cache))
        # this will fail, but it ensures we get the service
        # NOTE(review): the try/except wrapper around this failing put()
        # appears to be missing from this chunk.
            keep_client.put('baz2', num_retries=0)
        self.assertTrue(keep_client.using_proxy)
    def test_insecure_disables_tls_verify(self):
        # With api_client.insecure set, pycurl's peer/host verification
        # options should be disabled; otherwise left at their defaults.
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # NOTE(review): the `self.assertEqual(` openers and expected-value
            # lines around these getopt() checks are missing from this chunk.
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            # NOTE(review): the `self.assertEqual(` openers and expected-value
            # lines around these getopt() checks are missing from this chunk.
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
    def test_refresh_signature(self):
        # A remote-signed locator (+R hint) should be exchanged for the
        # locally signed (+A) locator returned in the X-Keep-Locator header,
        # via a HEAD request carrying an X-Keep-Signature header.
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])
    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            # NOTE(review): a trailing argument/close-paren line appears to
            # be missing from this KeepClient(...) call in this chunk.
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            # NOTE(review): the `self.assertEqual(` openers for the
            # getopt()/expected pairs below are missing from this chunk.
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            # NOTE(review): a trailing argument/close-paren line appears to
            # be missing from this KeepClient(...) call in this chunk.
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            # NOTE(review): assertEqual openers missing for the pairs below.
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_head_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            # NOTE(review): a trailing argument/close-paren line appears to
            # be missing from this KeepClient(...) call in this chunk.
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            # NOTE(review): assertEqual openers and some expected-value lines
            # are missing for the getopt() checks below.
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),

    def test_proxy_get_timeout(self):
        # Proxy-type services use DEFAULT_PROXY_TIMEOUT instead.
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            # NOTE(review): a trailing argument/close-paren line appears to
            # be missing from this KeepClient(...) call in this chunk.
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            # NOTE(review): assertEqual openers missing for the pairs below.
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            # NOTE(review): a trailing argument/close-paren line appears to
            # be missing from this KeepClient(...) call in this chunk.
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            # NOTE(review): assertEqual openers and some expected-value lines
            # are missing for the getopt() checks below.
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),

    def test_proxy_put_timeout(self):
        self.disk_cache_dir = None
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            # NOTE(review): the block_cache argument line appears to be
            # missing from this KeepClient(...) call in this chunk.
            keep_client = arvados.KeepClient(
                api_client=api_client,
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            # NOTE(review): assertEqual openers missing for the pairs below.
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
    def check_no_services_error(self, verb, exc_class):
        # When the service list cannot be fetched, any request should raise
        # exc_class with zero per-request errors recorded.
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        # NOTE(review): a trailing argument/close-paren line appears to be
        # missing from this KeepClient(...) call in this chunk.
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=self.make_block_cache(self.disk_cache),
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        # Only the errors from the final retry round (the two 502s) should
        # appear in the raised exception.
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
             self.assertRaises(exc_class) as err_check:
            # NOTE(review): trailing argument lines of this KeepClient(...)
            # call and of the getattr(...) call below are missing from this
            # chunk.
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
        self.assertEqual([502, 502], [
            getattr(error, 'status_code', None)
            for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
    def test_put_error_does_not_include_successful_puts(self):
        # One service succeeds, two fail: only the two failures should be
        # reported in the KeepWriteError.
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
             self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            # NOTE(review): trailing argument lines of this KeepClient(...)
            # call are missing from this chunk.
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        # A read-only proxy leaves no writable services: the put should fail
        # before making any requests.
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
             self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            # NOTE(review): trailing argument lines of this KeepClient(...)
            # call are missing from this chunk.
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
            keep_client.put(data)
        self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
        self.assertEqual(0, len(exc_check.exception.request_errors()))
    def test_oddball_service_get(self):
        # Unknown service types must still be usable for reads.
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            # NOTE(review): a trailing argument/close-paren line appears to
            # be missing from this KeepClient(...) call in this chunk.
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        # Unknown service types must still be usable for writes.
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            # NOTE(review): a trailing argument/close-paren line appears to
            # be missing from this KeepClient(...) call in this chunk.
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        # A single response claiming 3 stored replicas satisfies copies=2
        # with only one request.
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        # NOTE(review): a trailing argument line of this
        # mock_keep_services(...) call is missing from this chunk.
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            # NOTE(review): a trailing argument/close-paren line appears to
            # be missing from this KeepClient(...) call in this chunk.
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # NOTE(review): the `def setUp(self):` line (and, apparently, a
    # `self.data = ...` assignment) are missing above this fixture body.
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    # NOTE(review): the `def tearDown(self):` line appears to be missing
    # above this statement.
        DiskCacheBase.tearDown(self)
581 @mock.patch('arvados.KeepClient.KeepService.get')
582 def test_get_request_cache(self, get_mock):
583 with tutil.mock_keep_responses(self.data, 200, 200):
584 self.keep_client.get(self.locator)
585 self.keep_client.get(self.locator)
586 # Request already cached, don't require more than one request
587 get_mock.assert_called_once()
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.head(self.locator)
            self.keep_client.head(self.locator)
            # Don't cache HEAD requests so that they're not confused with GET reqs
            self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        # HEAD must not populate the cache: the later GET should receive
        # the second mocked response, not the HEAD's.
        get_mock.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
            self.assertEqual(b'first response', head_resp)
            # First response was not cached because it was from a HEAD request.
            self.assertNotEqual(head_resp, get_resp)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # NOTE(review): the `def setUp(self):` line (and, apparently, a
    # `self.data = ...` assignment) are missing above this fixture body.
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        self.api_client.request_id = None

    # NOTE(review): the `def tearDown(self):` line appears to be missing
    # above this statement.
        DiskCacheBase.tearDown(self)
    def test_default_to_api_client_request_id(self):
        # Requests inherit the API client's request_id when none is given.
        self.api_client.request_id = self.test_id
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

    def test_explicit_request_id(self):
        # An explicit request_id argument is used verbatim on every verb.
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data, request_id=self.test_id)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])

    def test_automatic_request_id(self):
        # With no request_id anywhere, the client generates a fresh
        # req-... identifier for each request.
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertAutomaticRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])

    def test_request_id_in_exception(self):
        # The request id (explicit or generated) must appear in the error
        # message for every verb.
        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
                self.keep_client.head(self.locator, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
                self.keep_client.get(self.locator)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
                self.keep_client.put(self.data, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
                self.keep_client.put(self.data)
696 def assertAutomaticRequestId(self, resp):
697 hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
698 if x.startswith('X-Request-Id: ')][0]
699 self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
700 self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
702 def assertProvidedRequestId(self, resp):
703 self.assertIn('X-Request-Id: '+self.test_id,
704 resp.getopt(pycurl.HTTPHEADER))
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # NOTE(review): the parameterized_class decorator above is commented
    # out, so this case runs un-parameterized — confirm that is intentional.
    # NOTE(review): the `def setUp(self):` line (and a `self.services = 16`
    # style assignment referenced below) appear to be missing above this
    # fixture body.
    # expected_order[i] is the probe order for
    # hash=md5(sprintf("%064x",i)) where there are 16 services
    # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
    # the first probe for the block consisting of 64 "0"
    # characters is the service whose uuid is
    # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
        # NOTE(review): the closing bracket of expected_order and the
        # `self.blocks = [` opener are missing from this chunk.
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        # NOTE(review): the `self.hashes = [` opener is missing here.
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    # NOTE(review): the `def tearDown(self):` line appears to be missing
    # above this statement.
        DiskCacheBase.tearDown(self)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
            # NOTE(review): the `got_order = [` opener and the trailing
            # `for root in roots]` line are missing around this expression.
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                # NOTE(review): the `op(i)` call and the `got_order = [`
                # opener are missing from this chunk.
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)
    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                # NOTE(review): the `got_order = [` opener is missing before
                # these comprehension lines.
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                # NOTE(review): a `pending = []` initializer appears to be
                # missing before this loop.
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    # NOTE(review): a conditional guard line appears to be
                    # missing before this append.
                        pending.append(expected)
                # NOTE(review): the assertion-call opener is missing before
                # these argument lines.
                    len(pending), copies,
                    "pending={}, with copies={}, got {}, expected {}".format(
                        pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
    def test_probe_waste_adding_one_server(self):
        # Adding one service should displace each block's first probe by at
        # most the number of added services.
        # NOTE(review): the `hashes = [` opener is missing before this
        # comprehension line.
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        # NOTE(review): the `probes_before = [` opener is missing here.
            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            # NOTE(review): a `total_penalty = 0` initializer appears to be
            # missing here.
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            # NOTE(review): the expected/min/max penalty computation openers
            # and the final assertion opener are missing around the
            # remaining argument lines below.
                len(hashes) / initial_services)
                (120 - added_services)/100)
                expect_penalty * 8/10)
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
    def check_64_zeros_error_order(self, verb, exc_class):
        # NOTE(review): the initializer for `data` (presumably 64 zero
        # characters, per the method name) is missing before this line.
        data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        # The error report must list services in reference probe order.
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # NOTE(review): the DATA and TIMEOUT_TIME constant definitions (and the
    # rest of this comment) appear to be missing from this chunk.
    BANDWIDTH_LOW_LIM = 1024

    # NOTE(review): the `def tearDown(self):` line appears to be missing
    # above this statement.
        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        # Context manager asserting the wrapped code takes a wall-clock
        # time within [tmin, tmax].
        def __init__(self, tmin, tmax):
        # NOTE(review): the __init__ body and the `def __enter__(self):`
        # line are missing from this chunk; the statement below is the
        # __enter__ body.
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        # Context manager asserting the wrapped code takes at least tmin.
        def __init__(self, tmin):
        # NOTE(review): the __init__ body and the `def __enter__(self):`
        # line are missing from this chunk; the statement below is the
        # __enter__ body.
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        # Build a KeepClient with a short connect timeout and the class's
        # low-speed limits.
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))
911 def test_timeout_slow_connect(self):
912 # Can't simulate TCP delays with our own socket. Leave our
913 # stub server running uselessly, and try to connect to an
914 # unroutable IP address instead.
915 self.api_client = self.mock_keep_services(
917 service_host='240.0.0.0',
919 with self.assertTakesBetween(0.1, 0.5):
920 with self.assertRaises(arvados.errors.KeepWriteError):
921 self.keepClient().put(self.DATA, copies=1, num_retries=0)
923 def test_low_bandwidth_no_delays_success(self):
924 self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
925 kc = self.keepClient()
926 loc = kc.put(self.DATA, copies=1, num_retries=0)
927 self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
929 def test_too_low_bandwidth_no_delays_failure(self):
930 # Check that lessening bandwidth corresponds to failing
931 kc = self.keepClient()
932 loc = kc.put(self.DATA, copies=1, num_retries=0)
933 self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
934 with self.assertTakesGreater(self.TIMEOUT_TIME):
935 with self.assertRaises(arvados.errors.KeepReadError):
936 kc.get(loc, num_retries=0)
937 with self.assertTakesGreater(self.TIMEOUT_TIME):
938 with self.assertRaises(arvados.errors.KeepWriteError):
939 kc.put(self.DATA, copies=1, num_retries=0)
941 def test_low_bandwidth_with_server_response_delay_failure(self):
942 kc = self.keepClient()
943 loc = kc.put(self.DATA, copies=1, num_retries=0)
944 self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
945 # Note the actual delay must be 1s longer than the low speed
946 # limit interval in order for curl to detect it reliably.
947 self.server.setdelays(response=self.TIMEOUT_TIME+1)
948 with self.assertTakesGreater(self.TIMEOUT_TIME):
949 with self.assertRaises(arvados.errors.KeepReadError):
950 kc.get(loc, num_retries=0)
951 with self.assertTakesGreater(self.TIMEOUT_TIME):
952 with self.assertRaises(arvados.errors.KeepWriteError):
953 kc.put(self.DATA, copies=1, num_retries=0)
954 with self.assertTakesGreater(self.TIMEOUT_TIME):
955 kc.head(loc, num_retries=0)
957 def test_low_bandwidth_with_server_mid_delay_failure(self):
958 kc = self.keepClient()
959 loc = kc.put(self.DATA, copies=1, num_retries=0)
960 self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
961 # Note the actual delay must be 1s longer than the low speed
962 # limit interval in order for curl to detect it reliably.
963 self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
964 with self.assertTakesGreater(self.TIMEOUT_TIME):
965 with self.assertRaises(arvados.errors.KeepReadError) as e:
966 kc.get(loc, num_retries=0)
967 with self.assertTakesGreater(self.TIMEOUT_TIME):
968 with self.assertRaises(arvados.errors.KeepWriteError):
969 kc.put(self.DATA, copies=1, num_retries=0)
971 def test_timeout_slow_request(self):
972 loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
973 self.server.setdelays(request=.2)
974 self._test_connect_timeout_under_200ms(loc)
975 self.server.setdelays(request=2)
976 self._test_response_timeout_under_2s(loc)
978 def test_timeout_slow_response(self):
979 loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
980 self.server.setdelays(response=.2)
981 self._test_connect_timeout_under_200ms(loc)
982 self.server.setdelays(response=2)
983 self._test_response_timeout_under_2s(loc)
985 def test_timeout_slow_response_body(self):
986 loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
987 self.server.setdelays(response_body=.2)
988 self._test_connect_timeout_under_200ms(loc)
989 self.server.setdelays(response_body=2)
990 self._test_response_timeout_under_2s(loc)
992 def _test_connect_timeout_under_200ms(self, loc):
993 # Allow 100ms to connect, then 1s for response. Everything
994 # should work, and everything should take at least 200ms to
996 kc = self.keepClient(timeouts=(.1, 1))
997 with self.assertTakesBetween(.2, .3):
998 kc.put(self.DATA, copies=1, num_retries=0)
999 with self.assertTakesBetween(.2, .3):
1000 self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
1002 def _test_response_timeout_under_2s(self, loc):
1003 # Allow 10s to connect, then 1s for response. Nothing should
1004 # work, and everything should take at least 1s to return.
1005 kc = self.keepClient(timeouts=(10, 1))
1006 with self.assertTakesBetween(1, 9):
1007 with self.assertRaises(arvados.errors.KeepReadError):
1008 kc.get(loc, num_retries=0)
1009 with self.assertTakesBetween(1, 9):
1010 with self.assertRaises(arvados.errors.KeepWriteError):
1011 kc.put(self.DATA, copies=1, num_retries=0)
1013 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
1014 class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
1018 DiskCacheBase.tearDown(self)
1020 def mock_disks_and_gateways(self, disks=3, gateways=1):
1022 'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
1023 'owner_uuid': 'zzzzz-tpzed-000000000000000',
1024 'service_host': 'gatewayhost{}'.format(i),
1025 'service_port': 12345,
1026 'service_ssl_flag': True,
1027 'service_type': 'gateway:test',
1028 } for i in range(gateways)]
1029 self.gateway_roots = [
1030 "https://{service_host}:{service_port}/".format(**gw)
1031 for gw in self.gateways]
1032 self.api_client = self.mock_keep_services(
1033 count=disks, additional_services=self.gateways)
1034 self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
1036 @mock.patch('pycurl.Curl')
1037 def test_get_with_gateway_hint_first(self, MockCurl):
1038 MockCurl.return_value = tutil.FakeCurl.make(
1039 code=200, body='foo', headers={'Content-Length': 3})
1040 self.mock_disks_and_gateways()
1041 locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
1042 self.assertEqual(b'foo', self.keepClient.get(locator))
1043 self.assertEqual(self.gateway_roots[0]+locator,
1044 MockCurl.return_value.getopt(pycurl.URL).decode())
1045 self.assertEqual(True, self.keepClient.head(locator))
1047 @mock.patch('pycurl.Curl')
1048 def test_get_with_gateway_hints_in_order(self, MockCurl):
1052 tutil.FakeCurl.make(code=404, body='')
1053 for _ in range(gateways+disks)
1055 MockCurl.side_effect = tutil.queue_with(mocks)
1056 self.mock_disks_and_gateways(gateways=gateways, disks=disks)
1057 locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
1058 ['K@'+gw['uuid'] for gw in self.gateways])
1059 with self.assertRaises(arvados.errors.NotFoundError):
1060 self.keepClient.get(locator)
1061 # Gateways are tried first, in the order given.
1062 for i, root in enumerate(self.gateway_roots):
1063 self.assertEqual(root+locator,
1064 mocks[i].getopt(pycurl.URL).decode())
1065 # Disk services are tried next.
1066 for i in range(gateways, gateways+disks):
1068 mocks[i].getopt(pycurl.URL).decode(),
1071 @mock.patch('pycurl.Curl')
1072 def test_head_with_gateway_hints_in_order(self, MockCurl):
1076 tutil.FakeCurl.make(code=404, body=b'')
1077 for _ in range(gateways+disks)
1079 MockCurl.side_effect = tutil.queue_with(mocks)
1080 self.mock_disks_and_gateways(gateways=gateways, disks=disks)
1081 locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
1082 ['K@'+gw['uuid'] for gw in self.gateways])
1083 with self.assertRaises(arvados.errors.NotFoundError):
1084 self.keepClient.head(locator)
1085 # Gateways are tried first, in the order given.
1086 for i, root in enumerate(self.gateway_roots):
1087 self.assertEqual(root+locator,
1088 mocks[i].getopt(pycurl.URL).decode())
1089 # Disk services are tried next.
1090 for i in range(gateways, gateways+disks):
1092 mocks[i].getopt(pycurl.URL).decode(),
1095 @mock.patch('pycurl.Curl')
1096 def test_get_with_remote_proxy_hint(self, MockCurl):
1097 MockCurl.return_value = tutil.FakeCurl.make(
1098 code=200, body=b'foo', headers={'Content-Length': 3})
1099 self.mock_disks_and_gateways()
1100 locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
1101 self.assertEqual(b'foo', self.keepClient.get(locator))
1102 self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
1103 MockCurl.return_value.getopt(pycurl.URL).decode())
1105 @mock.patch('pycurl.Curl')
1106 def test_head_with_remote_proxy_hint(self, MockCurl):
1107 MockCurl.return_value = tutil.FakeCurl.make(
1108 code=200, body=b'foo', headers={'Content-Length': 3})
1109 self.mock_disks_and_gateways()
1110 locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
1111 self.assertEqual(True, self.keepClient.head(locator))
1112 self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
1113 MockCurl.return_value.getopt(pycurl.URL).decode())
class KeepClientRetryTestMixin(object):
    """Shared scaffolding for the get/head/put retry test cases below."""

    # Testing with a local Keep store won't exercise the retry behavior.
    # Instead, our strategy is:
    # * Create a client with one proxy specified (pointed at a black
    #   hole), so there's no need to instantiate an API client, and
    #   all HTTP requests come from one place.
    # * Mock httplib's request method to provide simulated responses.
    # This lets us test the retry logic extensively without relying on any
    # supporting servers, and prevents side effects in case something hiccups.
    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
    # run_method().
    #
    # Test classes must define TEST_PATCHER to a method that mocks
    # out appropriate methods in the client.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    # NOTE(review): the `def setUp(self):` header for the following
    # statement appears elided from this excerpt.
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Build a KeepClient from self.client_kwargs plus any overrides."""
        kwargs = self.client_kwargs.copy()
        kwargs.update(caller_kwargs)
        kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        """Subclass hook: perform the client operation under test."""
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        # Run the operation and assert it returns `expected`
        # (DEFAULT_EXPECT when not given).
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        # Run the operation and assert it raises `error_class`
        # (DEFAULT_EXCEPTION when not given).
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
        # NOTE(review): a `return err` appears elided here — callers such
        # as test_error_after_retries_exhausted use the returned context.

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        # One retryable 500 followed by a 200 succeeds with retries enabled.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        # A transport-level exception is also retryable.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        # 403 is permanent: the client must fail without consuming retries.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # Two 500s with num_retries=1 exhausts the budget before the 200.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries given at construction applies when calls omit it.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.get()."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    # NOTE(review): the `def tearDown(self):` header for the following
    # statement appears elided from this excerpt.
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
        # NOTE(review): the `*args, **kwargs):` tail of this signature
        # appears elided from this excerpt.
        return self.new_client().get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A clean 404 surfaces as NotFoundError, not a generic read error.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # The hinted server's 200 is enough even with failures elsewhere.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on the first server falls through to the next.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # A response whose body fails checksum verification is retried.
        with tutil.mock_keep_responses(
                # NOTE(review): the first (corrupt-data) response tuple
                # appears elided from this excerpt.
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.head()."""

    # head() returns True on success rather than block data.
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    # NOTE(review): the `def tearDown(self):` header for the following
    # statement appears elided from this excerpt.
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
        # NOTE(review): the `*args, **kwargs):` tail of this signature
        # appears elided from this excerpt.
        return self.new_client().head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A clean 404 surfaces as NotFoundError, not a generic read error.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # The hinted server's 200 is enough even with failures elsewhere.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on the first server falls through to the next.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.put()."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    # NOTE(review): the `def tearDown(self):` header for the following
    # statement appears elided from this excerpt.
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        return self.new_client().put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # With only one mocked server, a 2-copy put must fail rather than
        # write both copies to the same server.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """KeepWriterThreadPool should stop writing once enough copies exist."""

    class FakeKeepService(object):
        # Minimal stand-in for a Keep service: put() sleeps `delay`
        # seconds, then succeeds, fails, or raises.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            # NOTE(review): the assignments initializing self.delay and
            # self._result appear elided from this excerpt.
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result['headers'] = {}
            self._result['headers']['x-keep-replicas-stored'] = str(replicas)
            self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
            self._result['body'] = 'foobar'

        def put(self, data_hash, data, timeout, headers):
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            if self.will_succeed:
                # NOTE(review): the success branch (returning self._result)
                # and the `else:` appear elided from this excerpt.
                return {"status_code": 500, "body": "didn't succeed"}

    # NOTE(review): the `def setUp(self):` header and preceding
    # assignments (e.g. self.copies) appear elided from this excerpt.
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas = self.copies,
            copies = self.copies
        # NOTE(review): the closing parenthesis of the call above appears
        # elided from this excerpt.

    def test_only_write_enough_on_success(self):
        # Queue many succeeding services; only `copies` writes should land.
        # NOTE(review): the `for` loop header and a pool.join() appear
        # elided from this excerpt.
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Mix failing and succeeding services; still expect `copies` writes.
        # NOTE(review): the `for` loop header and a pool.join() appear
        # elided from this excerpt.
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Some services raise; the pool must still reach `copies` writes.
        # NOTE(review): the `for` loop header and a pool.join() appear
        # elided from this excerpt.
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # With copies+1 crashing services and only copies-1 good ones, the
        # pool cannot reach the requested replication.
        for i in range(self.copies+1):
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
        for i in range(self.copies-1):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        # NOTE(review): a pool.join() may be elided here.
        self.assertEqual(self.pool.done(), (self.copies-1, []))
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.

    # NOTE(review): the `def setUp(self):` header for the following
    # statements appears elided from this excerpt.
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    # NOTE(review): the `def tearDown(self):` header for the following
    # statement appears elided from this excerpt.
        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        # One exception plus two 200s: with num_retries=1 and copies=2,
        # put() succeeds after three requests total.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        # A retryable 500 plus two 200s likewise succeeds in three requests.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    """An API-client failure during get() must not deadlock later gets."""

    # NOTE(review): the `def tearDown(self):` header for the following
    # statement appears elided from this excerpt.
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        class ApiMock(object):
            # Fake API object: a few attributes work; anything else raises
            # KeepReadError to simulate API-level failure.
            def __getattr__(self, r):
                if r == "api_token":
                    # NOTE(review): the return values for this branch, the
                    # "insecure" branch, and any other handled attributes
                    # appear elided from this excerpt; the fallback raises.
                elif r == "insecure":
                    raise arvados.errors.KeepReadError()
        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for is that if an API (not
        # keepstore) exception is thrown as part of a get(), the next
        # attempt to get that same block will result in a deadlock.
        # This is why there are two get()s in a row. Unfortunately,
        # the failure mode for this test is that the test suite
        # deadlocks, there isn't a good way to avoid that without
        # adding a special case that has no use except for this test.
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1445 class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
1447 self.api_client = self.mock_keep_services(count=2)
1448 self.data = b'xyzzy'
1449 self.locator = '1271ed5ef305aadabc605b1609e24c52'
1450 self.disk_cache_dir = tempfile.mkdtemp()
1453 shutil.rmtree(self.disk_cache_dir)
1456 @mock.patch('arvados.KeepClient.KeepService.get')
1457 def test_disk_cache_read(self, get_mock):
1458 # confirm it finds an existing cache block when the cache is
1461 os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1462 with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1465 # block cache should have found the existing block
1466 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1467 disk_cache_dir=self.disk_cache_dir)
1468 keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1470 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1472 get_mock.assert_not_called()
1475 @mock.patch('arvados.KeepClient.KeepService.get')
1476 def test_disk_cache_share(self, get_mock):
1477 # confirm it finds a cache block written after the disk cache
1480 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1481 disk_cache_dir=self.disk_cache_dir)
1482 keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1484 os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1485 with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1488 # when we try to get the block, it'll check the disk and find it.
1489 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1491 get_mock.assert_not_called()
1494 def test_disk_cache_write(self):
1495 # confirm the cache block was created
1497 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1498 disk_cache_dir=self.disk_cache_dir)
1499 keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1501 with tutil.mock_keep_responses(self.data, 200) as mock:
1502 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1504 self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1506 with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1507 self.assertTrue(tutil.binary_compare(f.read(), self.data))
1510 def test_disk_cache_clean(self):
1511 # confirm that a tmp file in the cache is cleaned up
1513 os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1514 with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
1517 with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
1520 with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
1523 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1524 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1525 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1527 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1528 disk_cache_dir=self.disk_cache_dir)
1530 # The tmp still hasn't been deleted because it was created in the last 60 seconds
1531 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1532 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1533 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1535 # Set the mtime to 61s in the past
1536 os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
1537 os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
1538 os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))
1540 block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1541 disk_cache_dir=self.disk_cache_dir)
1543 # Tmp should be gone but the other ones are safe.
1544 self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1545 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1546 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1549 @mock.patch('arvados.KeepClient.KeepService.get')
1550 def test_disk_cache_cap(self, get_mock):
1551 # confirm that the cache is kept to the desired limit
1553 os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1554 with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1557 os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1558 with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1561 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1562 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1564 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1565 disk_cache_dir=self.disk_cache_dir,
1568 self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1569 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1572 @mock.patch('arvados.KeepClient.KeepService.get')
1573 def test_disk_cache_share(self, get_mock):
1574 # confirm that a second cache doesn't delete files that belong to the first cache.
1576 os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1577 with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1580 os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1581 with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1584 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1585 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1587 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1588 disk_cache_dir=self.disk_cache_dir,
1591 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1592 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1594 block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1595 disk_cache_dir=self.disk_cache_dir,
1598 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1599 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1603 def test_disk_cache_error(self):
1604 os.chmod(self.disk_cache_dir, stat.S_IRUSR)
1606 # Fail during cache initialization.
1607 with self.assertRaises(OSError):
1608 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1609 disk_cache_dir=self.disk_cache_dir)
1612 def test_disk_cache_write_error(self):
1613 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1614 disk_cache_dir=self.disk_cache_dir)
1616 keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1618 # Make the cache dir read-only
1619 os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1620 os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
1623 with self.assertRaises(arvados.errors.KeepCacheError):
1624 with tutil.mock_keep_responses(self.data, 200) as mock:
1625 keep_client.get(self.locator)
1628 def test_disk_cache_retry_write_error(self):
1629 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1630 disk_cache_dir=self.disk_cache_dir)
1632 keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1635 realmmap = mmap.mmap
1636 def sideeffect_mmap(*args, **kwargs):
1640 raise OSError(errno.ENOSPC, "no space")
1642 return realmmap(*args, **kwargs)
1644 with patch('mmap.mmap') as mockmmap:
1645 mockmmap.side_effect = sideeffect_mmap
1647 cache_max_before = block_cache.cache_max
1649 with tutil.mock_keep_responses(self.data, 200) as mock:
1650 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1652 self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1654 with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1655 self.assertTrue(tutil.binary_compare(f.read(), self.data))
1657 # shrank the cache in response to ENOSPC
1658 self.assertTrue(cache_max_before > block_cache.cache_max)
1661 def test_disk_cache_retry_write_error2(self):
1662 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1663 disk_cache_dir=self.disk_cache_dir)
1665 keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1668 realmmap = mmap.mmap
1669 def sideeffect_mmap(*args, **kwargs):
1673 raise OSError(errno.ENOMEM, "no memory")
1675 return realmmap(*args, **kwargs)
1677 with patch('mmap.mmap') as mockmmap:
1678 mockmmap.side_effect = sideeffect_mmap
1680 slots_before = block_cache._max_slots
1682 with tutil.mock_keep_responses(self.data, 200) as mock:
1683 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1685 self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1687 with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1688 self.assertTrue(tutil.binary_compare(f.read(), self.data))
1690 # shrank the cache in response to ENOMEM
1691 self.assertTrue(slots_before > block_cache._max_slots)