1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
20 from pathlib import Path
21 from unittest import mock
22 from unittest.mock import patch
30 from . import arvados_testutil as tutil
31 from . import keepstub
32 from . import run_test_server
34 from .arvados_testutil import DiskCacheBase
36 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
37 class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
41 block_cache_test = None
45 super(KeepTestCase, cls).setUpClass()
46 run_test_server.authorize_with("admin")
47 cls.api_client = arvados.api('v1')
48 cls.block_cache_test = DiskCacheBase()
49 cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
50 proxy='', local_store='',
51 block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))
54 def tearDownClass(cls):
55 super(KeepTestCase, cls).setUpClass()
56 cls.block_cache_test.tearDown()
58 def test_KeepBasicRWTest(self):
59 self.assertEqual(0, self.keep_client.upload_counter.get())
60 foo_locator = self.keep_client.put('foo')
63 r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
64 'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
66 # 6 bytes because uploaded 2 copies
67 self.assertEqual(6, self.keep_client.upload_counter.get())
69 self.assertEqual(0, self.keep_client.download_counter.get())
70 self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
72 'wrong content from Keep.get(md5("foo"))')
73 self.assertEqual(3, self.keep_client.download_counter.get())
75 def test_KeepBinaryRWTest(self):
76 blob_str = b'\xff\xfe\xf7\x00\x01\x02'
77 blob_locator = self.keep_client.put(blob_str)
80 r'^7fc7c53b45e53926ba52821140fef396\+6',
81 ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
82 self.assertEqual(self.keep_client.get(blob_locator),
84 'wrong content from Keep.get(md5(<binarydata>))')
86 def test_KeepLongBinaryRWTest(self):
87 blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
88 for i in range(0, 23):
89 blob_data = blob_data + blob_data
90 blob_locator = self.keep_client.put(blob_data)
93 r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
94 ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
95 self.assertEqual(self.keep_client.get(blob_locator),
97 'wrong content from Keep.get(md5(<binarydata>))')
99 @unittest.skip("unreliable test - please fix and close #8752")
100 def test_KeepSingleCopyRWTest(self):
101 blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
102 blob_locator = self.keep_client.put(blob_data, copies=1)
105 r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
106 ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
107 self.assertEqual(self.keep_client.get(blob_locator),
109 'wrong content from Keep.get(md5(<binarydata>))')
111 def test_KeepEmptyCollectionTest(self):
112 blob_locator = self.keep_client.put('', copies=1)
115 r'^d41d8cd98f00b204e9800998ecf8427e\+0',
116 ('wrong locator from Keep.put(""): ' + blob_locator))
118 def test_KeepPutDataType(self):
119 with self.assertRaises(AttributeError):
120 # Must be bytes or have an encode() method
121 self.keep_client.put({})
123 def test_KeepHeadTest(self):
124 locator = self.keep_client.put('test_head')
127 r'^b9a772c7049325feb7130fff1f8333e9\+9',
128 'wrong md5 hash from Keep.put for "test_head": ' + locator)
129 self.assertEqual(True, self.keep_client.head(locator))
130 self.assertEqual(self.keep_client.get(locator),
132 'wrong content from Keep.get for "test_head"')
134 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
135 class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
138 KEEP_SERVER = {'blob_signing': True}
141 DiskCacheBase.tearDown(self)
143 def test_KeepBasicRWTest(self):
144 run_test_server.authorize_with('active')
145 keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
146 foo_locator = keep_client.put('foo')
149 r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
150 'invalid locator from Keep.put("foo"): ' + foo_locator)
151 self.assertEqual(keep_client.get(foo_locator),
153 'wrong content from Keep.get(md5("foo"))')
155 # GET with an unsigned locator => bad request
156 bar_locator = keep_client.put('bar')
157 unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
160 r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
161 'invalid locator from Keep.put("bar"): ' + bar_locator)
162 self.assertRaises(arvados.errors.KeepReadError,
164 unsigned_bar_locator)
166 # GET from a different user => bad request
167 run_test_server.authorize_with('spectator')
168 keep_client2 = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
169 self.assertRaises(arvados.errors.KeepReadError,
173 # Unauthenticated GET for a signed locator => bad request
174 # Unauthenticated GET for an unsigned locator => bad request
175 keep_client.api_token = ''
176 self.assertRaises(arvados.errors.KeepReadError,
179 self.assertRaises(arvados.errors.KeepReadError,
181 unsigned_bar_locator)
184 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
185 class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
189 KEEP_PROXY_SERVER = {}
193 super(KeepProxyTestCase, cls).setUpClass()
194 run_test_server.authorize_with('active')
195 cls.api_client = arvados.api('v1')
198 super(KeepProxyTestCase, self).tearDown()
199 DiskCacheBase.tearDown(self)
201 def test_KeepProxyTest1(self):
202 # Will use ARVADOS_KEEP_SERVICES environment variable that
203 # is set by setUpClass().
204 keep_client = arvados.KeepClient(api_client=self.api_client,
205 local_store='', block_cache=self.make_block_cache(self.disk_cache))
206 baz_locator = keep_client.put('baz')
209 r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
210 'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
211 self.assertEqual(keep_client.get(baz_locator),
213 'wrong content from Keep.get(md5("baz"))')
214 self.assertTrue(keep_client.using_proxy)
216 def test_KeepProxyTestMultipleURIs(self):
217 # Test using ARVADOS_KEEP_SERVICES env var overriding any
218 # existing proxy setting and setting multiple proxies
219 arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
220 keep_client = arvados.KeepClient(api_client=self.api_client,
222 block_cache=self.make_block_cache(self.disk_cache))
223 uris = [x['_service_root'] for x in keep_client._keep_services]
224 self.assertEqual(uris, ['http://10.0.0.1/',
225 'https://foo.example.org:1234/'])
227 def test_KeepProxyTestInvalidURI(self):
228 arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
229 with self.assertRaises(arvados.errors.ArgumentError):
230 keep_client = arvados.KeepClient(api_client=self.api_client,
232 block_cache=self.make_block_cache(self.disk_cache))
235 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
236 class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
240 DiskCacheBase.tearDown(self)
242 def get_service_roots(self, api_client):
243 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
244 services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
245 return [urllib.parse.urlparse(url) for url in sorted(services)]
247 def test_ssl_flag_respected_in_roots(self):
248 for ssl_flag in [False, True]:
249 services = self.get_service_roots(self.mock_keep_services(
250 service_ssl_flag=ssl_flag))
252 ('https' if ssl_flag else 'http'), services[0].scheme)
254 def test_correct_ports_with_ipv6_addresses(self):
255 service = self.get_service_roots(self.mock_keep_services(
256 service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
257 self.assertEqual('100::1', service.hostname)
258 self.assertEqual(10, service.port)
260 def test_recognize_proxy_services_in_controller_response(self):
261 keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
262 service_type='proxy', service_host='localhost', service_port=9, count=1),
263 block_cache=self.make_block_cache(self.disk_cache))
265 # this will fail, but it ensures we get the service
267 keep_client.put('baz2', num_retries=0)
270 self.assertTrue(keep_client.using_proxy)
272 def test_insecure_disables_tls_verify(self):
273 api_client = self.mock_keep_services(count=1)
274 force_timeout = socket.timeout("timed out")
276 api_client.insecure = True
277 with tutil.mock_keep_responses(b'foo', 200) as mock:
278 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
279 keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
281 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
284 mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
287 api_client.insecure = False
288 with tutil.mock_keep_responses(b'foo', 200) as mock:
289 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
290 keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
291 # getopt()==None here means we didn't change the
292 # default. If we were using real pycurl instead of a mock,
293 # it would return the default value 1.
295 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
298 mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
    def test_refresh_signature(self):
        """refresh_signature() must translate a +R remote-signature locator
        into the +A locally-signed locator returned by the server, using a
        HEAD request with an X-Keep-Signature header."""
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        # Same digest+signature, differing only in hint type: +A (local) vs +R (remote).
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        # The stub server advertises the locally-signed locator in its response header.
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])
320 # test_*_timeout verify that KeepClient instructs pycurl to use
321 # the appropriate connection and read timeouts. They don't care
322 # whether pycurl actually exhibits the expected timeout behavior
323 # -- those tests are in the KeepClientTimeout test class.
325 def test_get_timeout(self):
326 api_client = self.mock_keep_services(count=1)
327 force_timeout = socket.timeout("timed out")
328 with tutil.mock_keep_responses(force_timeout, 0) as mock:
329 keep_client = arvados.KeepClient(
330 api_client=api_client,
331 block_cache=self.make_block_cache(self.disk_cache),
334 with self.assertRaises(arvados.errors.KeepReadError):
335 keep_client.get('ffffffffffffffffffffffffffffffff')
337 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
338 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
340 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
341 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
343 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
344 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
346 def test_put_timeout(self):
347 api_client = self.mock_keep_services(count=1)
348 force_timeout = socket.timeout("timed out")
349 with tutil.mock_keep_responses(force_timeout, 0) as mock:
350 keep_client = arvados.KeepClient(
351 api_client=api_client,
352 block_cache=self.make_block_cache(self.disk_cache),
355 with self.assertRaises(arvados.errors.KeepWriteError):
356 keep_client.put(b'foo')
358 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
359 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
361 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
362 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
364 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
365 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
367 def test_head_timeout(self):
368 api_client = self.mock_keep_services(count=1)
369 force_timeout = socket.timeout("timed out")
370 with tutil.mock_keep_responses(force_timeout, 0) as mock:
371 keep_client = arvados.KeepClient(
372 api_client=api_client,
373 block_cache=self.make_block_cache(self.disk_cache),
376 with self.assertRaises(arvados.errors.KeepReadError):
377 keep_client.head('ffffffffffffffffffffffffffffffff')
379 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
380 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
382 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
385 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
388 def test_proxy_get_timeout(self):
389 api_client = self.mock_keep_services(service_type='proxy', count=1)
390 force_timeout = socket.timeout("timed out")
391 with tutil.mock_keep_responses(force_timeout, 0) as mock:
392 keep_client = arvados.KeepClient(
393 api_client=api_client,
394 block_cache=self.make_block_cache(self.disk_cache),
397 with self.assertRaises(arvados.errors.KeepReadError):
398 keep_client.get('ffffffffffffffffffffffffffffffff')
400 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
401 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
403 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
404 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
406 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
407 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
409 def test_proxy_head_timeout(self):
410 api_client = self.mock_keep_services(service_type='proxy', count=1)
411 force_timeout = socket.timeout("timed out")
412 with tutil.mock_keep_responses(force_timeout, 0) as mock:
413 keep_client = arvados.KeepClient(
414 api_client=api_client,
415 block_cache=self.make_block_cache(self.disk_cache),
418 with self.assertRaises(arvados.errors.KeepReadError):
419 keep_client.head('ffffffffffffffffffffffffffffffff')
421 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
422 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
424 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
427 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
430 def test_proxy_put_timeout(self):
431 self.disk_cache_dir = None
432 api_client = self.mock_keep_services(service_type='proxy', count=1)
433 force_timeout = socket.timeout("timed out")
434 with tutil.mock_keep_responses(force_timeout, 0) as mock:
435 keep_client = arvados.KeepClient(
436 api_client=api_client,
439 with self.assertRaises(arvados.errors.KeepWriteError):
440 keep_client.put('foo')
442 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
443 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
445 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
446 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
448 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
449 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
451 def check_no_services_error(self, verb, exc_class):
452 api_client = mock.MagicMock(name='api_client')
453 api_client.keep_services().accessible().execute.side_effect = (
454 arvados.errors.ApiError)
455 keep_client = arvados.KeepClient(
456 api_client=api_client,
457 block_cache=self.make_block_cache(self.disk_cache),
460 with self.assertRaises(exc_class) as err_check:
461 getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
462 self.assertEqual(0, len(err_check.exception.request_errors()))
    def test_get_error_with_no_services(self):
        # get() must raise KeepReadError when no Keep services are reachable.
        self.check_no_services_error('get', arvados.errors.KeepReadError)
    def test_head_error_with_no_services(self):
        # head() must raise KeepReadError when no Keep services are reachable.
        self.check_no_services_error('head', arvados.errors.KeepReadError)
    def test_put_error_with_no_services(self):
        # put() must raise KeepWriteError when no Keep services are reachable.
        self.check_no_services_error('put', arvados.errors.KeepWriteError)
473 def check_errors_from_last_retry(self, verb, exc_class):
474 api_client = self.mock_keep_services(count=2)
475 req_mock = tutil.mock_keep_responses(
476 "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
477 with req_mock, tutil.skip_sleep, \
478 self.assertRaises(exc_class) as err_check:
479 keep_client = arvados.KeepClient(
480 api_client=api_client,
481 block_cache=self.make_block_cache(self.disk_cache),
484 getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
486 self.assertEqual([502, 502], [
487 getattr(error, 'status_code', None)
488 for error in err_check.exception.request_errors().values()])
489 self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')
    def test_get_error_reflects_last_retry(self):
        # The KeepReadError raised by get() must report only the errors from
        # the final retry round (the 502s), not the earlier 500s.
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
    def test_head_error_reflects_last_retry(self):
        # The KeepReadError raised by head() must report only the errors from
        # the final retry round (the 502s), not the earlier 500s.
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)
    def test_put_error_reflects_last_retry(self):
        # The KeepWriteError raised by put() must report only the errors from
        # the final retry round (the 502s), not the earlier 500s.
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
500 def test_put_error_does_not_include_successful_puts(self):
501 data = 'partial failure test'
502 data_loc = tutil.str_keep_locator(data)
503 api_client = self.mock_keep_services(count=3)
504 with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
505 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
506 keep_client = arvados.KeepClient(
507 api_client=api_client,
508 block_cache=self.make_block_cache(self.disk_cache),
511 keep_client.put(data)
512 self.assertEqual(2, len(exc_check.exception.request_errors()))
514 def test_proxy_put_with_no_writable_services(self):
515 data = 'test with no writable services'
516 data_loc = tutil.str_keep_locator(data)
517 api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
518 with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
519 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
520 keep_client = arvados.KeepClient(
521 api_client=api_client,
522 block_cache=self.make_block_cache(self.disk_cache),
525 keep_client.put(data)
526 self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
527 self.assertEqual(0, len(exc_check.exception.request_errors()))
529 def test_oddball_service_get(self):
530 body = b'oddball service get'
531 api_client = self.mock_keep_services(service_type='fancynewblobstore')
532 with tutil.mock_keep_responses(body, 200):
533 keep_client = arvados.KeepClient(
534 api_client=api_client,
535 block_cache=self.make_block_cache(self.disk_cache),
538 actual = keep_client.get(tutil.str_keep_locator(body))
539 self.assertEqual(body, actual)
541 def test_oddball_service_put(self):
542 body = b'oddball service put'
543 pdh = tutil.str_keep_locator(body)
544 api_client = self.mock_keep_services(service_type='fancynewblobstore')
545 with tutil.mock_keep_responses(pdh, 200):
546 keep_client = arvados.KeepClient(
547 api_client=api_client,
548 block_cache=self.make_block_cache(self.disk_cache),
551 actual = keep_client.put(body, copies=1)
552 self.assertEqual(pdh, actual)
554 def test_oddball_service_writer_count(self):
555 body = b'oddball service writer count'
556 pdh = tutil.str_keep_locator(body)
557 api_client = self.mock_keep_services(service_type='fancynewblobstore',
559 headers = {'x-keep-replicas-stored': 3}
560 with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
561 **headers) as req_mock:
562 keep_client = arvados.KeepClient(
563 api_client=api_client,
564 block_cache=self.make_block_cache(self.disk_cache),
567 actual = keep_client.put(body, copies=2)
568 self.assertEqual(pdh, actual)
569 self.assertEqual(1, req_mock.call_count)
573 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
574 class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
578 self.api_client = self.mock_keep_services(count=2)
579 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
581 self.locator = '1271ed5ef305aadabc605b1609e24c52'
584 DiskCacheBase.tearDown(self)
586 @mock.patch('arvados.KeepClient.KeepService.get')
587 def test_get_request_cache(self, get_mock):
588 with tutil.mock_keep_responses(self.data, 200, 200):
589 self.keep_client.get(self.locator)
590 self.keep_client.get(self.locator)
591 # Request already cached, don't require more than one request
592 get_mock.assert_called_once()
594 @mock.patch('arvados.KeepClient.KeepService.get')
595 def test_head_request_cache(self, get_mock):
596 with tutil.mock_keep_responses(self.data, 200, 200):
597 self.keep_client.head(self.locator)
598 self.keep_client.head(self.locator)
599 # Don't cache HEAD requests so that they're not confused with GET reqs
600 self.assertEqual(2, get_mock.call_count)
602 @mock.patch('arvados.KeepClient.KeepService.get')
603 def test_head_and_then_get_return_different_responses(self, get_mock):
606 get_mock.side_effect = [b'first response', b'second response']
607 with tutil.mock_keep_responses(self.data, 200, 200):
608 head_resp = self.keep_client.head(self.locator)
609 get_resp = self.keep_client.get(self.locator)
610 self.assertEqual(b'first response', head_resp)
# First response was not cached because it was from a HEAD request.
612 self.assertNotEqual(head_resp, get_resp)
616 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
617 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
621 self.api_client = self.mock_keep_services(count=2)
622 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
624 self.locator = '1271ed5ef305aadabc605b1609e24c52'
625 self.test_id = arvados.util.new_request_id()
626 self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
627 # If we don't set request_id to None explicitly here, it will
628 # return <MagicMock name='api_client_mock.request_id'
630 self.api_client.request_id = None
633 DiskCacheBase.tearDown(self)
635 def test_default_to_api_client_request_id(self):
636 self.api_client.request_id = self.test_id
637 with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
638 self.keep_client.put(self.data)
639 self.assertEqual(2, len(mock.responses))
640 for resp in mock.responses:
641 self.assertProvidedRequestId(resp)
643 with tutil.mock_keep_responses(self.data, 200) as mock:
644 self.keep_client.get(self.locator)
645 self.assertProvidedRequestId(mock.responses[0])
647 with tutil.mock_keep_responses(b'', 200) as mock:
648 self.keep_client.head(self.locator)
649 self.assertProvidedRequestId(mock.responses[0])
651 def test_explicit_request_id(self):
652 with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
653 self.keep_client.put(self.data, request_id=self.test_id)
654 self.assertEqual(2, len(mock.responses))
655 for resp in mock.responses:
656 self.assertProvidedRequestId(resp)
658 with tutil.mock_keep_responses(self.data, 200) as mock:
659 self.keep_client.get(self.locator, request_id=self.test_id)
660 self.assertProvidedRequestId(mock.responses[0])
662 with tutil.mock_keep_responses(b'', 200) as mock:
663 self.keep_client.head(self.locator, request_id=self.test_id)
664 self.assertProvidedRequestId(mock.responses[0])
666 def test_automatic_request_id(self):
667 with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
668 self.keep_client.put(self.data)
669 self.assertEqual(2, len(mock.responses))
670 for resp in mock.responses:
671 self.assertAutomaticRequestId(resp)
673 with tutil.mock_keep_responses(self.data, 200) as mock:
674 self.keep_client.get(self.locator)
675 self.assertAutomaticRequestId(mock.responses[0])
677 with tutil.mock_keep_responses(b'', 200) as mock:
678 self.keep_client.head(self.locator)
679 self.assertAutomaticRequestId(mock.responses[0])
681 def test_request_id_in_exception(self):
682 with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
683 with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
684 self.keep_client.head(self.locator, request_id=self.test_id)
686 with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
687 with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
688 self.keep_client.get(self.locator)
690 with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
691 with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
692 self.keep_client.put(self.data, request_id=self.test_id)
694 with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
695 with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
696 self.keep_client.put(self.data)
698 def assertAutomaticRequestId(self, resp):
699 hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
700 if x.startswith('X-Request-Id: ')][0]
701 self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
702 self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
704 def assertProvidedRequestId(self, resp):
705 self.assertIn('X-Request-Id: '+self.test_id,
706 resp.getopt(pycurl.HTTPHEADER))
710 #@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
711 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
715 # expected_order[i] is the probe order for
716 # hash=md5(sprintf("%064x",i)) where there are 16 services
717 # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
718 # the first probe for the block consisting of 64 "0"
719 # characters is the service whose uuid is
720 # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
722 self.expected_order = [
723 list('3eab2d5fc9681074'),
724 list('097dba52e648f1c3'),
725 list('c5b4e023f8a7d691'),
726 list('9d81c02e76a3bf54'),
729 "{:064x}".format(x).encode()
730 for x in range(len(self.expected_order))]
732 hashlib.md5(self.blocks[x]).hexdigest()
733 for x in range(len(self.expected_order))]
734 self.api_client = self.mock_keep_services(count=self.services)
735 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
738 DiskCacheBase.tearDown(self)
740 def test_weighted_service_roots_against_reference_set(self):
741 # Confirm weighted_service_roots() returns the correct order
742 for i, hash in enumerate(self.hashes):
743 roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
745 re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
747 self.assertEqual(self.expected_order[i], got_order)
    def test_get_probe_order_against_reference_set(self):
        # get() must probe services in the reference rendezvous-hash order.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))
    def test_head_probe_order_against_reference_set(self):
        # head() must probe services in the reference rendezvous-hash order.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))
    def test_put_probe_order_against_reference_set(self):
        # put() must probe services in the reference rendezvous-hash order.
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))
763 def _test_probe_order_against_reference_set(self, op):
764 for i in range(len(self.blocks)):
765 with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
766 self.assertRaises(arvados.errors.KeepRequestError):
769 re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
770 for resp in mock.responses]
771 self.assertEqual(self.expected_order[i]*2, got_order)
773 def test_put_probe_order_multiple_copies(self):
774 for copies in range(2, 4):
775 for i in range(len(self.blocks)):
776 with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
777 self.assertRaises(arvados.errors.KeepWriteError):
778 self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
780 re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
781 for resp in mock.responses]
782 # With T threads racing to make requests, the position
783 # of a given server in the sequence of HTTP requests
784 # (got_order) cannot be more than T-1 positions
785 # earlier than that server's position in the reference
786 # probe sequence (expected_order).
788 # Loop invariant: we have accounted for +pos+ expected
789 # probes, either by seeing them in +got_order+ or by
790 # putting them in +pending+ in the hope of seeing them
791 # later. As long as +len(pending)<T+, we haven't
792 # started a request too early.
794 for pos, expected in enumerate(self.expected_order[i]*3):
795 got = got_order[pos-len(pending)]
796 while got in pending:
797 del pending[pending.index(got)]
798 got = got_order[pos-len(pending)]
800 pending.append(expected)
802 len(pending), copies,
803 "pending={}, with copies={}, got {}, expected {}".format(
804 pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
806 def test_probe_waste_adding_one_server(self):
808 hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
809 initial_services = 12
810 self.api_client = self.mock_keep_services(count=initial_services)
811 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
813 self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
814 for added_services in range(1, 12):
815 api_client = self.mock_keep_services(count=initial_services+added_services)
816 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
818 for hash_index in range(len(hashes)):
819 probe_after = keep_client.weighted_service_roots(
820 arvados.KeepLocator(hashes[hash_index]))
821 penalty = probe_after.index(probes_before[hash_index][0])
822 self.assertLessEqual(penalty, added_services)
823 total_penalty += penalty
824 # Average penalty per block should not exceed
825 # N(added)/N(orig) by more than 20%, and should get closer
826 # to the ideal as we add data points.
829 len(hashes) / initial_services)
832 (120 - added_services)/100)
834 expect_penalty * 8/10)
836 min_penalty <= total_penalty <= max_penalty,
837 "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
845 def check_64_zeros_error_order(self, verb, exc_class):
848 data = tutil.str_keep_locator(data)
849 # Arbitrary port number:
850 aport = random.randint(1024,65535)
851 api_client = self.mock_keep_services(service_port=aport, count=self.services)
852 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
853 with mock.patch('pycurl.Curl') as curl_mock, \
854 self.assertRaises(exc_class) as err_check:
855 curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
856 getattr(keep_client, verb)(data)
857 urls = [urllib.parse.urlparse(url)
858 for url in err_check.exception.request_errors()]
859 self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
860 [(url.hostname, url.port) for url in urls])
    def test_get_error_shows_probe_order(self):
        # The KeepReadError from a failed get() must list servers in probe order.
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)
    def test_put_error_shows_probe_order(self):
        # The KeepWriteError from a failed put() must list servers in probe order.
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
869 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
870 class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
873 # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
874 # 1s worth of data and then trigger bandwidth errors before running
877 BANDWIDTH_LOW_LIM = 1024
881 DiskCacheBase.tearDown(self)
883 class assertTakesBetween(unittest.TestCase):
884 def __init__(self, tmin, tmax):
889 self.t0 = time.time()
891 def __exit__(self, *args, **kwargs):
892 # Round times to milliseconds, like CURL. Otherwise, we
893 # fail when CURL reaches a 1s timeout at 0.9998s.
894 delta = round(time.time() - self.t0, 3)
895 self.assertGreaterEqual(delta, self.tmin)
896 self.assertLessEqual(delta, self.tmax)
    class assertTakesGreater(unittest.TestCase):
        # Context manager: asserts the wrapped block takes at least tmin
        # seconds of wall-clock time.
        def __init__(self, tmin):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
909 def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
910 return arvados.KeepClient(
911 api_client=self.api_client,
912 timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))
    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            service_host='240.0.0.0',
        # With a 0.1s connect timeout the put should fail quickly
        # (well under the read timeout) with a write error.
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)
926 def test_low_bandwidth_no_delays_success(self):
927 self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
928 kc = self.keepClient()
929 loc = kc.put(self.DATA, copies=1, num_retries=0)
930 self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
932 def test_too_low_bandwidth_no_delays_failure(self):
933 # Check that lessening bandwidth corresponds to failing
934 kc = self.keepClient()
935 loc = kc.put(self.DATA, copies=1, num_retries=0)
936 self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
937 with self.assertTakesGreater(self.TIMEOUT_TIME):
938 with self.assertRaises(arvados.errors.KeepReadError):
939 kc.get(loc, num_retries=0)
940 with self.assertTakesGreater(self.TIMEOUT_TIME):
941 with self.assertRaises(arvados.errors.KeepWriteError):
942 kc.put(self.DATA, copies=1, num_retries=0)
944 def test_low_bandwidth_with_server_response_delay_failure(self):
945 kc = self.keepClient()
946 loc = kc.put(self.DATA, copies=1, num_retries=0)
947 self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
948 # Note the actual delay must be 1s longer than the low speed
949 # limit interval in order for curl to detect it reliably.
950 self.server.setdelays(response=self.TIMEOUT_TIME+1)
951 with self.assertTakesGreater(self.TIMEOUT_TIME):
952 with self.assertRaises(arvados.errors.KeepReadError):
953 kc.get(loc, num_retries=0)
954 with self.assertTakesGreater(self.TIMEOUT_TIME):
955 with self.assertRaises(arvados.errors.KeepWriteError):
956 kc.put(self.DATA, copies=1, num_retries=0)
957 with self.assertTakesGreater(self.TIMEOUT_TIME):
958 kc.head(loc, num_retries=0)
960 def test_low_bandwidth_with_server_mid_delay_failure(self):
961 kc = self.keepClient()
962 loc = kc.put(self.DATA, copies=1, num_retries=0)
963 self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
964 # Note the actual delay must be 1s longer than the low speed
965 # limit interval in order for curl to detect it reliably.
966 self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
967 with self.assertTakesGreater(self.TIMEOUT_TIME):
968 with self.assertRaises(arvados.errors.KeepReadError) as e:
969 kc.get(loc, num_retries=0)
970 with self.assertTakesGreater(self.TIMEOUT_TIME):
971 with self.assertRaises(arvados.errors.KeepWriteError):
972 kc.put(self.DATA, copies=1, num_retries=0)
974 def test_timeout_slow_request(self):
975 loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
976 self.server.setdelays(request=.2)
977 self._test_connect_timeout_under_200ms(loc)
978 self.server.setdelays(request=2)
979 self._test_response_timeout_under_2s(loc)
981 def test_timeout_slow_response(self):
982 loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
983 self.server.setdelays(response=.2)
984 self._test_connect_timeout_under_200ms(loc)
985 self.server.setdelays(response=2)
986 self._test_response_timeout_under_2s(loc)
988 def test_timeout_slow_response_body(self):
989 loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
990 self.server.setdelays(response_body=.2)
991 self._test_connect_timeout_under_200ms(loc)
992 self.server.setdelays(response_body=2)
993 self._test_response_timeout_under_2s(loc)
995 def _test_connect_timeout_under_200ms(self, loc):
996 # Allow 100ms to connect, then 1s for response. Everything
997 # should work, and everything should take at least 200ms to
999 kc = self.keepClient(timeouts=(.1, 1))
1000 with self.assertTakesBetween(.2, .3):
1001 kc.put(self.DATA, copies=1, num_retries=0)
1002 with self.assertTakesBetween(.2, .3):
1003 self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
1005 def _test_response_timeout_under_2s(self, loc):
1006 # Allow 10s to connect, then 1s for response. Nothing should
1007 # work, and everything should take at least 1s to return.
1008 kc = self.keepClient(timeouts=(10, 1))
1009 with self.assertTakesBetween(1, 9):
1010 with self.assertRaises(arvados.errors.KeepReadError):
1011 kc.get(loc, num_retries=0)
1012 with self.assertTakesBetween(1, 9):
1013 with self.assertRaises(arvados.errors.KeepWriteError):
1014 kc.put(self.DATA, copies=1, num_retries=0)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        # Build `gateways` fake gateway service records plus `disks`
        # disk services, then point self.keepClient at the combined list.
            'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
            'owner_uuid': 'zzzzz-tpzed-000000000000000',
            'service_host': 'gatewayhost{}'.format(i),
            'service_port': 12345,
            'service_ssl_flag': True,
            'service_type': 'gateway:test',
        } for i in range(gateways)]
        # Root URL for each gateway, derived from its host/port.
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
1040 @mock.patch('pycurl.Curl')
1041 def test_get_with_gateway_hint_first(self, MockCurl):
1042 MockCurl.return_value = tutil.FakeCurl.make(
1043 code=200, body='foo', headers={'Content-Length': 3})
1044 self.mock_disks_and_gateways()
1045 locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
1046 self.assertEqual(b'foo', self.keepClient.get(locator))
1047 self.assertEqual(self.gateway_roots[0]+locator,
1048 MockCurl.return_value.getopt(pycurl.URL).decode())
1049 self.assertEqual(True, self.keepClient.head(locator))
    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        # Every service 404s; afterwards the recorded request URLs show
        # hinted gateways were tried before disk services.
            tutil.FakeCurl.make(code=404, body='')
            for _ in range(gateways+disks)
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        # Locator carries one K@ hint per gateway, in order.
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
                mocks[i].getopt(pycurl.URL).decode(),

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        # Same probe-order check as above, but for head().
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
                mocks[i].getopt(pycurl.URL).decode(),
