1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
21 from unittest import mock
22 from unittest.mock import patch
29 from . import arvados_testutil as tutil
30 from . import keepstub
31 from . import run_test_server
33 from .arvados_testutil import DiskCacheBase
35 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
36 class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
40 block_cache_test = None
44 super(KeepTestCase, cls).setUpClass()
45 run_test_server.authorize_with("admin")
46 cls.api_client = arvados.api('v1')
47 cls.block_cache_test = DiskCacheBase()
48 cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
49 proxy='', local_store='',
50 block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))
53 def tearDownClass(cls):
54 super(KeepTestCase, cls).setUpClass()
55 cls.block_cache_test.tearDown()
57 def test_KeepBasicRWTest(self):
58 self.assertEqual(0, self.keep_client.upload_counter.get())
59 foo_locator = self.keep_client.put('foo')
62 r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
63 'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
65 # 6 bytes because uploaded 2 copies
66 self.assertEqual(6, self.keep_client.upload_counter.get())
68 self.assertEqual(0, self.keep_client.download_counter.get())
69 self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
71 'wrong content from Keep.get(md5("foo"))')
72 self.assertEqual(3, self.keep_client.download_counter.get())
74 def test_KeepBinaryRWTest(self):
75 blob_str = b'\xff\xfe\xf7\x00\x01\x02'
76 blob_locator = self.keep_client.put(blob_str)
79 r'^7fc7c53b45e53926ba52821140fef396\+6',
80 ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
81 self.assertEqual(self.keep_client.get(blob_locator),
83 'wrong content from Keep.get(md5(<binarydata>))')
85 def test_KeepLongBinaryRWTest(self):
86 blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
87 for i in range(0, 23):
88 blob_data = blob_data + blob_data
89 blob_locator = self.keep_client.put(blob_data)
92 r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
93 ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
94 self.assertEqual(self.keep_client.get(blob_locator),
96 'wrong content from Keep.get(md5(<binarydata>))')
98 @unittest.skip("unreliable test - please fix and close #8752")
99 def test_KeepSingleCopyRWTest(self):
100 blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
101 blob_locator = self.keep_client.put(blob_data, copies=1)
104 r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
105 ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
106 self.assertEqual(self.keep_client.get(blob_locator),
108 'wrong content from Keep.get(md5(<binarydata>))')
110 def test_KeepEmptyCollectionTest(self):
111 blob_locator = self.keep_client.put('', copies=1)
114 r'^d41d8cd98f00b204e9800998ecf8427e\+0',
115 ('wrong locator from Keep.put(""): ' + blob_locator))
117 def test_KeepPutDataType(self):
118 with self.assertRaises(AttributeError):
119 # Must be bytes or have an encode() method
120 self.keep_client.put({})
122 def test_KeepHeadTest(self):
123 locator = self.keep_client.put('test_head')
126 r'^b9a772c7049325feb7130fff1f8333e9\+9',
127 'wrong md5 hash from Keep.put for "test_head": ' + locator)
128 self.assertEqual(True, self.keep_client.head(locator))
129 self.assertEqual(self.keep_client.get(locator),
131 'wrong content from Keep.get for "test_head"')
133 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
134 class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
137 KEEP_SERVER = {'blob_signing': True}
140 DiskCacheBase.tearDown(self)
142 def test_KeepBasicRWTest(self):
143 run_test_server.authorize_with('active')
144 keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
145 foo_locator = keep_client.put('foo')
148 r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
149 'invalid locator from Keep.put("foo"): ' + foo_locator)
150 self.assertEqual(keep_client.get(foo_locator),
152 'wrong content from Keep.get(md5("foo"))')
154 # GET with an unsigned locator => bad request
155 bar_locator = keep_client.put('bar')
156 unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
159 r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
160 'invalid locator from Keep.put("bar"): ' + bar_locator)
161 self.assertRaises(arvados.errors.KeepReadError,
163 unsigned_bar_locator)
165 # GET from a different user => bad request
166 run_test_server.authorize_with('spectator')
167 keep_client2 = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
168 self.assertRaises(arvados.errors.KeepReadError,
172 # Unauthenticated GET for a signed locator => bad request
173 # Unauthenticated GET for an unsigned locator => bad request
174 keep_client.api_token = ''
175 self.assertRaises(arvados.errors.KeepReadError,
178 self.assertRaises(arvados.errors.KeepReadError,
180 unsigned_bar_locator)
182 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
183 class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
187 KEEP_PROXY_SERVER = {}
191 super(KeepProxyTestCase, cls).setUpClass()
192 run_test_server.authorize_with('active')
193 cls.api_client = arvados.api('v1')
196 super(KeepProxyTestCase, self).tearDown()
197 DiskCacheBase.tearDown(self)
199 def test_KeepProxyTest1(self):
200 # Will use ARVADOS_KEEP_SERVICES environment variable that
201 # is set by setUpClass().
202 keep_client = arvados.KeepClient(api_client=self.api_client,
203 local_store='', block_cache=self.make_block_cache(self.disk_cache))
204 baz_locator = keep_client.put('baz')
207 r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
208 'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
209 self.assertEqual(keep_client.get(baz_locator),
211 'wrong content from Keep.get(md5("baz"))')
212 self.assertTrue(keep_client.using_proxy)
214 def test_KeepProxyTestMultipleURIs(self):
215 # Test using ARVADOS_KEEP_SERVICES env var overriding any
216 # existing proxy setting and setting multiple proxies
217 arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
218 keep_client = arvados.KeepClient(api_client=self.api_client,
220 block_cache=self.make_block_cache(self.disk_cache))
221 uris = [x['_service_root'] for x in keep_client._keep_services]
222 self.assertEqual(uris, ['http://10.0.0.1/',
223 'https://foo.example.org:1234/'])
225 def test_KeepProxyTestInvalidURI(self):
226 arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
227 with self.assertRaises(arvados.errors.ArgumentError):
228 keep_client = arvados.KeepClient(api_client=self.api_client,
230 block_cache=self.make_block_cache(self.disk_cache))
232 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
233 class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
237 DiskCacheBase.tearDown(self)
239 def get_service_roots(self, api_client):
240 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
241 services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
242 return [urllib.parse.urlparse(url) for url in sorted(services)]
244 def test_ssl_flag_respected_in_roots(self):
245 for ssl_flag in [False, True]:
246 services = self.get_service_roots(self.mock_keep_services(
247 service_ssl_flag=ssl_flag))
249 ('https' if ssl_flag else 'http'), services[0].scheme)
251 def test_correct_ports_with_ipv6_addresses(self):
252 service = self.get_service_roots(self.mock_keep_services(
253 service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
254 self.assertEqual('100::1', service.hostname)
255 self.assertEqual(10, service.port)
257 def test_recognize_proxy_services_in_controller_response(self):
258 keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
259 service_type='proxy', service_host='localhost', service_port=9, count=1),
260 block_cache=self.make_block_cache(self.disk_cache))
262 # this will fail, but it ensures we get the service
264 keep_client.put('baz2', num_retries=0)
267 self.assertTrue(keep_client.using_proxy)
269 def test_insecure_disables_tls_verify(self):
270 api_client = self.mock_keep_services(count=1)
271 force_timeout = socket.timeout("timed out")
273 api_client.insecure = True
274 with tutil.mock_keep_responses(b'foo', 200) as mock:
275 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
276 keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
278 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
281 mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
284 api_client.insecure = False
285 with tutil.mock_keep_responses(b'foo', 200) as mock:
286 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
287 keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
288 # getopt()==None here means we didn't change the
289 # default. If we were using real pycurl instead of a mock,
290 # it would return the default value 1.
292 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
295 mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
298 def test_refresh_signature(self):
299 blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
300 blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
301 local_loc = blk_digest+'+A'+blk_sig
302 remote_loc = blk_digest+'+R'+blk_sig
303 api_client = self.mock_keep_services(count=1)
304 headers = {'X-Keep-Locator':local_loc}
305 with tutil.mock_keep_responses('', 200, **headers):
306 # Check that the translated locator gets returned
307 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
308 self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
309 # Check that refresh_signature() uses the correct method and headers
310 keep_client._get_or_head = mock.MagicMock()
311 keep_client.refresh_signature(remote_loc)
312 args, kwargs = keep_client._get_or_head.call_args_list[0]
313 self.assertIn(remote_loc, args)
314 self.assertEqual("HEAD", kwargs['method'])
315 self.assertIn('X-Keep-Signature', kwargs['headers'])
317 # test_*_timeout verify that KeepClient instructs pycurl to use
318 # the appropriate connection and read timeouts. They don't care
319 # whether pycurl actually exhibits the expected timeout behavior
320 # -- those tests are in the KeepClientTimeout test class.
322 def test_get_timeout(self):
323 api_client = self.mock_keep_services(count=1)
324 force_timeout = socket.timeout("timed out")
325 with tutil.mock_keep_responses(force_timeout, 0) as mock:
326 keep_client = arvados.KeepClient(
327 api_client=api_client,
328 block_cache=self.make_block_cache(self.disk_cache),
331 with self.assertRaises(arvados.errors.KeepReadError):
332 keep_client.get('ffffffffffffffffffffffffffffffff')
334 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
335 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
337 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
338 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
340 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
341 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
343 def test_put_timeout(self):
344 api_client = self.mock_keep_services(count=1)
345 force_timeout = socket.timeout("timed out")
346 with tutil.mock_keep_responses(force_timeout, 0) as mock:
347 keep_client = arvados.KeepClient(
348 api_client=api_client,
349 block_cache=self.make_block_cache(self.disk_cache),
352 with self.assertRaises(arvados.errors.KeepWriteError):
353 keep_client.put(b'foo')
355 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
356 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
358 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
359 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
361 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
362 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
364 def test_head_timeout(self):
365 api_client = self.mock_keep_services(count=1)
366 force_timeout = socket.timeout("timed out")
367 with tutil.mock_keep_responses(force_timeout, 0) as mock:
368 keep_client = arvados.KeepClient(
369 api_client=api_client,
370 block_cache=self.make_block_cache(self.disk_cache),
373 with self.assertRaises(arvados.errors.KeepReadError):
374 keep_client.head('ffffffffffffffffffffffffffffffff')
376 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
377 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
379 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
382 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
385 def test_proxy_get_timeout(self):
386 api_client = self.mock_keep_services(service_type='proxy', count=1)
387 force_timeout = socket.timeout("timed out")
388 with tutil.mock_keep_responses(force_timeout, 0) as mock:
389 keep_client = arvados.KeepClient(
390 api_client=api_client,
391 block_cache=self.make_block_cache(self.disk_cache),
394 with self.assertRaises(arvados.errors.KeepReadError):
395 keep_client.get('ffffffffffffffffffffffffffffffff')
397 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
398 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
400 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
401 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
403 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
404 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
406 def test_proxy_head_timeout(self):
407 api_client = self.mock_keep_services(service_type='proxy', count=1)
408 force_timeout = socket.timeout("timed out")
409 with tutil.mock_keep_responses(force_timeout, 0) as mock:
410 keep_client = arvados.KeepClient(
411 api_client=api_client,
412 block_cache=self.make_block_cache(self.disk_cache),
415 with self.assertRaises(arvados.errors.KeepReadError):
416 keep_client.head('ffffffffffffffffffffffffffffffff')
418 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
419 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
421 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
424 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
427 def test_proxy_put_timeout(self):
428 self.disk_cache_dir = None
429 api_client = self.mock_keep_services(service_type='proxy', count=1)
430 force_timeout = socket.timeout("timed out")
431 with tutil.mock_keep_responses(force_timeout, 0) as mock:
432 keep_client = arvados.KeepClient(
433 api_client=api_client,
436 with self.assertRaises(arvados.errors.KeepWriteError):
437 keep_client.put('foo')
439 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
440 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
442 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
443 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
445 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
446 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
448 def check_no_services_error(self, verb, exc_class):
449 api_client = mock.MagicMock(name='api_client')
450 api_client.keep_services().accessible().execute.side_effect = (
451 arvados.errors.ApiError)
452 keep_client = arvados.KeepClient(
453 api_client=api_client,
454 block_cache=self.make_block_cache(self.disk_cache),
457 with self.assertRaises(exc_class) as err_check:
458 getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
459 self.assertEqual(0, len(err_check.exception.request_errors()))
461 def test_get_error_with_no_services(self):
462 self.check_no_services_error('get', arvados.errors.KeepReadError)
464 def test_head_error_with_no_services(self):
465 self.check_no_services_error('head', arvados.errors.KeepReadError)
467 def test_put_error_with_no_services(self):
468 self.check_no_services_error('put', arvados.errors.KeepWriteError)
470 def check_errors_from_last_retry(self, verb, exc_class):
471 api_client = self.mock_keep_services(count=2)
472 req_mock = tutil.mock_keep_responses(
473 "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
474 with req_mock, tutil.skip_sleep, \
475 self.assertRaises(exc_class) as err_check:
476 keep_client = arvados.KeepClient(
477 api_client=api_client,
478 block_cache=self.make_block_cache(self.disk_cache),
481 getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
483 self.assertEqual([502, 502], [
484 getattr(error, 'status_code', None)
485 for error in err_check.exception.request_errors().values()])
486 self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')
488 def test_get_error_reflects_last_retry(self):
489 self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
491 def test_head_error_reflects_last_retry(self):
492 self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)
494 def test_put_error_reflects_last_retry(self):
495 self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
497 def test_put_error_does_not_include_successful_puts(self):
498 data = 'partial failure test'
499 data_loc = tutil.str_keep_locator(data)
500 api_client = self.mock_keep_services(count=3)
501 with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
502 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
503 keep_client = arvados.KeepClient(
504 api_client=api_client,
505 block_cache=self.make_block_cache(self.disk_cache),
508 keep_client.put(data)
509 self.assertEqual(2, len(exc_check.exception.request_errors()))
511 def test_proxy_put_with_no_writable_services(self):
512 data = 'test with no writable services'
513 data_loc = tutil.str_keep_locator(data)
514 api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
515 with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
516 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
517 keep_client = arvados.KeepClient(
518 api_client=api_client,
519 block_cache=self.make_block_cache(self.disk_cache),
522 keep_client.put(data)
523 self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
524 self.assertEqual(0, len(exc_check.exception.request_errors()))
526 def test_oddball_service_get(self):
527 body = b'oddball service get'
528 api_client = self.mock_keep_services(service_type='fancynewblobstore')
529 with tutil.mock_keep_responses(body, 200):
530 keep_client = arvados.KeepClient(
531 api_client=api_client,
532 block_cache=self.make_block_cache(self.disk_cache),
535 actual = keep_client.get(tutil.str_keep_locator(body))
536 self.assertEqual(body, actual)
538 def test_oddball_service_put(self):
539 body = b'oddball service put'
540 pdh = tutil.str_keep_locator(body)
541 api_client = self.mock_keep_services(service_type='fancynewblobstore')
542 with tutil.mock_keep_responses(pdh, 200):
543 keep_client = arvados.KeepClient(
544 api_client=api_client,
545 block_cache=self.make_block_cache(self.disk_cache),
548 actual = keep_client.put(body, copies=1)
549 self.assertEqual(pdh, actual)
551 def test_oddball_service_writer_count(self):
552 body = b'oddball service writer count'
553 pdh = tutil.str_keep_locator(body)
554 api_client = self.mock_keep_services(service_type='fancynewblobstore',
556 headers = {'x-keep-replicas-stored': 3}
557 with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
558 **headers) as req_mock:
559 keep_client = arvados.KeepClient(
560 api_client=api_client,
561 block_cache=self.make_block_cache(self.disk_cache),
564 actual = keep_client.put(body, copies=2)
565 self.assertEqual(pdh, actual)
566 self.assertEqual(1, req_mock.call_count)
569 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
570 class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
574 self.api_client = self.mock_keep_services(count=2)
575 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
577 self.locator = '1271ed5ef305aadabc605b1609e24c52'
580 DiskCacheBase.tearDown(self)
582 @mock.patch('arvados.KeepClient.KeepService.get')
583 def test_get_request_cache(self, get_mock):
584 with tutil.mock_keep_responses(self.data, 200, 200):
585 self.keep_client.get(self.locator)
586 self.keep_client.get(self.locator)
587 # Request already cached, don't require more than one request
588 get_mock.assert_called_once()
590 @mock.patch('arvados.KeepClient.KeepService.get')
591 def test_head_request_cache(self, get_mock):
592 with tutil.mock_keep_responses(self.data, 200, 200):
593 self.keep_client.head(self.locator)
594 self.keep_client.head(self.locator)
595 # Don't cache HEAD requests so that they're not confused with GET reqs
596 self.assertEqual(2, get_mock.call_count)
598 @mock.patch('arvados.KeepClient.KeepService.get')
599 def test_head_and_then_get_return_different_responses(self, get_mock):
602 get_mock.side_effect = [b'first response', b'second response']
603 with tutil.mock_keep_responses(self.data, 200, 200):
604 head_resp = self.keep_client.head(self.locator)
605 get_resp = self.keep_client.get(self.locator)
606 self.assertEqual(b'first response', head_resp)
607 # First reponse was not cached because it was from a HEAD request.
608 self.assertNotEqual(head_resp, get_resp)
615 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
616 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
620 self.api_client = self.mock_keep_services(count=2)
621 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
623 self.locator = '1271ed5ef305aadabc605b1609e24c52'
624 self.test_id = arvados.util.new_request_id()
625 self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
626 # If we don't set request_id to None explicitly here, it will
627 # return <MagicMock name='api_client_mock.request_id'
629 self.api_client.request_id = None
632 DiskCacheBase.tearDown(self)
634 def test_default_to_api_client_request_id(self):
635 self.api_client.request_id = self.test_id
636 with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
637 self.keep_client.put(self.data)
638 self.assertEqual(2, len(mock.responses))
639 for resp in mock.responses:
640 self.assertProvidedRequestId(resp)
642 with tutil.mock_keep_responses(self.data, 200) as mock:
643 self.keep_client.get(self.locator)
644 self.assertProvidedRequestId(mock.responses[0])
646 with tutil.mock_keep_responses(b'', 200) as mock:
647 self.keep_client.head(self.locator)
648 self.assertProvidedRequestId(mock.responses[0])
650 def test_explicit_request_id(self):
651 with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
652 self.keep_client.put(self.data, request_id=self.test_id)
653 self.assertEqual(2, len(mock.responses))
654 for resp in mock.responses:
655 self.assertProvidedRequestId(resp)
657 with tutil.mock_keep_responses(self.data, 200) as mock:
658 self.keep_client.get(self.locator, request_id=self.test_id)
659 self.assertProvidedRequestId(mock.responses[0])
661 with tutil.mock_keep_responses(b'', 200) as mock:
662 self.keep_client.head(self.locator, request_id=self.test_id)
663 self.assertProvidedRequestId(mock.responses[0])
665 def test_automatic_request_id(self):
666 with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
667 self.keep_client.put(self.data)
668 self.assertEqual(2, len(mock.responses))
669 for resp in mock.responses:
670 self.assertAutomaticRequestId(resp)
672 with tutil.mock_keep_responses(self.data, 200) as mock:
673 self.keep_client.get(self.locator)
674 self.assertAutomaticRequestId(mock.responses[0])
676 with tutil.mock_keep_responses(b'', 200) as mock:
677 self.keep_client.head(self.locator)
678 self.assertAutomaticRequestId(mock.responses[0])
680 def test_request_id_in_exception(self):
681 with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
682 with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
683 self.keep_client.head(self.locator, request_id=self.test_id)
685 with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
686 with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
687 self.keep_client.get(self.locator)
689 with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
690 with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
691 self.keep_client.put(self.data, request_id=self.test_id)
693 with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
694 with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
695 self.keep_client.put(self.data)
697 def assertAutomaticRequestId(self, resp):
698 hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
699 if x.startswith('X-Request-Id: ')][0]
700 self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
701 self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
703 def assertProvidedRequestId(self, resp):
704 self.assertIn('X-Request-Id: '+self.test_id,
705 resp.getopt(pycurl.HTTPHEADER))
709 #@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
710 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
714 # expected_order[i] is the probe order for
715 # hash=md5(sprintf("%064x",i)) where there are 16 services
716 # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
717 # the first probe for the block consisting of 64 "0"
718 # characters is the service whose uuid is
719 # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
721 self.expected_order = [
722 list('3eab2d5fc9681074'),
723 list('097dba52e648f1c3'),
724 list('c5b4e023f8a7d691'),
725 list('9d81c02e76a3bf54'),
728 "{:064x}".format(x).encode()
729 for x in range(len(self.expected_order))]
731 hashlib.md5(self.blocks[x]).hexdigest()
732 for x in range(len(self.expected_order))]
733 self.api_client = self.mock_keep_services(count=self.services)
734 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
737 DiskCacheBase.tearDown(self)
739 def test_weighted_service_roots_against_reference_set(self):
740 # Confirm weighted_service_roots() returns the correct order
741 for i, hash in enumerate(self.hashes):
742 roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
744 re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
746 self.assertEqual(self.expected_order[i], got_order)
748 def test_get_probe_order_against_reference_set(self):
749 self._test_probe_order_against_reference_set(
750 lambda i: self.keep_client.get(self.hashes[i], num_retries=1))
752 def test_head_probe_order_against_reference_set(self):
753 self._test_probe_order_against_reference_set(
754 lambda i: self.keep_client.head(self.hashes[i], num_retries=1))
756 def test_put_probe_order_against_reference_set(self):
757 # copies=1 prevents the test from being sensitive to races
758 # between writer threads.
759 self._test_probe_order_against_reference_set(
760 lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))
762 def _test_probe_order_against_reference_set(self, op):
763 for i in range(len(self.blocks)):
764 with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
765 self.assertRaises(arvados.errors.KeepRequestError):
768 re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
769 for resp in mock.responses]
770 self.assertEqual(self.expected_order[i]*2, got_order)
772 def test_put_probe_order_multiple_copies(self):
773 for copies in range(2, 4):
774 for i in range(len(self.blocks)):
775 with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
776 self.assertRaises(arvados.errors.KeepWriteError):
777 self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
779 re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
780 for resp in mock.responses]
781 # With T threads racing to make requests, the position
782 # of a given server in the sequence of HTTP requests
783 # (got_order) cannot be more than T-1 positions
784 # earlier than that server's position in the reference
785 # probe sequence (expected_order).
787 # Loop invariant: we have accounted for +pos+ expected
788 # probes, either by seeing them in +got_order+ or by
789 # putting them in +pending+ in the hope of seeing them
790 # later. As long as +len(pending)<T+, we haven't
791 # started a request too early.
793 for pos, expected in enumerate(self.expected_order[i]*3):
794 got = got_order[pos-len(pending)]
795 while got in pending:
796 del pending[pending.index(got)]
797 got = got_order[pos-len(pending)]
799 pending.append(expected)
801 len(pending), copies,
802 "pending={}, with copies={}, got {}, expected {}".format(
803 pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
805 def test_probe_waste_adding_one_server(self):
807 hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
808 initial_services = 12
809 self.api_client = self.mock_keep_services(count=initial_services)
810 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
812 self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
813 for added_services in range(1, 12):
814 api_client = self.mock_keep_services(count=initial_services+added_services)
815 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
817 for hash_index in range(len(hashes)):
818 probe_after = keep_client.weighted_service_roots(
819 arvados.KeepLocator(hashes[hash_index]))
820 penalty = probe_after.index(probes_before[hash_index][0])
821 self.assertLessEqual(penalty, added_services)
822 total_penalty += penalty
823 # Average penalty per block should not exceed
824 # N(added)/N(orig) by more than 20%, and should get closer
825 # to the ideal as we add data points.
828 len(hashes) / initial_services)
831 (120 - added_services)/100)
833 expect_penalty * 8/10)
835 min_penalty <= total_penalty <= max_penalty,
836 "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
844 def check_64_zeros_error_order(self, verb, exc_class):
847 data = tutil.str_keep_locator(data)
848 # Arbitrary port number:
849 aport = random.randint(1024,65535)
850 api_client = self.mock_keep_services(service_port=aport, count=self.services)
851 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
852 with mock.patch('pycurl.Curl') as curl_mock, \
853 self.assertRaises(exc_class) as err_check:
854 curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
855 getattr(keep_client, verb)(data)
856 urls = [urllib.parse.urlparse(url)
857 for url in err_check.exception.request_errors()]
858 self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
859 [(url.hostname, url.port) for url in urls])
861 def test_get_error_shows_probe_order(self):
862 self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)
864 def test_put_error_shows_probe_order(self):
865 self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    """Wall-clock timing tests against a stub Keep server.

    The stub server can add artificial delays and bandwidth caps; these
    tests assert that KeepClient's connect/response/low-bandwidth
    timeouts fire (or don't) within the expected time windows.
    """
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    BANDWIDTH_LOW_LIM = 1024
    # NOTE(review): the DATA and TIMEOUT_TIME class attributes, the
    # disk_cache attribute, and the `def tearDown(self):` line are elided
    # from this extract.
        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        # Context manager: asserts the `with` body takes tmin..tmax seconds.
        def __init__(self, tmin, tmax):
            # NOTE(review): the lines storing tmin/tmax and the
            # `def __enter__(self):` header are elided from this extract;
            # self.t0 is presumably set on entry.
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        # Context manager: asserts the `with` body takes at least tmin seconds.
        def __init__(self, tmin):
            # NOTE(review): the line storing tmin and the
            # `def __enter__(self):` header are elided from this extract.
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        # Build a KeepClient with the (connect, read, low-speed) timeout triple.
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            service_host='240.0.0.0',
            # NOTE(review): remaining kwargs and the closing paren of this
            # call are elided from this extract.
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # At 2x the low-speed limit, transfers should succeed round-trip.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            # NOTE(review): head() presumably raises here too; any
            # assertRaises wrapper is elided from this extract -- confirm.
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError) as e:
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        # A short request delay should still succeed; a long one must hit
        # the response timeout.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Tests for +K@gateway / +K@remote locator hints routing requests
    to the hinted gateway or remote proxy before ordinary disk services.
    """
    # NOTE(review): the disk_cache attribute and `def tearDown(self):`
    # line are elided from this extract.
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        # Build fake gateway service records plus ordinary disk services,
        # and point self.keepClient at them.
        # NOTE(review): the opening `self.gateways = [{` of this list
        # literal is elided from this extract.
            'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
            'owner_uuid': 'zzzzz-tpzed-000000000000000',
            'service_host': 'gatewayhost{}'.format(i),
            'service_port': 12345,
            'service_ssl_flag': True,
            'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        # A locator hinted with a gateway UUID must be fetched from that
        # gateway's URL.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        # All services 404; verify the probe order: hinted gateways first,
        # then disk services.
        # NOTE(review): the gateways/disks counts and the opening
        # `mocks = [` of this list comprehension are elided from this
        # extract.
            tutil.FakeCurl.make(code=404, body='')
            for _ in range(gateways+disks)
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            # NOTE(review): the assertion call wrapping these arguments is
            # elided from this extract.
                mocks[i].getopt(pycurl.URL).decode(),

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        # Same probe-order check as above, for head() instead of get().
        # NOTE(review): the gateways/disks counts and the opening
        # `mocks = [` of this list comprehension are elided from this
        # extract.
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            # NOTE(review): the assertion call wrapping these arguments is
            # elided from this extract.
                mocks[i].getopt(pycurl.URL).decode(),

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        # A +K@<cluster> hint routes the request to that cluster's
        # keep proxy hostname.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
class KeepClientRetryTestMixin(object):
    """Shared machinery for exercising KeepClient retry logic."""
    # Testing with a local Keep store won't exercise the retry behavior.
    # Instead, our strategy is:
    # * Create a client with one proxy specified (pointed at a black
    #   hole), so there's no need to instantiate an API client, and
    #   all HTTP requests come from one place.
    # * Mock httplib's request method to provide simulated responses.
    # This lets us test the retry logic extensively without relying on any
    # supporting servers, and prevents side effects in case something hiccups.
    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
    # Test classes must define TEST_PATCHER to a method that mocks
    # out appropriate methods in the client.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    # NOTE(review): the `def setUp(self):` line is elided from this extract.
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        # Build a KeepClient from the mixin defaults plus caller overrides.
        kwargs = self.client_kwargs.copy()
        kwargs.update(caller_kwargs)
        kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        # Abstract hook: subclasses perform the get/head/put under test.
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
        # NOTE(review): a `return err` line appears to be elided from this
        # extract; test_error_after_retries_exhausted below relies on the
        # returned assertRaises context.

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        # 403 is permanent: must raise even though retries remain.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # num_retries=1 allows 2 attempts; the 200 is never reached.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries given at construction applies when the call site
        # doesn't pass one.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.get(), via KeepClientRetryTestMixin."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    # NOTE(review): the `def tearDown(self):` line is elided from this extract.
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   # NOTE(review): the `*args, **kwargs):` tail of this
                   # signature is elided from this extract.
        return self.new_client().get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on the first server must fall through to the next.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        with tutil.mock_keep_responses(
                # NOTE(review): the first (bad-checksum) response tuple is
                # elided from this extract.
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.head(), via KeepClientRetryTestMixin."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    # NOTE(review): the `def tearDown(self):` line is elided from this extract.
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   # NOTE(review): the `*args, **kwargs):` tail of this
                   # signature is elided from this extract.
        return self.new_client().head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on the first server must fall through to the next.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.put(), via KeepClientRetryTestMixin."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    # NOTE(review): the `def tearDown(self):` line is elided from this extract.
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        return self.new_client().put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Only one mocked server responds, so requesting 2 copies must
        # fail rather than writing twice to the same server.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """Check KeepWriterThreadPool stops writing once enough copies exist."""

    class FakeKeepService(object):
        # Stand-in for a Keep service: sleeps for `delay`, then succeeds,
        # fails, or raises according to the constructor flags.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            # NOTE(review): the line storing `delay` and the line creating
            # the `self._result` dict are elided from this extract.
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result['headers'] = {}
            self._result['headers']['x-keep-replicas-stored'] = str(replicas)
            self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
            self._result['body'] = 'foobar'

        def put(self, data_hash, data, timeout, headers):
            # Simulate a slow store; optionally raise or report failure.
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            if self.will_succeed:
                # NOTE(review): the success-path return (presumably
                # `return self._result`) is elided from this extract.
            return {"status_code": 500, "body": "didn't succeed"}

    # NOTE(review): the `def setUp(self):` line, the `self.copies`
    # assignment, and other pool-construction arguments are elided from
    # this extract.
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            # NOTE(review): a `data=` argument line appears to be elided.
            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas = self.copies,
            copies = self.copies
        # NOTE(review): the closing `)` of this call is elided.

    def test_only_write_enough_on_success(self):
        # NOTE(review): the enclosing `for` loop header is elided from
        # this extract.
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        # NOTE(review): a `self.pool.join()` line appears to be elided.
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # NOTE(review): the enclosing `for` loop header is elided from
        # this extract.
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        # NOTE(review): a `self.pool.join()` line appears to be elided.
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # NOTE(review): the enclosing `for` loop header is elided from
        # this extract.
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        # NOTE(review): a `self.pool.join()` line appears to be elided.
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # More crashing services than requested copies: the pool can only
        # achieve copies-1 replicas.
        for i in range(self.copies+1):
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
        for i in range(self.copies-1):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        # NOTE(review): a `self.pool.join()` line appears to be elided.
        self.assertEqual(self.pool.done(), (self.copies-1, []))
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.

    # NOTE(review): the `def setUp(self):` line is elided from this extract.
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    # NOTE(review): the `def tearDown(self):` line is elided from this extract.
        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        # One exception, then both copies stored: 3 requests total.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        # One 500, then both copies stored: 3 requests total.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    """A failing API client must not deadlock subsequent get() calls."""
    # NOTE(review): the disk_cache attribute and `def tearDown(self):`
    # line are elided from this extract.
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        class ApiMock(object):
            # Minimal API stand-in: answers a few attribute lookups, then
            # raises KeepReadError for anything else.
            def __getattr__(self, r):
                if r == "api_token":
                    # NOTE(review): the return value for this branch is
                    # elided from this extract.
                elif r == "insecure":
                    # NOTE(review): the return value for this branch and
                    # any intermediate branches are elided; the raise below
                    # presumably sits in the final else.
                    raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for is that if an API (not
        # keepstore) exception is thrown as part of a get(), the next
        # attempt to get that same block will result in a deadlock.
        # This is why there are two get()s in a row. Unfortunately,
        # the failure mode for this test is that the test suite
        # deadlocks, there isn't a good way to avoid that without
        # adding a special case that has no use except for this test.
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Exercise KeepBlockCache's on-disk cache behavior."""
    # NOTE(review): the `def setUp(self):` line is elided from this extract.
        self.api_client = self.mock_keep_services(count=2)
        self.data = b'xyzzy'
        # Bare MD5 hash used as the cache key / locator throughout.
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.disk_cache_dir = tempfile.mkdtemp()

    # NOTE(review): the `def tearDown(self):` line is elided from this extract.
        shutil.rmtree(self.disk_cache_dir)
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_read(self, get_mock):
        # confirm it finds an existing cache block when the cache is
        # NOTE(review): the rest of this comment and the `f.write(...)`
        # body of the `with open` below are elided from this extract.
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
        # block cache should have found the existing block
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
        # The network get() must never fire; the disk cache satisfied it.
        get_mock.assert_not_called()
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # confirm it finds a cache block written after the disk cache
        # NOTE(review): this method name is duplicated later in this
        # class; in Python the later definition silently shadows this one,
        # so this test never runs -- one of the two should be renamed.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
        # NOTE(review): the `f.write(...)` body of the `with` above is
        # elided from this extract.
        # when we try to get the block, it'll check the disk and find it.
        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
        # The network get() must never fire.
        get_mock.assert_not_called()
    def test_disk_cache_write(self):
        # confirm the cache block was created
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
        self.assertIsNotNone(keep_client.get_from_cache(self.locator))
        # The fetched block must now exist on disk at
        # <cache dir>/<first 3 hash chars>/<hash>.keepcacheblock.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))
    def test_disk_cache_clean(self):
        # confirm that a tmp file in the cache is cleaned up
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
        # NOTE(review): the `f.write(...)` bodies of these three `with`
        # blocks are elided from this extract.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
        # Opening a block cache over the directory triggers its cleanup pass.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        # The tmp still hasn't been deleted because it was created in the last 60 seconds
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
        # Set the mtime to 61s in the past
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))
        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir)
        # Tmp should be gone but the other ones are safe.
        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_cap(self, get_mock):
        # confirm that the cache is kept to the desired limit
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
        # NOTE(review): the `f.write(...)` bodies of both `with` blocks
        # are elided from this extract.
        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
        # NOTE(review): the cap-setting kwargs and closing paren of this
        # constructor call are elided from this extract.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir,
        # First block is gone; second survives the cap.
        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # confirm that a second cache doesn't delete files that belong to the first cache.
        # NOTE(review): this redefines `test_disk_cache_share`, silently
        # shadowing the earlier method of the same name, so only this one
        # is collected and run.  One of the two should be renamed.
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
        # NOTE(review): the `f.write(...)` bodies of both `with` blocks
        # are elided from this extract.
        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
        # NOTE(review): the remaining kwargs and closing parens of both
        # constructor calls below are elided from this extract.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir,
        # First cache open: both blocks must still exist.
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir,
        # Second cache over the same dir must not delete the first's files.
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
    def test_disk_cache_error(self):
        # Make the cache directory unwritable so initialization must fail.
        os.chmod(self.disk_cache_dir, stat.S_IRUSR)

        # Fail during cache initialization.
        with self.assertRaises(OSError):
            block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                      disk_cache_dir=self.disk_cache_dir)
    def test_disk_cache_write_error(self):
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        # Make the cache dir read-only
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)

        # A successful fetch that cannot be written into the cache must
        # surface as KeepCacheError.
        with self.assertRaises(arvados.errors.KeepCacheError):
            with tutil.mock_keep_responses(self.data, 200) as mock:
                keep_client.get(self.locator)
    def test_disk_cache_retry_write_error(self):
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        # Wrap mmap.mmap so one call fails with ENOSPC; the cache should
        # shrink its size cap and retry rather than failing the read.
        realmmap = mmap.mmap
        def sideeffect_mmap(*args, **kwargs):
            # NOTE(review): the first-call bookkeeping (flag making only
            # the initial call raise) is elided from this extract.
                raise OSError(errno.ENOSPC, "no space")
            return realmmap(*args, **kwargs)

        with patch('mmap.mmap') as mockmmap:
            mockmmap.side_effect = sideeffect_mmap

            cache_max_before = block_cache.cache_max

            with tutil.mock_keep_responses(self.data, 200) as mock:
                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

            self.assertIsNotNone(keep_client.get_from_cache(self.locator))

            with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
                self.assertTrue(tutil.binary_compare(f.read(), self.data))

            # shrank the cache in response to ENOSPC
            self.assertTrue(cache_max_before > block_cache.cache_max)
    def test_disk_cache_retry_write_error2(self):
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        # Wrap mmap.mmap so one call fails with ENOMEM; the cache should
        # shrink its slot count and retry rather than failing the read.
        realmmap = mmap.mmap
        def sideeffect_mmap(*args, **kwargs):
            # NOTE(review): the first-call bookkeeping (flag making only
            # the initial call raise) is elided from this extract.
                raise OSError(errno.ENOMEM, "no memory")
            return realmmap(*args, **kwargs)

        with patch('mmap.mmap') as mockmmap:
            mockmmap.side_effect = sideeffect_mmap

            slots_before = block_cache._max_slots

            with tutil.mock_keep_responses(self.data, 200) as mock:
                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

            self.assertIsNotNone(keep_client.get_from_cache(self.locator))

            with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
                self.assertTrue(tutil.binary_compare(f.read(), self.data))

            # shrank the cache in response to ENOMEM
            self.assertTrue(slots_before > block_cache._max_slots)