# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
import hashlib
import random
import re
import socket
import time
import unittest
import urllib.parse

from pathlib import Path
from unittest import mock
from unittest.mock import patch

import parameterized
import pycurl

import arvados
import arvados.retry
import arvados.util

from . import arvados_testutil as tutil
from . import keepstub
from . import run_test_server
from .arvados_testutil import DiskCacheBase
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Round-trip PUT/GET/HEAD tests against a live Keep server.

    One shared KeepClient is built per class (with and without the disk
    cache, via parameterization).

    NOTE(review): reconstructed from a garbled line-numbered listing with
    dropped lines; the restored server-config attributes and assertRegex
    call heads should be verified against upstream.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    block_cache_test = None

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.block_cache_test = DiskCacheBase()
        cls.keep_client = arvados.KeepClient(
            api_client=cls.api_client,
            proxy='', local_store='',
            block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))

    @classmethod
    def tearDownClass(cls):
        # BUG FIX: the original called super().setUpClass() here, re-running
        # class setup during teardown instead of chaining to the parent's
        # tearDownClass().
        super(KeepTestCase, cls).tearDownClass()
        cls.block_cache_test.tearDown()

    def test_KeepBasicRWTest(self):
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
                                             b'foo'),
                        'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # Doubling 8 bytes 23 times yields a 64 MiB block (8 * 2**23).
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_KeepPutDataType(self):
        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Blob-signing permission checks against a signing Keep server.

    NOTE(review): reconstructed from a garbled listing with dropped lines;
    the restored assertRaises targets (keep_client.get / keep_client2.get)
    should be verified against upstream.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => bad request
        bar_locator = keep_client.put('bar')
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            bar_locator,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        self.assertRaises(arvados.errors.KeepReadError,
                          keep_client.get,
                          unsigned_bar_locator)

        # GET from a different user => bad request
        run_test_server.authorize_with('spectator')
        keep_client2 = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
        self.assertRaises(arvados.errors.KeepReadError,
                          keep_client2.get,
                          bar_locator)

        # Unauthenticated GET for a signed locator => bad request
        # Unauthenticated GET for an unsigned locator => bad request
        keep_client.api_token = ''
        self.assertRaises(arvados.errors.KeepReadError,
                          keep_client.get,
                          bar_locator)
        self.assertRaises(arvados.errors.KeepReadError,
                          keep_client.get,
                          unsigned_bar_locator)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """KeepClient behavior when routed through a keepproxy.

    NOTE(review): reconstructed from a garbled listing; dropped lines
    (class attrs, setUpClass/tearDown headers) restored — verify upstream.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        super(KeepProxyTestCase, self).tearDown()
        DiskCacheBase.tearDown(self)

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(
            api_client=self.api_client,
            local_store='',
            block_cache=self.make_block_cache(self.disk_cache))
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(
            api_client=self.api_client,
            local_store='',
            block_cache=self.make_block_cache(self.disk_cache))
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            keep_client = arvados.KeepClient(
                api_client=self.api_client,
                local_store='',
                block_cache=self.make_block_cache(self.disk_cache))
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """KeepClient behavior against mocked service discovery and responses.

    Covers probe ordering of service roots, TLS-verification flags,
    signature refresh, pycurl timeout settings, and error aggregation.

    NOTE(review): reconstructed from a garbled line-numbered listing with
    dropped lines (assertEqual call heads, KeepClient kwargs such as
    ``num_retries=0``, closing parens); verify against upstream.
    """
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def get_service_roots(self, api_client):
        # Build a client from the mocked API client and return its
        # probe-ordered service roots, parsed, in sorted-URL order.
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
            service_type='proxy', service_host='localhost', service_port=9, count=1),
            block_cache=self.make_block_cache(self.disk_cache))
        try:
            # this will fail, but it ensures we get the service
            # discovery response
            keep_client.put('baz2', num_retries=0)
        except Exception:
            # Narrowed from a bare `except:`; the put is expected to fail.
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_head_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_proxy_get_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def test_proxy_put_timeout(self):
        # This test does not use the disk cache; prevent DiskCacheBase
        # teardown from touching a cache dir that was never created.
        self.disk_cache_dir = None
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def check_no_services_error(self, verb, exc_class):
        # When service discovery itself fails, the verb should raise
        # exc_class with an empty request_errors() map.
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=self.make_block_cache(self.disk_cache),
            num_retries=0,
        )
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        # Only the errors from the final retry round (the 502s) should be
        # reported, not the earlier 500s.
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
             self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
             self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
             self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        # NOTE(review): service count for this mock was on a dropped line;
        # restored as count=4 — verify against upstream.
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        # One response claiming 3 replicas satisfies copies=2 in one request.
        self.assertEqual(1, req_mock.call_count)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Block-cache behavior: GETs are cached, HEADs are not.

    NOTE(review): reconstructed from a garbled listing; ``self.data`` was on
    a dropped setUp line and is restored as b'xyzzy' — verify upstream.
    """
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    @mock.patch('arvados.KeepClient._KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.get(self.locator)
            self.keep_client.get(self.locator)
            # Request already cached, don't require more than one request
            get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient._KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.head(self.locator)
            self.keep_client.head(self.locator)
            # Don't cache HEAD requests so that they're not confused with GET reqs
            self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient._KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        head_resp = None
        get_resp = None
        get_mock.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual(b'first response', head_resp)
        # First response was not cached because it was from a HEAD request.
        self.assertNotEqual(head_resp, get_resp)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """X-Request-Id header propagation: from the API client, an explicit
    argument, or generated automatically; and inclusion in error messages.

    NOTE(review): reconstructed from a garbled listing; ``self.data`` was on
    a dropped setUp line and is restored as b'xyzzy' — verify upstream.
    """
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_default_to_api_client_request_id(self):
        self.api_client.request_id = self.test_id
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

    def test_explicit_request_id(self):
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data, request_id=self.test_id)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])

    def test_automatic_request_id(self):
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertAutomaticRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])

    def test_request_id_in_exception(self):
        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
                self.keep_client.head(self.locator, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
                self.keep_client.get(self.locator)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
                self.keep_client.put(self.data, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
                self.keep_client.put(self.data)

    def assertAutomaticRequestId(self, resp):
        hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
               if x.startswith('X-Request-Id: ')][0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Rendezvous-hash probe-order tests against a reference ordering.

    NOTE(review): reconstructed from a garbled listing with dropped lines
    (``self.services``, list-comprehension heads, assertion call heads);
    verify against upstream.
    """
    disk_cache = False

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            total_penalty = 0
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
870 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
871 class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
874 # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
875 # 1s worth of data and then trigger bandwidth errors before running
878 BANDWIDTH_LOW_LIM = 1024
882 DiskCacheBase.tearDown(self)
884 class assertTakesBetween(unittest.TestCase):
885 def __init__(self, tmin, tmax):
890 self.t0 = time.time()
892 def __exit__(self, *args, **kwargs):
893 # Round times to milliseconds, like CURL. Otherwise, we
894 # fail when CURL reaches a 1s timeout at 0.9998s.
895 delta = round(time.time() - self.t0, 3)
896 self.assertGreaterEqual(delta, self.tmin)
897 self.assertLessEqual(delta, self.tmax)
    class assertTakesGreater(unittest.TestCase):
        # Context manager asserting its `with` body takes at least tmin seconds.
        def __init__(self, tmin):
            # NOTE(review): the tmin assignment and `def __enter__` are
            # elided from this view; the next line is the __enter__ body.
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round to milliseconds, matching CURL's reported resolution.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
910 def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
911 return arvados.KeepClient(
912 api_client=self.api_client,
913 timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))
    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            # NOTE(review): remaining arguments and the closing paren of
            # this call are elided from this view.
            service_host='240.0.0.0',
        # The connect attempt should give up at the 0.1s connect timeout.
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)
927 def test_low_bandwidth_no_delays_success(self):
928 self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
929 kc = self.keepClient()
930 loc = kc.put(self.DATA, copies=1, num_retries=0)
931 self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
933 def test_too_low_bandwidth_no_delays_failure(self):
934 # Check that lessening bandwidth corresponds to failing
935 kc = self.keepClient()
936 loc = kc.put(self.DATA, copies=1, num_retries=0)
937 self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
938 with self.assertTakesGreater(self.TIMEOUT_TIME):
939 with self.assertRaises(arvados.errors.KeepReadError):
940 kc.get(loc, num_retries=0)
941 with self.assertTakesGreater(self.TIMEOUT_TIME):
942 with self.assertRaises(arvados.errors.KeepWriteError):
943 kc.put(self.DATA, copies=1, num_retries=0)
945 def test_low_bandwidth_with_server_response_delay_failure(self):
946 kc = self.keepClient()
947 loc = kc.put(self.DATA, copies=1, num_retries=0)
948 self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
949 # Note the actual delay must be 1s longer than the low speed
950 # limit interval in order for curl to detect it reliably.
951 self.server.setdelays(response=self.TIMEOUT_TIME+1)
952 with self.assertTakesGreater(self.TIMEOUT_TIME):
953 with self.assertRaises(arvados.errors.KeepReadError):
954 kc.get(loc, num_retries=0)
955 with self.assertTakesGreater(self.TIMEOUT_TIME):
956 with self.assertRaises(arvados.errors.KeepWriteError):
957 kc.put(self.DATA, copies=1, num_retries=0)
958 with self.assertTakesGreater(self.TIMEOUT_TIME):
959 kc.head(loc, num_retries=0)
961 def test_low_bandwidth_with_server_mid_delay_failure(self):
962 kc = self.keepClient()
963 loc = kc.put(self.DATA, copies=1, num_retries=0)
964 self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
965 # Note the actual delay must be 1s longer than the low speed
966 # limit interval in order for curl to detect it reliably.
967 self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
968 with self.assertTakesGreater(self.TIMEOUT_TIME):
969 with self.assertRaises(arvados.errors.KeepReadError) as e:
970 kc.get(loc, num_retries=0)
971 with self.assertTakesGreater(self.TIMEOUT_TIME):
972 with self.assertRaises(arvados.errors.KeepWriteError):
973 kc.put(self.DATA, copies=1, num_retries=0)
    def test_timeout_slow_request(self):
        # A slow *request* phase must trip the connect timeout when short
        # and the response timeout when long.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        # Same checks, with the delay in the response phase.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        # Same checks, with the delay while sending the response body.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)
996 def _test_connect_timeout_under_200ms(self, loc):
997 # Allow 100ms to connect, then 1s for response. Everything
998 # should work, and everything should take at least 200ms to
1000 kc = self.keepClient(timeouts=(.1, 1))
1001 with self.assertTakesBetween(.2, .3):
1002 kc.put(self.DATA, copies=1, num_retries=0)
1003 with self.assertTakesBetween(.2, .3):
1004 self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
1006 def _test_response_timeout_under_2s(self, loc):
1007 # Allow 10s to connect, then 1s for response. Nothing should
1008 # work, and everything should take at least 1s to return.
1009 kc = self.keepClient(timeouts=(10, 1))
1010 with self.assertTakesBetween(1, 9):
1011 with self.assertRaises(arvados.errors.KeepReadError):
1012 kc.get(loc, num_retries=0)
1013 with self.assertTakesBetween(1, 9):
1014 with self.assertRaises(arvados.errors.KeepWriteError):
1015 kc.put(self.DATA, copies=1, num_retries=0)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # Verifies that K@ gateway hints in locators steer requests to the
    # hinted gateway services before the regular disk services.

        # NOTE(review): the enclosing `def tearDown(self):` is elided from this view.
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        # Build mocked disk services plus gateway service records, and a
        # KeepClient that uses them.
        # NOTE(review): the opening `self.gateways = [{` is elided from this view.
            'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
            'owner_uuid': 'zzzzz-tpzed-000000000000000',
            'service_host': 'gatewayhost{}'.format(i),
            'service_port': 12345,
            'service_ssl_flag': True,
            'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
1041 @mock.patch('pycurl.Curl')
1042 def test_get_with_gateway_hint_first(self, MockCurl):
1043 MockCurl.return_value = tutil.FakeCurl.make(
1044 code=200, body='foo', headers={'Content-Length': 3})
1045 self.mock_disks_and_gateways()
1046 locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
1047 self.assertEqual(b'foo', self.keepClient.get(locator))
1048 self.assertEqual(self.gateway_roots[0]+locator,
1049 MockCurl.return_value.getopt(pycurl.URL).decode())
1050 self.assertEqual(True, self.keepClient.head(locator))
    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        # All gateways named in the locator's K@ hints are tried for get(),
        # in hint order, before any disk service.
        # NOTE(review): the gateway/disk counts and the opening of the
        # `mocks = [` list are elided from this view.
            tutil.FakeCurl.make(code=404, body='')
            for _ in range(gateways+disks)
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            # NOTE(review): the assertion call wrapping the next line is
            # partially elided from this view.
                mocks[i].getopt(pycurl.URL).decode(),

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        # Same ordering check as above, for head().
        # NOTE(review): the gateway/disk counts and the opening of the
        # `mocks = [` list are elided from this view.
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            # NOTE(review): the assertion call wrapping the next line is
            # partially elided from this view.
                mocks[i].getopt(pycurl.URL).decode(),
1100 @mock.patch('pycurl.Curl')
1101 def test_get_with_remote_proxy_hint(self, MockCurl):
1102 MockCurl.return_value = tutil.FakeCurl.make(
1103 code=200, body=b'foo', headers={'Content-Length': 3})
1104 self.mock_disks_and_gateways()
1105 locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
1106 self.assertEqual(b'foo', self.keepClient.get(locator))
1107 self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
1108 MockCurl.return_value.getopt(pycurl.URL).decode())
1110 @mock.patch('pycurl.Curl')
1111 def test_head_with_remote_proxy_hint(self, MockCurl):
1112 MockCurl.return_value = tutil.FakeCurl.make(
1113 code=200, body=b'foo', headers={'Content-Length': 3})
1114 self.mock_disks_and_gateways()
1115 locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
1116 self.assertEqual(True, self.keepClient.head(locator))
1117 self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
1118 MockCurl.return_value.getopt(pycurl.URL).decode())
class KeepClientRetryTestMixin(object):
    # Shared retry-behavior scaffolding for the get/head/put test cases below.
    # Testing with a local Keep store won't exercise the retry behavior.
    # Instead, our strategy is:
    # * Create a client with one proxy specified (pointed at a black
    #   hole), so there's no need to instantiate an API client, and
    #   all HTTP requests come from one place.
    # * Mock httplib's request method to provide simulated responses.
    # This lets us test the retry logic extensively without relying on any
    # supporting servers, and prevents side effects in case something hiccups.
    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
    # Test classes must define TEST_PATCHER to a method that mocks
    # out appropriate methods in the client.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

        # NOTE(review): the enclosing `def setUp(self):` is elided from this view.
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}
1145 def new_client(self, **caller_kwargs):
1146 kwargs = self.client_kwargs.copy()
1147 kwargs.update(caller_kwargs)
1148 kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
1149 return arvados.KeepClient(**kwargs)
    def run_method(self, *args, **kwargs):
        # Subclass hook: invoke the Keep operation under test.
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        # Run the operation and assert it returns `expected`
        # (DEFAULT_EXPECT when not given).
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        # Run the operation and assert it raises `error_class`
        # (DEFAULT_EXCEPTION when not given).
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
        # NOTE(review): a `return err` appears to be elided from this view —
        # test_error_after_retries_exhausted uses this method's return value.

    def test_immediate_success(self):
        # A 200 on the first attempt needs no retries.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()
    def test_retry_then_success(self):
        # A retryable 500 followed by 200 succeeds when retries remain.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        # A transport-level exception is retryable too.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        # 403 is permanent: no retry even though a later 200 is queued.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # Two 500s with num_retries=1 exhausts the budget before the 200.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries given at construction is used as the per-call default.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    # Retry behavior of KeepClient.get(), driven by the shared mixin.
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

        # NOTE(review): the enclosing `def tearDown(self):` is elided from this view.
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
        # NOTE(review): the `*args, **kwargs):` continuation of this
        # signature is elided from this view.
        return self.new_client().get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A uniform 404 should surface as NotFoundError.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)
1212 def test_general_exception_with_mixed_errors(self):
1213 # get should raise a NotFoundError if no server returns the block,
1214 # and a high threshold of servers report that it's not found.
1215 # This test rigs up 50/50 disagreement between two servers, and
1216 # checks that it does not become a NotFoundError.
1217 client = self.new_client(num_retries=0)
1218 with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
1219 with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
1220 client.get(self.HINTED_LOCATOR)
1221 self.assertNotIsInstance(
1222 exc_check.exception, arvados.errors.NotFoundError,
1223 "mixed errors raised NotFoundError")
    def test_hint_server_can_succeed_without_retries(self):
        # The hinted server's 200 succeeds even with no retry budget.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on one server falls through to the next.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # A body whose md5 does not match the locator is rejected and the
        # next server is tried.
        with tutil.mock_keep_responses(
                # NOTE(review): the first (bad-checksum) response tuple is
                # elided from this view.
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    # Retry behavior of KeepClient.head(), driven by the shared mixin.
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

        # NOTE(review): the enclosing `def tearDown(self):` is elided from this view.
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
        # NOTE(review): the `*args, **kwargs):` continuation of this
        # signature is elided from this view.
        return self.new_client().head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A uniform 404 should surface as NotFoundError.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # The hinted server's 200 succeeds even with no retry budget.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on one server falls through to the next.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    # Retry behavior of KeepClient.put(), driven by the shared mixin.
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

        # NOTE(review): the enclosing `def tearDown(self):` is elided from this view.
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        return self.new_client().put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # With a single mocked server, asking for 2 copies must fail
        # rather than storing both copies on the same server.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    # Ensures _KeepWriterThreadPool stops writing once enough copies exist.

    class FakeKeepService(object):
        # Stand-in for a Keep service: sleeps `delay`, then succeeds,
        # fails, or raises according to its configuration.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            # NOTE(review): assignments such as `self.delay = delay` and the
            # initialization of `self._result` are elided from this view.
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result['headers'] = {}
            self._result['headers']['x-keep-replicas-stored'] = str(replicas)
            self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
            self._result['body'] = 'foobar'

        def put(self, data_hash, data, timeout, headers):
            # Simulate a slow write, then the configured outcome.
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            if self.will_succeed:
                # NOTE(review): the success-branch body is elided from this view.
            return {"status_code": 500, "body": "didn't succeed"}

        # NOTE(review): the enclosing `def setUp(self):` and its first lines
        # (including `self.copies`) are elided from this view.
        self.pool = arvados.KeepClient._KeepWriterThreadPool(
            # NOTE(review): the `data=` argument and closing paren of this
            # call are elided from this view.
            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas = self.copies,
            copies = self.copies

    def test_only_write_enough_on_success(self):
        # NOTE(review): the surrounding `for i in range(...)` loop header
        # is elided from this view.
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # NOTE(review): the loop header is elided from this view.
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # NOTE(review): the loop header is elided from this view.
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # With copies+1 crashing services and only copies-1 succeeding,
        # the pool can only confirm copies-1 replicas.
        for i in range(self.copies+1):
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
        for i in range(self.copies-1):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        # NOTE(review): a pool join/wait call may be elided here — confirm.
        self.assertEqual(self.pool.done(), (self.copies-1, []))
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.

        # NOTE(review): the enclosing `def setUp(self):` is elided from this view.
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

        # NOTE(review): the enclosing `def tearDown(self):` is elided from this view.
        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        # copies=2 with one transport exception: three requests in total.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        # copies=2 with one retryable 500: three requests in total.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)
1406 def test_fail_after_final_error(self):
1407 # First retry loop gets a 200 (can't achieve replication by
1408 # storing again on that server) and a 400 (can't retry that
1409 # server at all), so we shouldn't try a third request.
1410 with tutil.mock_keep_responses(
1411 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1412 200, 400, 200) as req_mock:
1413 with self.assertRaises(arvados.errors.KeepWriteError):
1414 self.keep_client.put('foo', num_retries=1, copies=2)
1415 self.assertEqual(2, req_mock.call_count)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    # Regression test: an API-level error during get() must not deadlock
    # a subsequent get() of the same block.

        # NOTE(review): the enclosing `def tearDown(self):` is elided from this view.
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        class ApiMock(object):
            # Attribute access beyond the config attributes raises
            # KeepReadError, simulating a failing API client.
            def __getattr__(self, r):
                if r == "api_token":
                    # NOTE(review): the returned token value is elided from this view.
                elif r == "insecure":
                    # NOTE(review): the branches handled here and their return
                    # values are elided from this view.
                raise arvados.errors.KeepReadError()
        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for is that if an API (not
        # keepstore) exception is thrown as part of a get(), the next
        # attempt to get that same block will result in a deadlock.
        # This is why there are two get()s in a row. Unfortunately,
        # the failure mode for this test is that the test suite
        # deadlocks, there isn't a good way to avoid that without
        # adding a special case that has no use except for this test.
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    # Exercises the on-disk block cache backing KeepClient.

        # NOTE(review): the enclosing `def setUp(self):` is elided from this view.
        self.api_client = self.mock_keep_services(count=2)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.disk_cache_dir = tempfile.mkdtemp()

        # NOTE(review): the enclosing `def tearDown(self):` is elided from this view.
        shutil.rmtree(self.disk_cache_dir)
1464 @mock.patch('arvados._internal.basedirs.BaseDirectories.storage_path')
1465 def test_default_disk_cache_dir(self, storage_path):
1466 expected = Path(self.disk_cache_dir)
1467 storage_path.return_value = expected
1468 cache = arvados.keep.KeepBlockCache(disk_cache=True)
1469 storage_path.assert_called_with('keep')
1470 self.assertEqual(cache._disk_cache_dir, str(expected))
    @mock.patch('arvados.KeepClient._KeepService.get')
    def test_disk_cache_read(self, get_mock):
        # confirm it finds an existing cache block when the cache is
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        # NOTE(review): the body of this `with open` (writing self.data)
        # is elided from this view.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
        # block cache should have found the existing block
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
        # The network-level get must never have been attempted.
        get_mock.assert_not_called()

    @mock.patch('arvados.KeepClient._KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # confirm it finds a cache block written after the disk cache
        # NOTE(review): a second `test_disk_cache_share` defined later in
        # this class shadows this name, so this test never runs — one of
        # the two should be renamed.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        # NOTE(review): the body of this `with open` (writing self.data)
        # is elided from this view.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
        # when we try to get the block, it'll check the disk and find it.
        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
        get_mock.assert_not_called()

    def test_disk_cache_write(self):
        # confirm the cache block was created
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
        self.assertIsNotNone(keep_client.get_from_cache(self.locator))
        # The block should also have landed in the on-disk cache file.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))

    def test_disk_cache_clean(self):
        # confirm that a tmp file in the cache is cleaned up
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        # NOTE(review): the bodies of these `with open` blocks (writing
        # placeholder bytes) are elided from this view.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        # The tmp still hasn't been deleted because it was created in the last 60 seconds
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))

        # Set the mtime to 61s in the past
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))

        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir)

        # Tmp should be gone but the other ones are safe.
        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))

    @mock.patch('arvados.KeepClient._KeepService.get')
    def test_disk_cache_cap(self, get_mock):
        # confirm that the cache is kept to the desired limit
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        # NOTE(review): the bodies of these `with open` blocks (writing
        # block contents) are elided from this view.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        # NOTE(review): the cache-size argument and closing paren of this
        # constructor call are elided from this view.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir,
        # One block was evicted to fit the cap; the other remains.
        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

    @mock.patch('arvados.KeepClient._KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # confirm that a second cache doesn't delete files that belong to the first cache.
        # NOTE(review): this redefinition shadows the earlier
        # test_disk_cache_share in this class — one of the two should be
        # renamed so both actually run.
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        # NOTE(review): the bodies of these `with open` blocks, and the
        # size arguments/closing parens of the constructor calls below,
        # are elided from this view.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir,
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir,
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1612 def test_disk_cache_error(self):
1613 os.chmod(self.disk_cache_dir, stat.S_IRUSR)
1615 # Fail during cache initialization.
1616 with self.assertRaises(OSError):
1617 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1618 disk_cache_dir=self.disk_cache_dir)
    def test_disk_cache_write_error(self):
        # A block that cannot be cached must surface KeepCacheError.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
        # Make the cache dir read-only
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
        with self.assertRaises(arvados.errors.KeepCacheError):
            with tutil.mock_keep_responses(self.data, 200) as mock:
                keep_client.get(self.locator)

    def test_disk_cache_retry_write_error(self):
        # An ENOSPC while mmapping a new cache slot should shrink the cache
        # and retry rather than fail the read.
        cache_max_before = 512 * 1024 * 1024
        block_cache = arvados.keep.KeepBlockCache(
            cache_max=cache_max_before,
            # NOTE(review): additional constructor arguments and the closing
            # paren are elided from this view.
            disk_cache_dir=self.disk_cache_dir,
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        realmmap = mmap.mmap
        def sideeffect_mmap(*args, **kwargs):
            # NOTE(review): the guard that raises only on the first call(s)
            # is elided from this view.
                raise OSError(errno.ENOSPC, "no space")
            return realmmap(*args, **kwargs)

        with patch('mmap.mmap', autospec=True, side_effect=sideeffect_mmap) as mockmmap:
            with tutil.mock_keep_responses(self.data, 200) as mock:
                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
            self.assertIsNotNone(keep_client.get_from_cache(self.locator))
            with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
                self.assertTrue(tutil.binary_compare(f.read(), self.data))
        # shrank the cache in response to ENOSPC
        self.assertGreater(cache_max_before, block_cache.cache_max)

    def test_disk_cache_retry_write_error2(self):
        # An ENOMEM while mmapping should reduce the slot count and retry.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        realmmap = mmap.mmap
        def sideeffect_mmap(*args, **kwargs):
            # NOTE(review): the guard that raises only on the first call(s)
            # is elided from this view.
                raise OSError(errno.ENOMEM, "no memory")
            return realmmap(*args, **kwargs)

        with patch('mmap.mmap', autospec=True, side_effect=sideeffect_mmap) as mockmmap:
            slots_before = block_cache._max_slots
            with tutil.mock_keep_responses(self.data, 200) as mock:
                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
            self.assertIsNotNone(keep_client.get_from_cache(self.locator))
            with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
                self.assertTrue(tutil.binary_compare(f.read(), self.data))
        # shrank the cache in response to ENOMEM
        self.assertGreater(slots_before, block_cache._max_slots)