1099 @mock.patch('pycurl.Curl')
1100 def test_get_with_remote_proxy_hint(self, MockCurl):
1101 MockCurl.return_value = tutil.FakeCurl.make(
1102 code=200, body=b'foo', headers={'Content-Length': 3})
1103 self.mock_disks_and_gateways()
1104 locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
1105 self.assertEqual(b'foo', self.keepClient.get(locator))
1106 self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
1107 MockCurl.return_value.getopt(pycurl.URL).decode())
1109 @mock.patch('pycurl.Curl')
1110 def test_head_with_remote_proxy_hint(self, MockCurl):
1111 MockCurl.return_value = tutil.FakeCurl.make(
1112 code=200, body=b'foo', headers={'Content-Length': 3})
1113 self.mock_disks_and_gateways()
1114 locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
1115 self.assertEqual(True, self.keepClient.head(locator))
1116 self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
1117 MockCurl.return_value.getopt(pycurl.URL).decode())
class KeepClientRetryTestMixin(object):
    # Testing with a local Keep store won't exercise the retry behavior.
    # Instead, our strategy is:
    # * Create a client with one proxy specified (pointed at a black
    #   hole), so there's no need to instantiate an API client, and
    #   all HTTP requests come from one place.
    # * Mock httplib's request method to provide simulated responses.
    # This lets us test the retry logic extensively without relying on any
    # supporting servers, and prevents side effects in case something hiccups.
    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
    # Test classes must define TEST_PATCHER to a method that mocks
    # out appropriate methods in the client.
    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}
