1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
20 from unittest import mock
21 from unittest.mock import patch
29 from . import arvados_testutil as tutil
30 from . import keepstub
31 from . import run_test_server
33 from .arvados_testutil import DiskCacheBase
35 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
36 class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
40 block_cache_test = None
44 super(KeepTestCase, cls).setUpClass()
45 run_test_server.authorize_with("admin")
46 cls.api_client = arvados.api('v1')
47 cls.block_cache_test = DiskCacheBase()
48 cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
49 proxy='', local_store='',
50 block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))
53 def tearDownClass(cls):
54 super(KeepTestCase, cls).setUpClass()
55 cls.block_cache_test.tearDown()
57 def test_KeepBasicRWTest(self):
58 self.assertEqual(0, self.keep_client.upload_counter.get())
59 foo_locator = self.keep_client.put('foo')
62 '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
63 'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
65 # 6 bytes because uploaded 2 copies
66 self.assertEqual(6, self.keep_client.upload_counter.get())
68 self.assertEqual(0, self.keep_client.download_counter.get())
69 self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
71 'wrong content from Keep.get(md5("foo"))')
72 self.assertEqual(3, self.keep_client.download_counter.get())
74 def test_KeepBinaryRWTest(self):
75 blob_str = b'\xff\xfe\xf7\x00\x01\x02'
76 blob_locator = self.keep_client.put(blob_str)
79 '^7fc7c53b45e53926ba52821140fef396\+6',
80 ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
81 self.assertEqual(self.keep_client.get(blob_locator),
83 'wrong content from Keep.get(md5(<binarydata>))')
85 def test_KeepLongBinaryRWTest(self):
86 blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
87 for i in range(0, 23):
88 blob_data = blob_data + blob_data
89 blob_locator = self.keep_client.put(blob_data)
92 '^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
93 ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
94 self.assertEqual(self.keep_client.get(blob_locator),
96 'wrong content from Keep.get(md5(<binarydata>))')
98 @unittest.skip("unreliable test - please fix and close #8752")
99 def test_KeepSingleCopyRWTest(self):
100 blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
101 blob_locator = self.keep_client.put(blob_data, copies=1)
104 '^c902006bc98a3eb4a3663b65ab4a6fab\+8',
105 ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
106 self.assertEqual(self.keep_client.get(blob_locator),
108 'wrong content from Keep.get(md5(<binarydata>))')
110 def test_KeepEmptyCollectionTest(self):
111 blob_locator = self.keep_client.put('', copies=1)
114 '^d41d8cd98f00b204e9800998ecf8427e\+0',
115 ('wrong locator from Keep.put(""): ' + blob_locator))
117 def test_unicode_must_be_ascii(self):
118 # If unicode type, must only consist of valid ASCII
119 foo_locator = self.keep_client.put(u'foo')
122 '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
123 'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
125 if sys.version_info < (3, 0):
126 with self.assertRaises(UnicodeEncodeError):
127 # Error if it is not ASCII
128 self.keep_client.put(u'\xe2')
130 with self.assertRaises(AttributeError):
131 # Must be bytes or have an encode() method
132 self.keep_client.put({})
134 def test_KeepHeadTest(self):
135 locator = self.keep_client.put('test_head')
138 '^b9a772c7049325feb7130fff1f8333e9\+9',
139 'wrong md5 hash from Keep.put for "test_head": ' + locator)
140 self.assertEqual(True, self.keep_client.head(locator))
141 self.assertEqual(self.keep_client.get(locator),
143 'wrong content from Keep.get for "test_head"')
145 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
146 class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
149 KEEP_SERVER = {'blob_signing': True}
152 DiskCacheBase.tearDown(self)
154 def test_KeepBasicRWTest(self):
155 run_test_server.authorize_with('active')
156 keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
157 foo_locator = keep_client.put('foo')
160 r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
161 'invalid locator from Keep.put("foo"): ' + foo_locator)
162 self.assertEqual(keep_client.get(foo_locator),
164 'wrong content from Keep.get(md5("foo"))')
166 # GET with an unsigned locator => bad request
167 bar_locator = keep_client.put('bar')
168 unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
171 r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
172 'invalid locator from Keep.put("bar"): ' + bar_locator)
173 self.assertRaises(arvados.errors.KeepReadError,
175 unsigned_bar_locator)
177 # GET from a different user => bad request
178 run_test_server.authorize_with('spectator')
179 self.assertRaises(arvados.errors.KeepReadError,
183 # Unauthenticated GET for a signed locator => bad request
184 # Unauthenticated GET for an unsigned locator => bad request
185 keep_client.api_token = ''
186 self.assertRaises(arvados.errors.KeepReadError,
189 self.assertRaises(arvados.errors.KeepReadError,
191 unsigned_bar_locator)
194 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
195 class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
199 KEEP_PROXY_SERVER = {}
203 super(KeepProxyTestCase, cls).setUpClass()
204 run_test_server.authorize_with('active')
205 cls.api_client = arvados.api('v1')
208 super(KeepProxyTestCase, self).tearDown()
209 DiskCacheBase.tearDown(self)
211 def test_KeepProxyTest1(self):
212 # Will use ARVADOS_KEEP_SERVICES environment variable that
213 # is set by setUpClass().
214 keep_client = arvados.KeepClient(api_client=self.api_client,
215 local_store='', block_cache=self.make_block_cache(self.disk_cache))
216 baz_locator = keep_client.put('baz')
219 '^73feffa4b7f6bb68e44cf984c85f6e88\+3',
220 'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
221 self.assertEqual(keep_client.get(baz_locator),
223 'wrong content from Keep.get(md5("baz"))')
224 self.assertTrue(keep_client.using_proxy)
226 def test_KeepProxyTestMultipleURIs(self):
227 # Test using ARVADOS_KEEP_SERVICES env var overriding any
228 # existing proxy setting and setting multiple proxies
229 arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
230 keep_client = arvados.KeepClient(api_client=self.api_client,
232 block_cache=self.make_block_cache(self.disk_cache))
233 uris = [x['_service_root'] for x in keep_client._keep_services]
234 self.assertEqual(uris, ['http://10.0.0.1/',
235 'https://foo.example.org:1234/'])
237 def test_KeepProxyTestInvalidURI(self):
238 arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
239 with self.assertRaises(arvados.errors.ArgumentError):
240 keep_client = arvados.KeepClient(api_client=self.api_client,
242 block_cache=self.make_block_cache(self.disk_cache))
245 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
246 class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
250 DiskCacheBase.tearDown(self)
252 def get_service_roots(self, api_client):
253 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
254 services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
255 return [urllib.parse.urlparse(url) for url in sorted(services)]
257 def test_ssl_flag_respected_in_roots(self):
258 for ssl_flag in [False, True]:
259 services = self.get_service_roots(self.mock_keep_services(
260 service_ssl_flag=ssl_flag))
262 ('https' if ssl_flag else 'http'), services[0].scheme)
264 def test_correct_ports_with_ipv6_addresses(self):
265 service = self.get_service_roots(self.mock_keep_services(
266 service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
267 self.assertEqual('100::1', service.hostname)
268 self.assertEqual(10, service.port)
270 def test_recognize_proxy_services_in_controller_response(self):
271 keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
272 service_type='proxy', service_host='localhost', service_port=9, count=1),
273 block_cache=self.make_block_cache(self.disk_cache))
275 # this will fail, but it ensures we get the service
277 keep_client.put('baz2', num_retries=0)
280 self.assertTrue(keep_client.using_proxy)
282 def test_insecure_disables_tls_verify(self):
283 api_client = self.mock_keep_services(count=1)
284 force_timeout = socket.timeout("timed out")
286 api_client.insecure = True
287 with tutil.mock_keep_responses(b'foo', 200) as mock:
288 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
289 keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
291 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
294 mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
297 api_client.insecure = False
298 with tutil.mock_keep_responses(b'foo', 200) as mock:
299 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
300 keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
301 # getopt()==None here means we didn't change the
302 # default. If we were using real pycurl instead of a mock,
303 # it would return the default value 1.
305 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
308 mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
    def test_refresh_signature(self):
        """refresh_signature() trades a +R remote hint for a +A local signature.

        The mocked service replies with an X-Keep-Locator header carrying the
        locally signed locator; the client must return that value, and must
        issue the request as a HEAD carrying an X-Keep-Signature header.
        """
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])
330 # test_*_timeout verify that KeepClient instructs pycurl to use
331 # the appropriate connection and read timeouts. They don't care
332 # whether pycurl actually exhibits the expected timeout behavior
333 # -- those tests are in the KeepClientTimeout test class.
335 def test_get_timeout(self):
336 api_client = self.mock_keep_services(count=1)
337 force_timeout = socket.timeout("timed out")
338 with tutil.mock_keep_responses(force_timeout, 0) as mock:
339 keep_client = arvados.KeepClient(
340 api_client=api_client,
341 block_cache=self.make_block_cache(self.disk_cache),
344 with self.assertRaises(arvados.errors.KeepReadError):
345 keep_client.get('ffffffffffffffffffffffffffffffff')
347 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
348 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
350 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
351 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
353 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
354 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
356 def test_put_timeout(self):
357 api_client = self.mock_keep_services(count=1)
358 force_timeout = socket.timeout("timed out")
359 with tutil.mock_keep_responses(force_timeout, 0) as mock:
360 keep_client = arvados.KeepClient(
361 api_client=api_client,
362 block_cache=self.make_block_cache(self.disk_cache),
365 with self.assertRaises(arvados.errors.KeepWriteError):
366 keep_client.put(b'foo')
368 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
369 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
371 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
372 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
374 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
375 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
377 def test_head_timeout(self):
378 api_client = self.mock_keep_services(count=1)
379 force_timeout = socket.timeout("timed out")
380 with tutil.mock_keep_responses(force_timeout, 0) as mock:
381 keep_client = arvados.KeepClient(
382 api_client=api_client,
383 block_cache=self.make_block_cache(self.disk_cache),
386 with self.assertRaises(arvados.errors.KeepReadError):
387 keep_client.head('ffffffffffffffffffffffffffffffff')
389 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
390 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
392 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
395 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
398 def test_proxy_get_timeout(self):
399 api_client = self.mock_keep_services(service_type='proxy', count=1)
400 force_timeout = socket.timeout("timed out")
401 with tutil.mock_keep_responses(force_timeout, 0) as mock:
402 keep_client = arvados.KeepClient(
403 api_client=api_client,
404 block_cache=self.make_block_cache(self.disk_cache),
407 with self.assertRaises(arvados.errors.KeepReadError):
408 keep_client.get('ffffffffffffffffffffffffffffffff')
410 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
411 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
413 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
414 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
416 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
417 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
419 def test_proxy_head_timeout(self):
420 api_client = self.mock_keep_services(service_type='proxy', count=1)
421 force_timeout = socket.timeout("timed out")
422 with tutil.mock_keep_responses(force_timeout, 0) as mock:
423 keep_client = arvados.KeepClient(
424 api_client=api_client,
425 block_cache=self.make_block_cache(self.disk_cache),
428 with self.assertRaises(arvados.errors.KeepReadError):
429 keep_client.head('ffffffffffffffffffffffffffffffff')
431 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
432 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
434 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
437 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
440 def test_proxy_put_timeout(self):
441 self.disk_cache_dir = None
442 api_client = self.mock_keep_services(service_type='proxy', count=1)
443 force_timeout = socket.timeout("timed out")
444 with tutil.mock_keep_responses(force_timeout, 0) as mock:
445 keep_client = arvados.KeepClient(
446 api_client=api_client,
449 with self.assertRaises(arvados.errors.KeepWriteError):
450 keep_client.put('foo')
452 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
453 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
455 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
456 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
458 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
459 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
461 def check_no_services_error(self, verb, exc_class):
462 api_client = mock.MagicMock(name='api_client')
463 api_client.keep_services().accessible().execute.side_effect = (
464 arvados.errors.ApiError)
465 keep_client = arvados.KeepClient(
466 api_client=api_client,
467 block_cache=self.make_block_cache(self.disk_cache),
470 with self.assertRaises(exc_class) as err_check:
471 getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
472 self.assertEqual(0, len(err_check.exception.request_errors()))
    def test_get_error_with_no_services(self):
        # get() is a read operation: no reachable services => KeepReadError.
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        # head() is also a read operation and raises the same error class.
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        # put() is a write operation: no reachable services => KeepWriteError.
        self.check_no_services_error('put', arvados.errors.KeepWriteError)
483 def check_errors_from_last_retry(self, verb, exc_class):
484 api_client = self.mock_keep_services(count=2)
485 req_mock = tutil.mock_keep_responses(
486 "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
487 with req_mock, tutil.skip_sleep, \
488 self.assertRaises(exc_class) as err_check:
489 keep_client = arvados.KeepClient(
490 api_client=api_client,
491 block_cache=self.make_block_cache(self.disk_cache),
494 getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
496 self.assertEqual([502, 502], [
497 getattr(error, 'status_code', None)
498 for error in err_check.exception.request_errors().values()])
499 self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')
    def test_get_error_reflects_last_retry(self):
        # The raised KeepReadError must report only the final retry's errors.
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        # Same expectation for HEAD requests.
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        # Writes raise KeepWriteError, again reporting only the last retry.
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
510 def test_put_error_does_not_include_successful_puts(self):
511 data = 'partial failure test'
512 data_loc = tutil.str_keep_locator(data)
513 api_client = self.mock_keep_services(count=3)
514 with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
515 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
516 keep_client = arvados.KeepClient(
517 api_client=api_client,
518 block_cache=self.make_block_cache(self.disk_cache),
521 keep_client.put(data)
522 self.assertEqual(2, len(exc_check.exception.request_errors()))
524 def test_proxy_put_with_no_writable_services(self):
525 data = 'test with no writable services'
526 data_loc = tutil.str_keep_locator(data)
527 api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
528 with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
529 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
530 keep_client = arvados.KeepClient(
531 api_client=api_client,
532 block_cache=self.make_block_cache(self.disk_cache),
535 keep_client.put(data)
536 self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
537 self.assertEqual(0, len(exc_check.exception.request_errors()))
539 def test_oddball_service_get(self):
540 body = b'oddball service get'
541 api_client = self.mock_keep_services(service_type='fancynewblobstore')
542 with tutil.mock_keep_responses(body, 200):
543 keep_client = arvados.KeepClient(
544 api_client=api_client,
545 block_cache=self.make_block_cache(self.disk_cache),
548 actual = keep_client.get(tutil.str_keep_locator(body))
549 self.assertEqual(body, actual)
551 def test_oddball_service_put(self):
552 body = b'oddball service put'
553 pdh = tutil.str_keep_locator(body)
554 api_client = self.mock_keep_services(service_type='fancynewblobstore')
555 with tutil.mock_keep_responses(pdh, 200):
556 keep_client = arvados.KeepClient(
557 api_client=api_client,
558 block_cache=self.make_block_cache(self.disk_cache),
561 actual = keep_client.put(body, copies=1)
562 self.assertEqual(pdh, actual)
564 def test_oddball_service_writer_count(self):
565 body = b'oddball service writer count'
566 pdh = tutil.str_keep_locator(body)
567 api_client = self.mock_keep_services(service_type='fancynewblobstore',
569 headers = {'x-keep-replicas-stored': 3}
570 with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
571 **headers) as req_mock:
572 keep_client = arvados.KeepClient(
573 api_client=api_client,
574 block_cache=self.make_block_cache(self.disk_cache),
577 actual = keep_client.put(body, copies=2)
578 self.assertEqual(pdh, actual)
579 self.assertEqual(1, req_mock.call_count)
583 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
584 class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
588 self.api_client = self.mock_keep_services(count=2)
589 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
591 self.locator = '1271ed5ef305aadabc605b1609e24c52'
594 DiskCacheBase.tearDown(self)
596 @mock.patch('arvados.KeepClient.KeepService.get')
597 def test_get_request_cache(self, get_mock):
598 with tutil.mock_keep_responses(self.data, 200, 200):
599 self.keep_client.get(self.locator)
600 self.keep_client.get(self.locator)
601 # Request already cached, don't require more than one request
602 get_mock.assert_called_once()
604 @mock.patch('arvados.KeepClient.KeepService.get')
605 def test_head_request_cache(self, get_mock):
606 with tutil.mock_keep_responses(self.data, 200, 200):
607 self.keep_client.head(self.locator)
608 self.keep_client.head(self.locator)
609 # Don't cache HEAD requests so that they're not confused with GET reqs
610 self.assertEqual(2, get_mock.call_count)
612 @mock.patch('arvados.KeepClient.KeepService.get')
613 def test_head_and_then_get_return_different_responses(self, get_mock):
616 get_mock.side_effect = [b'first response', b'second response']
617 with tutil.mock_keep_responses(self.data, 200, 200):
618 head_resp = self.keep_client.head(self.locator)
619 get_resp = self.keep_client.get(self.locator)
620 self.assertEqual(b'first response', head_resp)
621 # First response was not cached because it was from a HEAD request.
622 self.assertNotEqual(head_resp, get_resp)
626 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
627 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
631 self.api_client = self.mock_keep_services(count=2)
632 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
634 self.locator = '1271ed5ef305aadabc605b1609e24c52'
635 self.test_id = arvados.util.new_request_id()
636 self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
637 # If we don't set request_id to None explicitly here, it will
638 # return <MagicMock name='api_client_mock.request_id'
640 self.api_client.request_id = None
643 DiskCacheBase.tearDown(self)
    def test_default_to_api_client_request_id(self):
        """put/get/head inherit the request id set on the API client."""
        self.api_client.request_id = self.test_id
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        # put() writes two copies, so two requests must carry the id.
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertProvidedRequestId(mock.responses[0])
    def test_explicit_request_id(self):
        """A request_id keyword argument overrides any client default."""
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data, request_id=self.test_id)
        # Both replica writes must carry the explicitly supplied id.
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])
    def test_automatic_request_id(self):
        """Without an explicit id, each call generates a fresh req-... id.

        setUp() left self.api_client.request_id as None, so the client must
        synthesize its own X-Request-Id header for every request.
        """
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertAutomaticRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])
    def test_request_id_in_exception(self):
        """Keep errors must mention the request id, explicit or generated.

        Each operation gets enough 400 responses to exhaust its attempts,
        and the resulting exception text is matched against either the
        supplied id or the auto-generated req-... pattern.
        """
        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
                self.keep_client.head(self.locator, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
                self.keep_client.get(self.locator)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
                self.keep_client.put(self.data, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
                self.keep_client.put(self.data)
708 def assertAutomaticRequestId(self, resp):
709 hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
710 if x.startswith('X-Request-Id: ')][0]
711 self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
712 self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
714 def assertProvidedRequestId(self, resp):
715 self.assertIn('X-Request-Id: '+self.test_id,
716 resp.getopt(pycurl.HTTPHEADER))
720 #@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
721 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
725 # expected_order[i] is the probe order for
726 # hash=md5(sprintf("%064x",i)) where there are 16 services
727 # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
728 # the first probe for the block consisting of 64 "0"
729 # characters is the service whose uuid is
730 # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
732 self.expected_order = [
733 list('3eab2d5fc9681074'),
734 list('097dba52e648f1c3'),
735 list('c5b4e023f8a7d691'),
736 list('9d81c02e76a3bf54'),
739 "{:064x}".format(x).encode()
740 for x in range(len(self.expected_order))]
742 hashlib.md5(self.blocks[x]).hexdigest()
743 for x in range(len(self.expected_order))]
744 self.api_client = self.mock_keep_services(count=self.services)
745 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
748 DiskCacheBase.tearDown(self)
750 def test_weighted_service_roots_against_reference_set(self):
751 # Confirm weighted_service_roots() returns the correct order
752 for i, hash in enumerate(self.hashes):
753 roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
755 re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
757 self.assertEqual(self.expected_order[i], got_order)
    def test_get_probe_order_against_reference_set(self):
        # GET must probe services in the precomputed rendezvous order.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        # HEAD must follow the same probe order as GET.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))
773 def _test_probe_order_against_reference_set(self, op):
774 for i in range(len(self.blocks)):
775 with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
776 self.assertRaises(arvados.errors.KeepRequestError):
779 re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
780 for resp in mock.responses]
781 self.assertEqual(self.expected_order[i]*2, got_order)
783 def test_put_probe_order_multiple_copies(self):
784 for copies in range(2, 4):
785 for i in range(len(self.blocks)):
786 with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
787 self.assertRaises(arvados.errors.KeepWriteError):
788 self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
790 re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
791 for resp in mock.responses]
792 # With T threads racing to make requests, the position
793 # of a given server in the sequence of HTTP requests
794 # (got_order) cannot be more than T-1 positions
795 # earlier than that server's position in the reference
796 # probe sequence (expected_order).
798 # Loop invariant: we have accounted for +pos+ expected
799 # probes, either by seeing them in +got_order+ or by
800 # putting them in +pending+ in the hope of seeing them
801 # later. As long as +len(pending)<T+, we haven't
802 # started a request too early.
804 for pos, expected in enumerate(self.expected_order[i]*3):
805 got = got_order[pos-len(pending)]
806 while got in pending:
807 del pending[pending.index(got)]
808 got = got_order[pos-len(pending)]
810 pending.append(expected)
812 len(pending), copies,
813 "pending={}, with copies={}, got {}, expected {}".format(
814 pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
816 def test_probe_waste_adding_one_server(self):
818 hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
819 initial_services = 12
820 self.api_client = self.mock_keep_services(count=initial_services)
821 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
823 self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
824 for added_services in range(1, 12):
825 api_client = self.mock_keep_services(count=initial_services+added_services)
826 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
828 for hash_index in range(len(hashes)):
829 probe_after = keep_client.weighted_service_roots(
830 arvados.KeepLocator(hashes[hash_index]))
831 penalty = probe_after.index(probes_before[hash_index][0])
832 self.assertLessEqual(penalty, added_services)
833 total_penalty += penalty
834 # Average penalty per block should not exceed
835 # N(added)/N(orig) by more than 20%, and should get closer
836 # to the ideal as we add data points.
839 len(hashes) / initial_services)
842 (120 - added_services)/100)
844 expect_penalty * 8/10)
846 min_penalty <= total_penalty <= max_penalty,
847 "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
855 def check_64_zeros_error_order(self, verb, exc_class):
858 data = tutil.str_keep_locator(data)
859 # Arbitrary port number:
860 aport = random.randint(1024,65535)
861 api_client = self.mock_keep_services(service_port=aport, count=self.services)
862 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
863 with mock.patch('pycurl.Curl') as curl_mock, \
864 self.assertRaises(exc_class) as err_check:
865 curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
866 getattr(keep_client, verb)(data)
867 urls = [urllib.parse.urlparse(url)
868 for url in err_check.exception.request_errors()]
869 self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
870 [(url.hostname, url.port) for url in urls])
872 def test_get_error_shows_probe_order(self):
873 self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)
875 def test_put_error_shows_probe_order(self):
876 self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    # Exercises connect/response/low-bandwidth timeout handling against a
    # local stub Keep server.
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # Low-speed limit in bytes/sec (third element of the timeout tuple).
    BANDWIDTH_LOW_LIM = 1024

        # NOTE(review): the enclosing tearDown() def line (and the DATA /
        # TIMEOUT_TIME constants referenced below) are absent from this excerpt.
        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        # Context manager asserting the wrapped block's wall-clock time
        # falls within [tmin, tmax].
        # NOTE(review): the tmin/tmax assignments and __enter__ body appear
        # to be partially absent from this excerpt.
        def __init__(self, tmin, tmax):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        # Context manager asserting the wrapped block takes at least tmin.
        # NOTE(review): the tmin assignment and __enter__ def appear to be
        # absent from this excerpt.
        def __init__(self, tmin):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
919 def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
920 return arvados.KeepClient(
921 api_client=self.api_client,
922 timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))
    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        # NOTE(review): the remaining mock_keep_services arguments appear
        # to be absent from this excerpt.
        self.api_client = self.mock_keep_services(
            service_host='240.0.0.0',
        # The 0.1s connect timeout should abort well before 0.5s.
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)
936 def test_low_bandwidth_no_delays_success(self):
937 self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
938 kc = self.keepClient()
939 loc = kc.put(self.DATA, copies=1, num_retries=0)
940 self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
942 def test_too_low_bandwidth_no_delays_failure(self):
943 # Check that lessening bandwidth corresponds to failing
944 kc = self.keepClient()
945 loc = kc.put(self.DATA, copies=1, num_retries=0)
946 self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
947 with self.assertTakesGreater(self.TIMEOUT_TIME):
948 with self.assertRaises(arvados.errors.KeepReadError):
949 kc.get(loc, num_retries=0)
950 with self.assertTakesGreater(self.TIMEOUT_TIME):
951 with self.assertRaises(arvados.errors.KeepWriteError):
952 kc.put(self.DATA, copies=1, num_retries=0)
954 def test_low_bandwidth_with_server_response_delay_failure(self):
955 kc = self.keepClient()
956 loc = kc.put(self.DATA, copies=1, num_retries=0)
957 self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
958 # Note the actual delay must be 1s longer than the low speed
959 # limit interval in order for curl to detect it reliably.
960 self.server.setdelays(response=self.TIMEOUT_TIME+1)
961 with self.assertTakesGreater(self.TIMEOUT_TIME):
962 with self.assertRaises(arvados.errors.KeepReadError):
963 kc.get(loc, num_retries=0)
964 with self.assertTakesGreater(self.TIMEOUT_TIME):
965 with self.assertRaises(arvados.errors.KeepWriteError):
966 kc.put(self.DATA, copies=1, num_retries=0)
967 with self.assertTakesGreater(self.TIMEOUT_TIME):
968 kc.head(loc, num_retries=0)
970 def test_low_bandwidth_with_server_mid_delay_failure(self):
971 kc = self.keepClient()
972 loc = kc.put(self.DATA, copies=1, num_retries=0)
973 self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
974 # Note the actual delay must be 1s longer than the low speed
975 # limit interval in order for curl to detect it reliably.
976 self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
977 with self.assertTakesGreater(self.TIMEOUT_TIME):
978 with self.assertRaises(arvados.errors.KeepReadError) as e:
979 kc.get(loc, num_retries=0)
980 with self.assertTakesGreater(self.TIMEOUT_TIME):
981 with self.assertRaises(arvados.errors.KeepWriteError):
982 kc.put(self.DATA, copies=1, num_retries=0)
984 def test_timeout_slow_request(self):
985 loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
986 self.server.setdelays(request=.2)
987 self._test_connect_timeout_under_200ms(loc)
988 self.server.setdelays(request=2)
989 self._test_response_timeout_under_2s(loc)
991 def test_timeout_slow_response(self):
992 loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
993 self.server.setdelays(response=.2)
994 self._test_connect_timeout_under_200ms(loc)
995 self.server.setdelays(response=2)
996 self._test_response_timeout_under_2s(loc)
998 def test_timeout_slow_response_body(self):
999 loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
1000 self.server.setdelays(response_body=.2)
1001 self._test_connect_timeout_under_200ms(loc)
1002 self.server.setdelays(response_body=2)
1003 self._test_response_timeout_under_2s(loc)
1005 def _test_connect_timeout_under_200ms(self, loc):
1006 # Allow 100ms to connect, then 1s for response. Everything
1007 # should work, and everything should take at least 200ms to
1009 kc = self.keepClient(timeouts=(.1, 1))
1010 with self.assertTakesBetween(.2, .3):
1011 kc.put(self.DATA, copies=1, num_retries=0)
1012 with self.assertTakesBetween(.2, .3):
1013 self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
1015 def _test_response_timeout_under_2s(self, loc):
1016 # Allow 10s to connect, then 1s for response. Nothing should
1017 # work, and everything should take at least 1s to return.
1018 kc = self.keepClient(timeouts=(10, 1))
1019 with self.assertTakesBetween(1, 9):
1020 with self.assertRaises(arvados.errors.KeepReadError):
1021 kc.get(loc, num_retries=0)
1022 with self.assertTakesBetween(1, 9):
1023 with self.assertRaises(arvados.errors.KeepWriteError):
1024 kc.put(self.DATA, copies=1, num_retries=0)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # Tests +K@ locator hints: hinted gateways are tried before disk services.

        # NOTE(review): the enclosing tearDown() def line is absent from
        # this excerpt.
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        # Build `gateways` fake gateway service records plus `disks` mocked
        # disk services, and point self.keepClient at the combined list.
        # NOTE(review): the opening of the self.gateways list literal is
        # absent from this excerpt.
            'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
            'owner_uuid': 'zzzzz-tpzed-000000000000000',
            'service_host': 'gatewayhost{}'.format(i),
            'service_port': 12345,
            'service_ssl_flag': True,
            'service_type': 'gateway:test',
        } for i in range(gateways)]
        # Root URL expected for each gateway, derived from its record.
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
1050 @mock.patch('pycurl.Curl')
1051 def test_get_with_gateway_hint_first(self, MockCurl):
1052 MockCurl.return_value = tutil.FakeCurl.make(
1053 code=200, body='foo', headers={'Content-Length': 3})
1054 self.mock_disks_and_gateways()
1055 locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
1056 self.assertEqual(b'foo', self.keepClient.get(locator))
1057 self.assertEqual(self.gateway_roots[0]+locator,
1058 MockCurl.return_value.getopt(pycurl.URL).decode())
1059 self.assertEqual(True, self.keepClient.head(locator))
    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        # Multiple +K@uuid hints must be probed in the order given, before
        # any disk services.
        # NOTE(review): the gateway/disk counts and the opening of the
        # `mocks` list appear to be absent from this excerpt.
            tutil.FakeCurl.make(code=404, body='')
            for _ in range(gateways+disks)
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
                mocks[i].getopt(pycurl.URL).decode(),
    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        # head() must probe +K@uuid gateways in order, before disk services.
        # NOTE(review): the gateway/disk counts and the opening of the
        # `mocks` list appear to be absent from this excerpt.
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
                mocks[i].getopt(pycurl.URL).decode(),
1109 @mock.patch('pycurl.Curl')
1110 def test_get_with_remote_proxy_hint(self, MockCurl):
1111 MockCurl.return_value = tutil.FakeCurl.make(
1112 code=200, body=b'foo', headers={'Content-Length': 3})
1113 self.mock_disks_and_gateways()
1114 locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
1115 self.assertEqual(b'foo', self.keepClient.get(locator))
1116 self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
1117 MockCurl.return_value.getopt(pycurl.URL).decode())
1119 @mock.patch('pycurl.Curl')
1120 def test_head_with_remote_proxy_hint(self, MockCurl):
1121 MockCurl.return_value = tutil.FakeCurl.make(
1122 code=200, body=b'foo', headers={'Content-Length': 3})
1123 self.mock_disks_and_gateways()
1124 locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
1125 self.assertEqual(True, self.keepClient.head(locator))
1126 self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
1127 MockCurl.return_value.getopt(pycurl.URL).decode())
class KeepClientRetryTestMixin(object):
    # Shared scaffolding for the retry test cases below.
    # Testing with a local Keep store won't exercise the retry behavior.
    # Instead, our strategy is:
    # * Create a client with one proxy specified (pointed at a black
    #   hole), so there's no need to instantiate an API client, and
    #   all HTTP requests come from one place.
    # * Mock httplib's request method to provide simulated responses.
    # This lets us test the retry logic extensively without relying on any
    # supporting servers, and prevents side effects in case something hiccups.
    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
    # Test classes must define TEST_PATCHER to a method that mocks
    # out appropriate methods in the client.

    # Black-hole proxy address: connections go nowhere.
    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

        # NOTE(review): the enclosing setUp() def line is absent from this
        # excerpt.
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}
1154 def new_client(self, **caller_kwargs):
1155 kwargs = self.client_kwargs.copy()
1156 kwargs.update(caller_kwargs)
1157 kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
1158 return arvados.KeepClient(**kwargs)
1160 def run_method(self, *args, **kwargs):
1161 raise NotImplementedError("test subclasses must define run_method")
1163 def check_success(self, expected=None, *args, **kwargs):
1164 if expected is None:
1165 expected = self.DEFAULT_EXPECT
1166 self.assertEqual(expected, self.run_method(*args, **kwargs))
    def check_exception(self, error_class=None, *args, **kwargs):
        # Run the method under test and assert it raises error_class
        # (DEFAULT_EXCEPTION when None).
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
        # NOTE(review): callers assign this method's return value; a
        # trailing `return err` appears to be absent from this excerpt.
    def test_immediate_success(self):
        # A 200 on the first request needs no retries.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        # A retryable 500 followed by a 200 succeeds within the retry budget.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        # A transport-level exception is retryable, like a 500.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        # 403 is permanent: the subsequent 200 must never be requested.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # num_retries=1 allows two attempts; the third response (200) is
        # never reached, and the error message counts the attempts.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries given at construction applies when calls omit it.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    # Retry behavior of KeepClient.get().
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

        # NOTE(review): the enclosing tearDown() def line is absent from
        # this excerpt.
        DiskCacheBase.tearDown(self)

    # NOTE(review): run_method's signature continuation (*args, **kwargs)
    # appears to be absent from this excerpt.
    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
        return self.new_client().get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A definitive 404 surfaces as NotFoundError even with retries left.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)