1144 def new_client(self, **caller_kwargs):
1145 kwargs = self.client_kwargs.copy()
1146 kwargs.update(caller_kwargs)
1147 kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
1148 return arvados.KeepClient(**kwargs)
    def run_method(self, *args, **kwargs):
        """Invoke the Keep operation under test; subclasses override this."""
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        # Run the method and assert it returns `expected`
        # (DEFAULT_EXPECT when not given).
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        # Run the method and assert it raises `error_class`
        # (DEFAULT_EXCEPTION when not given).
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
        # NOTE(review): callers (e.g. test_error_after_retries_exhausted)
        # use this helper's return value — confirm a `return err` follows
        # in the full source.
    def test_immediate_success(self):
        # A single 200 response: no retries needed.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        # A retryable 500 followed by a 200 succeeds within num_retries.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        # A transport-level exception is retryable, like a 500.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        # A 403 is permanent: the client must fail without reaching the 200.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # Two 500s with num_retries=1: the client gives up after 2 attempts.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries set on the client instance applies when the call
        # doesn't pass one explicitly.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    # Retry tests for get(), using the mixin's scaffolding.
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
        return self.new_client().get(locator, *args, **kwargs)
1207 def test_specific_exception_when_not_found(self):
1208 with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
1209 self.check_exception(arvados.errors.NotFoundError, num_retries=3)
1211 def test_general_exception_with_mixed_errors(self):
1212 # get should raise a NotFoundError if no server returns the block,
1213 # and a high threshold of servers report that it's not found.
1214 # This test rigs up 50/50 disagreement between two servers, and
1215 # checks that it does not become a NotFoundError.
1216 client = self.new_client(num_retries=0)
1217 with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
1218 with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
1219 client.get(self.HINTED_LOCATOR)
1220 self.assertNotIsInstance(
1221 exc_check.exception, arvados.errors.NotFoundError,
1222 "mixed errors raised NotFoundError")
    def test_hint_server_can_succeed_without_retries(self):
        # With a +K@xyzzy hint, one service's 200 satisfies the request
        # even with failures elsewhere and no retries.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on the first service falls through to the next.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # A response whose body doesn't match the locator's checksum is
        # rejected and the next service is tried.
        with tutil.mock_keep_responses(
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    # Retry tests for head(), using the mixin's scaffolding.
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
        return self.new_client().head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # head() reports a 404 as the specific NotFoundError subclass.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)
1260 def test_general_exception_with_mixed_errors(self):
1261 # head should raise a NotFoundError if no server returns the block,
1262 # and a high threshold of servers report that it's not found.
1263 # This test rigs up 50/50 disagreement between two servers, and
1264 # checks that it does not become a NotFoundError.
1265 client = self.new_client(num_retries=0)
1266 with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
1267 with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
1268 client.head(self.HINTED_LOCATOR)
1269 self.assertNotIsInstance(
1270 exc_check.exception, arvados.errors.NotFoundError,
1271 "mixed errors raised NotFoundError")
    def test_hint_server_can_succeed_without_retries(self):
        # With a +K@xyzzy hint, one service's 200 satisfies the request
        # even with failures elsewhere and no retries.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on the first service falls through to the next.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    # Retry tests for put(), using the mixin's scaffolding.
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        return self.new_client().put(data, copies, *args, **kwargs)
1298 def test_do_not_send_multiple_copies_to_same_server(self):
1299 with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
1300 self.check_exception(copies=2, num_retries=3)
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    class FakeKeepService(object):
        # Stand-in for a Keep service: put() sleeps `delay` seconds,
        # then succeeds, fails, or raises `will_raise`; response headers
        # report `replicas` stored copies.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result['headers'] = {}
            self._result['headers']['x-keep-replicas-stored'] = str(replicas)
            self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
            self._result['body'] = 'foobar'
1315 def put(self, data_hash, data, timeout, headers):
1316 time.sleep(self.delay)
1317 if self.will_raise is not None:
1318 raise self.will_raise
1319 return self.will_succeed
        def last_result(self):
            # Successful services report their stored result; failed
            # services report a synthetic 500.
            if self.will_succeed:
            return {"status_code": 500, "body": "didn't succeed"}

        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas = self.copies,
            copies = self.copies

    def test_only_write_enough_on_success(self):
        # Even with extra willing services queued, the pool stops once
        # `copies` replicas are confirmed.
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Failing services don't count toward `copies`; successes do.
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Services that raise are treated like failures.
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))
1365 def test_fail_when_too_many_crash(self):
1366 for i in range(self.copies+1):
1367 ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
1368 self.pool.add_task(ks, None)
1369 for i in range(self.copies-1):
1370 ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1371 self.pool.add_task(ks, None)
1373 self.assertEqual(self.pool.done(), (self.copies-1, []))
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        DiskCacheBase.tearDown(self)
1391 def test_success_after_exception(self):
1392 with tutil.mock_keep_responses(
1393 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1394 Exception('mock err'), 200, 200) as req_mock:
1395 self.keep_client.put('foo', num_retries=1, copies=2)
1396 self.assertEqual(3, req_mock.call_count)
1398 def test_success_after_retryable_error(self):
1399 with tutil.mock_keep_responses(
1400 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1401 500, 200, 200) as req_mock:
1402 self.keep_client.put('foo', num_retries=1, copies=2)
1403 self.assertEqual(3, req_mock.call_count)
1405 def test_fail_after_final_error(self):
1406 # First retry loop gets a 200 (can't achieve replication by
1407 # storing again on that server) and a 400 (can't retry that
1408 # server at all), so we shouldn't try a third request.
1409 with tutil.mock_keep_responses(
1410 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1411 200, 400, 200) as req_mock:
1412 with self.assertRaises(arvados.errors.KeepWriteError):
1413 self.keep_client.put('foo', num_retries=1, copies=2)
1414 self.assertEqual(2, req_mock.call_count)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        # API client stub: attribute access raises KeepReadError except
        # for the credentials attributes KeepClient reads up front.
        class ApiMock(object):
            def __getattr__(self, r):
                if r == "api_token":
                elif r == "insecure":
                raise arvados.errors.KeepReadError()
        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for is that if an API (not
        # keepstore) exception is thrown as part of a get(), the next
        # attempt to get that same block will result in a deadlock.
        # This is why there are two get()s in a row. Unfortunately,
        # the failure mode for this test is that the test suite
        # deadlocks, there isn't a good way to avoid that without
        # adding a special case that has no use except for this test.
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    # On-disk block cache tests: a fixed block, its locator, and a
    # throwaway cache directory per test.
        self.api_client = self.mock_keep_services(count=2)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.disk_cache_dir = tempfile.mkdtemp()
        shutil.rmtree(self.disk_cache_dir)
1463 @mock.patch('arvados.util._BaseDirectories.storage_path')
1464 def test_default_disk_cache_dir(self, storage_path):
1465 expected = Path(self.disk_cache_dir)
1466 storage_path.return_value = expected
1467 cache = arvados.keep.KeepBlockCache(disk_cache=True)
1468 storage_path.assert_called_with('keep')
1469 self.assertEqual(cache._disk_cache_dir, str(expected))
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_read(self, get_mock):
        # confirm it finds an existing cache block when the cache is
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:

        # block cache should have found the existing block
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
        # No network fetch happened: the block came from disk.
        get_mock.assert_not_called()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # NOTE(review): another method with this exact name is defined
        # later in this class; in Python the later definition silently
        # replaces this one, so this test never runs.  Rename one of them.
        # confirm it finds a cache block written after the disk cache
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:

        # when we try to get the block, it'll check the disk and find it.
        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
        get_mock.assert_not_called()
1507 def test_disk_cache_write(self):
1508 # confirm the cache block was created
1510 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1511 disk_cache_dir=self.disk_cache_dir)
1512 keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1514 with tutil.mock_keep_responses(self.data, 200) as mock:
1515 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1517 self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1519 with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1520 self.assertTrue(tutil.binary_compare(f.read(), self.data))
    def test_disk_cache_clean(self):
        # confirm that a tmp file in the cache is cleaned up
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:

        # All three files exist before the cache is initialized.
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        # The tmp still hasn't been deleted because it was created in the last 60 seconds
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))

        # Set the mtime to 61s in the past
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))

        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir)

        # Tmp should be gone but the other ones are safe.
        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_cap(self, get_mock):
        # confirm that the cache is kept to the desired limit
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:

        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        # Initializing the cache evicts the first block but keeps the
        # second — presumably capped via a size/slot argument on this
        # constructor call (argument elided here); confirm in full file.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir,
        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # NOTE(review): duplicate method name — this definition silently
        # replaces the earlier test_disk_cache_share in this class, so
        # only this one runs.  One of the two should be renamed.
        # confirm that a second cache doesn't delete files that belong to the first cache.
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:

        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir,
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir,
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1611 def test_disk_cache_error(self):
1612 os.chmod(self.disk_cache_dir, stat.S_IRUSR)
1614 # Fail during cache initialization.
1615 with self.assertRaises(OSError):
1616 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1617 disk_cache_dir=self.disk_cache_dir)
1619 def test_disk_cache_write_error(self):
1620 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1621 disk_cache_dir=self.disk_cache_dir)
1623 keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1625 # Make the cache dir read-only
1626 os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1627 os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
1630 with self.assertRaises(arvados.errors.KeepCacheError):
1631 with tutil.mock_keep_responses(self.data, 200) as mock:
1632 keep_client.get(self.locator)
    def test_disk_cache_retry_write_error(self):
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        # Wrap mmap so it fails with ENOSPC; the client should shrink
        # the cache and retry rather than fail the read.
        realmmap = mmap.mmap
        def sideeffect_mmap(*args, **kwargs):
            # presumably only the first call raises and later calls fall
            # through to realmmap — the guard lines are elided here;
            # confirm against the full file.
            raise OSError(errno.ENOSPC, "no space")
            return realmmap(*args, **kwargs)

        with patch('mmap.mmap', autospec=True, side_effect=sideeffect_mmap) as mockmmap:
            cache_max_before = block_cache.cache_max

            with tutil.mock_keep_responses(self.data, 200) as mock:
                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

            self.assertIsNotNone(keep_client.get_from_cache(self.locator))

        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))

        # shrank the cache in response to ENOSPC
        self.assertGreater(cache_max_before, block_cache.cache_max)

    def test_disk_cache_retry_write_error2(self):
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        # Same as above, but with ENOMEM: the client should reduce the
        # slot count and retry.
        realmmap = mmap.mmap
        def sideeffect_mmap(*args, **kwargs):
            # presumably only the first call raises — see note above in
            # test_disk_cache_retry_write_error; confirm in full file.
            raise OSError(errno.ENOMEM, "no memory")
            return realmmap(*args, **kwargs)

        with patch('mmap.mmap', autospec=True, side_effect=sideeffect_mmap) as mockmmap:
            slots_before = block_cache._max_slots

            with tutil.mock_keep_responses(self.data, 200) as mock:
                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

            self.assertIsNotNone(keep_client.get_from_cache(self.locator))

        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))

        # shrank the cache in response to ENOMEM
        self.assertGreater(slots_before, block_cache._max_slots)