1221 def test_general_exception_with_mixed_errors(self):
1222 # get should raise a NotFoundError if no server returns the block,
1223 # and a high threshold of servers report that it's not found.
1224 # This test rigs up 50/50 disagreement between two servers, and
1225 # checks that it does not become a NotFoundError.
1226 client = self.new_client(num_retries=0)
1227 with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
1228 with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
1229 client.get(self.HINTED_LOCATOR)
1230 self.assertNotIsInstance(
1231 exc_check.exception, arvados.errors.NotFoundError,
1232 "mixed errors raised NotFoundError")
    def test_hint_server_can_succeed_without_retries(self):
        # With num_retries unset, the +K@ hint server's 200 wins despite
        # the preceding 404.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on one server falls through to the next.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # A response body that fails checksum verification is retried.
        # NOTE(review): the first (bad-data) response tuple appears to be
        # absent from this excerpt.
        with tutil.mock_keep_responses(
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    # Retry behavior of KeepClient.head().
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

        # NOTE(review): the enclosing tearDown() def line is absent from
        # this excerpt.
        DiskCacheBase.tearDown(self)

    # NOTE(review): run_method's signature continuation (*args, **kwargs)
    # appears to be absent from this excerpt.
    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
        return self.new_client().head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A definitive 404 surfaces as NotFoundError even with retries left.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)
1270 def test_general_exception_with_mixed_errors(self):
1271 # head should raise a NotFoundError if no server returns the block,
1272 # and a high threshold of servers report that it's not found.
1273 # This test rigs up 50/50 disagreement between two servers, and
1274 # checks that it does not become a NotFoundError.
1275 client = self.new_client(num_retries=0)
1276 with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
1277 with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
1278 client.head(self.HINTED_LOCATOR)
1279 self.assertNotIsInstance(
1280 exc_check.exception, arvados.errors.NotFoundError,
1281 "mixed errors raised NotFoundError")
    def test_hint_server_can_succeed_without_retries(self):
        # The +K@ hint server's 200 wins despite the preceding 404.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on one server falls through to the next.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    # Retry behavior of KeepClient.put().
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

        # NOTE(review): the enclosing tearDown() def line is absent from
        # this excerpt.
        DiskCacheBase.tearDown(self)
1304 def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
1305 copies=1, *args, **kwargs):
1306 return self.new_client().put(data, copies, *args, **kwargs)
    def test_do_not_send_multiple_copies_to_same_server(self):
        # With only one mocked server responding, copies=2 cannot be
        # satisfied and must raise rather than double-store.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    # KeepWriterThreadPool must stop writing once enough copies exist.

    class FakeKeepService(object):
        # Minimal stand-in for a Keep service: sleeps `delay`, then either
        # raises `will_raise` or reports success per `will_succeed`, with
        # `replicas` copies confirmed in the response headers.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            # NOTE(review): the delay and _result initializers appear to be
            # partially absent from this excerpt.
            self._result['headers'] = {}
            self._result['headers']['x-keep-replicas-stored'] = str(replicas)
            self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
            self._result['body'] = 'foobar'

        def put(self, data_hash, data, timeout, headers):
            # Simulate a slow store, then raise or return per configuration.
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            # Mimic KeepService.last_result().
            if self.will_succeed:
            # NOTE(review): the success branch appears to be absent from
            # this excerpt; the following return is the failure branch.
                return {"status_code": 500, "body": "didn't succeed"}

        # NOTE(review): the enclosing setUp() def line and the copies
        # initialization are absent from this excerpt.
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas = self.copies,
            copies = self.copies
    def test_only_write_enough_on_success(self):
        # With plenty of successful services queued, the pool should stop
        # at exactly `copies` writes.
        # NOTE(review): the surrounding for-loop headers and pool.join()
        # calls appear to be absent from this and the next two tests.
        ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
        self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Failing services interleaved with enough successful ones still
        # reach exactly `copies`.
        ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
        self.pool.add_task(ks, None)
        ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
        self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Services that raise are tolerated as long as enough succeed.
        ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
        self.pool.add_task(ks, None)
        ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
        self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # With copies+1 crashers and only copies-1 successes, the pool
        # must come up one copy short.
        for i in range(self.copies+1):
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
        for i in range(self.copies-1):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        # NOTE(review): a pool.join() appears to be absent from this excerpt.
        self.assertEqual(self.pool.done(), (self.copies-1, []))
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.

        # NOTE(review): the enclosing setUp() def line is absent from this
        # excerpt.
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

        # NOTE(review): the enclosing tearDown() def line is absent from
        # this excerpt.
        DiskCacheBase.tearDown(self)
1401 def test_success_after_exception(self):
1402 with tutil.mock_keep_responses(
1403 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1404 Exception('mock err'), 200, 200) as req_mock:
1405 self.keep_client.put('foo', num_retries=1, copies=2)
1406 self.assertEqual(3, req_mock.call_count)
1408 def test_success_after_retryable_error(self):
1409 with tutil.mock_keep_responses(
1410 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1411 500, 200, 200) as req_mock:
1412 self.keep_client.put('foo', num_retries=1, copies=2)
1413 self.assertEqual(3, req_mock.call_count)
1415 def test_fail_after_final_error(self):
1416 # First retry loop gets a 200 (can't achieve replication by
1417 # storing again on that server) and a 400 (can't retry that
1418 # server at all), so we shouldn't try a third request.
1419 with tutil.mock_keep_responses(
1420 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1421 200, 400, 200) as req_mock:
1422 with self.assertRaises(arvados.errors.KeepWriteError):
1423 self.keep_client.put('foo', num_retries=1, copies=2)
1424 self.assertEqual(2, req_mock.call_count)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    # Regression test: an API-layer failure inside get() must not deadlock
    # a subsequent get() of the same block.

        # NOTE(review): the enclosing tearDown() def line is absent from
        # this excerpt.
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        class ApiMock(object):
            # Fake API object: well-known attributes respond, anything
            # else raises KeepReadError.
            def __getattr__(self, r):
                if r == "api_token":
                elif r == "insecure":
                # NOTE(review): the return statements for api_token /
                # insecure / config and the final else appear to be absent
                # from this excerpt.
                raise arvados.errors.KeepReadError()
        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for is that if an API (not
        # keepstore) exception is thrown as part of a get(), the next
        # attempt to get that same block will result in a deadlock.
        # This is why there are two get()s in a row. Unfortunately,
        # the failure mode for this test is that the test suite
        # deadlocks, there isn't a good way to avoid that without
        # adding a special case that has no use except for this test.
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    # Disk-backed block cache behavior: reads, writes, cleanup, capacity,
    # sharing, and error handling.

        # NOTE(review): the enclosing setUp() def line is absent from this
        # excerpt.
        self.api_client = self.mock_keep_services(count=2)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.disk_cache_dir = tempfile.mkdtemp()

        # NOTE(review): the enclosing tearDown() def line is absent from
        # this excerpt.
        shutil.rmtree(self.disk_cache_dir)
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_read(self, get_mock):
        # confirm it finds an existing cache block when the cache is
        # pre-populated on disk, with no network fetch.
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        # NOTE(review): the body of this `with` (presumably a write of
        # self.data) is absent from this excerpt.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:

        # block cache should have found the existing block
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        # The mocked network path must never have been used.
        get_mock.assert_not_called()
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # confirm it finds a cache block written after the disk cache
        # object was attached.
        # NOTE(review): a later method in this class reuses the name
        # test_disk_cache_share, shadowing this test so it never runs;
        # one of the two should be renamed.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        # NOTE(review): the body of this `with` (presumably a write of
        # self.data) is absent from this excerpt.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:

        # when we try to get the block, it'll check the disk and find it.
        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        # The mocked network path must never have been used.
        get_mock.assert_not_called()
1509 def test_disk_cache_write(self):
1510 # confirm the cache block was created
1512 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1513 disk_cache_dir=self.disk_cache_dir)
1514 keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1516 with tutil.mock_keep_responses(self.data, 200) as mock:
1517 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1519 self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1521 with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1522 self.assertTrue(tutil.binary_compare(f.read(), self.data))
    def test_disk_cache_clean(self):
        # confirm that a tmp file in the cache is cleaned up after its
        # 60-second grace period, while non-matching names are left alone.
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        # NOTE(review): the bodies of these `with open(...)` blocks
        # (presumably writes) are absent from this excerpt.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:

        # All three files exist before the cache is instantiated.
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        # The tmp still hasn't been deleted because it was created in the last 60 seconds
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))

        # Set the mtime to 61s in the past
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))

        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir)

        # Tmp should be gone but the other ones are safe.
        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_cap(self, get_mock):
        # confirm that the cache is kept to the desired limit: the older
        # block file is evicted when a capped cache is instantiated.
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        # NOTE(review): the writes into these cache files and the cache's
        # size-cap keyword argument appear to be absent from this excerpt.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:

        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir,

        # The older block is evicted; the newer one survives.
        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # confirm that a second cache doesn't delete files that belong to the first cache.
        # NOTE(review): this method name duplicates an earlier test in this
        # class and shadows it (the earlier one never runs); one of the two
        # should be renamed. The file writes and the caches' capacity
        # keyword arguments appear to be absent from this excerpt.
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:

        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir,

        # Neither file is deleted by the first cache...
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir,

        # ...nor by the second cache sharing the same directory.
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1613 def test_disk_cache_error(self):
1614 os.chmod(self.disk_cache_dir, stat.S_IRUSR)
1616 # Fail during cache initialization.
1617 with self.assertRaises(OSError):
1618 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1619 disk_cache_dir=self.disk_cache_dir)
    def test_disk_cache_write_error(self):
        # A read-only cache subdirectory must surface as KeepCacheError
        # when a get() tries to persist the fetched block.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        # Make the cache dir read-only
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)

        # The fetch itself succeeds (mocked 200); persisting it fails.
        with self.assertRaises(arvados.errors.KeepCacheError):
            with tutil.mock_keep_responses(self.data, 200) as mock:
                keep_client.get(self.locator)
    def test_disk_cache_retry_write_error(self):
        # On ENOSPC while mmapping a new cache slot, the client should
        # retry the write and shrink the cache's size cap.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        realmmap = mmap.mmap
        # NOTE(review): the one-shot flag controlling this side effect
        # appears to be absent from this excerpt; the first call raises,
        # later calls fall through to the real mmap.
        def sideeffect_mmap(*args, **kwargs):
                raise OSError(errno.ENOSPC, "no space")
            return realmmap(*args, **kwargs)

        with patch('mmap.mmap') as mockmmap:
            mockmmap.side_effect = sideeffect_mmap

            cache_max_before = block_cache.cache_max

            with tutil.mock_keep_responses(self.data, 200) as mock:
                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

            self.assertIsNotNone(keep_client.get_from_cache(self.locator))

        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))

        # shrank the cache in response to ENOSPC
        self.assertTrue(cache_max_before > block_cache.cache_max)
    def test_disk_cache_retry_write_error2(self):
        # On ENOMEM while mmapping a new cache slot, the client should
        # retry the write and reduce the cache's slot count.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        realmmap = mmap.mmap
        # NOTE(review): the one-shot flag controlling this side effect
        # appears to be absent from this excerpt; the first call raises,
        # later calls fall through to the real mmap.
        def sideeffect_mmap(*args, **kwargs):
                raise OSError(errno.ENOMEM, "no memory")
            return realmmap(*args, **kwargs)

        with patch('mmap.mmap') as mockmmap:
            mockmmap.side_effect = sideeffect_mmap

            slots_before = block_cache._max_slots

            with tutil.mock_keep_responses(self.data, 200) as mock:
                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

            self.assertIsNotNone(keep_client.get_from_cache(self.locator))

        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))

        # shrank the cache in response to ENOMEM
        self.assertTrue(slots_before > block_cache._max_slots)