1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
28 from unittest import mock
29 from unittest.mock import patch
36 from . import arvados_testutil as tutil
37 from . import keepstub
38 from . import run_test_server
40 from .arvados_testutil import DiskCacheBase
42 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
43 class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
47 block_cache_test = None
51 super(KeepTestCase, cls).setUpClass()
52 run_test_server.authorize_with("admin")
53 cls.api_client = arvados.api('v1')
54 cls.block_cache_test = DiskCacheBase()
55 cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
56 proxy='', local_store='',
57 block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))
60 def tearDownClass(cls):
61 super(KeepTestCase, cls).setUpClass()
62 cls.block_cache_test.tearDown()
64 def test_KeepBasicRWTest(self):
65 self.assertEqual(0, self.keep_client.upload_counter.get())
66 foo_locator = self.keep_client.put('foo')
69 '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
70 'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
72 # 6 bytes because uploaded 2 copies
73 self.assertEqual(6, self.keep_client.upload_counter.get())
75 self.assertEqual(0, self.keep_client.download_counter.get())
76 self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
78 'wrong content from Keep.get(md5("foo"))')
79 self.assertEqual(3, self.keep_client.download_counter.get())
81 def test_KeepBinaryRWTest(self):
82 blob_str = b'\xff\xfe\xf7\x00\x01\x02'
83 blob_locator = self.keep_client.put(blob_str)
86 '^7fc7c53b45e53926ba52821140fef396\+6',
87 ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
88 self.assertEqual(self.keep_client.get(blob_locator),
90 'wrong content from Keep.get(md5(<binarydata>))')
92 def test_KeepLongBinaryRWTest(self):
93 blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
94 for i in range(0, 23):
95 blob_data = blob_data + blob_data
96 blob_locator = self.keep_client.put(blob_data)
99 '^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
100 ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
101 self.assertEqual(self.keep_client.get(blob_locator),
103 'wrong content from Keep.get(md5(<binarydata>))')
105 @unittest.skip("unreliable test - please fix and close #8752")
106 def test_KeepSingleCopyRWTest(self):
107 blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
108 blob_locator = self.keep_client.put(blob_data, copies=1)
111 '^c902006bc98a3eb4a3663b65ab4a6fab\+8',
112 ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
113 self.assertEqual(self.keep_client.get(blob_locator),
115 'wrong content from Keep.get(md5(<binarydata>))')
117 def test_KeepEmptyCollectionTest(self):
118 blob_locator = self.keep_client.put('', copies=1)
121 '^d41d8cd98f00b204e9800998ecf8427e\+0',
122 ('wrong locator from Keep.put(""): ' + blob_locator))
124 def test_unicode_must_be_ascii(self):
125 # If unicode type, must only consist of valid ASCII
126 foo_locator = self.keep_client.put(u'foo')
129 '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
130 'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
132 if sys.version_info < (3, 0):
133 with self.assertRaises(UnicodeEncodeError):
134 # Error if it is not ASCII
135 self.keep_client.put(u'\xe2')
137 with self.assertRaises(AttributeError):
138 # Must be bytes or have an encode() method
139 self.keep_client.put({})
141 def test_KeepHeadTest(self):
142 locator = self.keep_client.put('test_head')
145 '^b9a772c7049325feb7130fff1f8333e9\+9',
146 'wrong md5 hash from Keep.put for "test_head": ' + locator)
147 self.assertEqual(True, self.keep_client.head(locator))
148 self.assertEqual(self.keep_client.get(locator),
150 'wrong content from Keep.get for "test_head"')
152 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
153 class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
156 KEEP_SERVER = {'blob_signing': True}
159 DiskCacheBase.tearDown(self)
161 def test_KeepBasicRWTest(self):
162 run_test_server.authorize_with('active')
163 keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
164 foo_locator = keep_client.put('foo')
167 r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
168 'invalid locator from Keep.put("foo"): ' + foo_locator)
169 self.assertEqual(keep_client.get(foo_locator),
171 'wrong content from Keep.get(md5("foo"))')
173 # GET with an unsigned locator => bad request
174 bar_locator = keep_client.put('bar')
175 unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
178 r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
179 'invalid locator from Keep.put("bar"): ' + bar_locator)
180 self.assertRaises(arvados.errors.KeepReadError,
182 unsigned_bar_locator)
184 # GET from a different user => bad request
185 run_test_server.authorize_with('spectator')
186 self.assertRaises(arvados.errors.KeepReadError,
190 # Unauthenticated GET for a signed locator => bad request
191 # Unauthenticated GET for an unsigned locator => bad request
192 keep_client.api_token = ''
193 self.assertRaises(arvados.errors.KeepReadError,
196 self.assertRaises(arvados.errors.KeepReadError,
198 unsigned_bar_locator)
200 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
201 class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
205 KEEP_PROXY_SERVER = {}
209 super(KeepProxyTestCase, cls).setUpClass()
210 run_test_server.authorize_with('active')
211 cls.api_client = arvados.api('v1')
214 super(KeepProxyTestCase, self).tearDown()
215 DiskCacheBase.tearDown(self)
217 def test_KeepProxyTest1(self):
218 # Will use ARVADOS_KEEP_SERVICES environment variable that
219 # is set by setUpClass().
220 keep_client = arvados.KeepClient(api_client=self.api_client,
221 local_store='', block_cache=self.make_block_cache(self.disk_cache))
222 baz_locator = keep_client.put('baz')
225 '^73feffa4b7f6bb68e44cf984c85f6e88\+3',
226 'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
227 self.assertEqual(keep_client.get(baz_locator),
229 'wrong content from Keep.get(md5("baz"))')
230 self.assertTrue(keep_client.using_proxy)
232 def test_KeepProxyTestMultipleURIs(self):
233 # Test using ARVADOS_KEEP_SERVICES env var overriding any
234 # existing proxy setting and setting multiple proxies
235 arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
236 keep_client = arvados.KeepClient(api_client=self.api_client,
238 block_cache=self.make_block_cache(self.disk_cache))
239 uris = [x['_service_root'] for x in keep_client._keep_services]
240 self.assertEqual(uris, ['http://10.0.0.1/',
241 'https://foo.example.org:1234/'])
243 def test_KeepProxyTestInvalidURI(self):
244 arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
245 with self.assertRaises(arvados.errors.ArgumentError):
246 keep_client = arvados.KeepClient(api_client=self.api_client,
248 block_cache=self.make_block_cache(self.disk_cache))
250 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
251 class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
255 DiskCacheBase.tearDown(self)
257 def get_service_roots(self, api_client):
258 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
259 services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
260 return [urllib.parse.urlparse(url) for url in sorted(services)]
262 def test_ssl_flag_respected_in_roots(self):
263 for ssl_flag in [False, True]:
264 services = self.get_service_roots(self.mock_keep_services(
265 service_ssl_flag=ssl_flag))
267 ('https' if ssl_flag else 'http'), services[0].scheme)
269 def test_correct_ports_with_ipv6_addresses(self):
270 service = self.get_service_roots(self.mock_keep_services(
271 service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
272 self.assertEqual('100::1', service.hostname)
273 self.assertEqual(10, service.port)
275 def test_recognize_proxy_services_in_controller_response(self):
276 keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
277 service_type='proxy', service_host='localhost', service_port=9, count=1),
278 block_cache=self.make_block_cache(self.disk_cache))
280 # this will fail, but it ensures we get the service
282 keep_client.put('baz2', num_retries=0)
285 self.assertTrue(keep_client.using_proxy)
287 def test_insecure_disables_tls_verify(self):
288 api_client = self.mock_keep_services(count=1)
289 force_timeout = socket.timeout("timed out")
291 api_client.insecure = True
292 with tutil.mock_keep_responses(b'foo', 200) as mock:
293 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
294 keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
296 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
299 mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
302 api_client.insecure = False
303 with tutil.mock_keep_responses(b'foo', 200) as mock:
304 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
305 keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
306 # getopt()==None here means we didn't change the
307 # default. If we were using real pycurl instead of a mock,
308 # it would return the default value 1.
310 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
313 mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
    def test_refresh_signature(self):
        """refresh_signature() must return the translated locator and use HEAD.

        A remote-signed locator (+R hint) is refreshed into a locally signed
        one (+A hint) via the X-Keep-Locator response header.
        """
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        # Stub server response advertises the locally signed locator.
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])
335 # test_*_timeout verify that KeepClient instructs pycurl to use
336 # the appropriate connection and read timeouts. They don't care
337 # whether pycurl actually exhibits the expected timeout behavior
338 # -- those tests are in the KeepClientTimeout test class.
340 def test_get_timeout(self):
341 api_client = self.mock_keep_services(count=1)
342 force_timeout = socket.timeout("timed out")
343 with tutil.mock_keep_responses(force_timeout, 0) as mock:
344 keep_client = arvados.KeepClient(
345 api_client=api_client,
346 block_cache=self.make_block_cache(self.disk_cache),
349 with self.assertRaises(arvados.errors.KeepReadError):
350 keep_client.get('ffffffffffffffffffffffffffffffff')
352 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
353 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
355 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
356 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
358 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
359 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
361 def test_put_timeout(self):
362 api_client = self.mock_keep_services(count=1)
363 force_timeout = socket.timeout("timed out")
364 with tutil.mock_keep_responses(force_timeout, 0) as mock:
365 keep_client = arvados.KeepClient(
366 api_client=api_client,
367 block_cache=self.make_block_cache(self.disk_cache),
370 with self.assertRaises(arvados.errors.KeepWriteError):
371 keep_client.put(b'foo')
373 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
374 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
376 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
377 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
379 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
380 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
382 def test_head_timeout(self):
383 api_client = self.mock_keep_services(count=1)
384 force_timeout = socket.timeout("timed out")
385 with tutil.mock_keep_responses(force_timeout, 0) as mock:
386 keep_client = arvados.KeepClient(
387 api_client=api_client,
388 block_cache=self.make_block_cache(self.disk_cache),
391 with self.assertRaises(arvados.errors.KeepReadError):
392 keep_client.head('ffffffffffffffffffffffffffffffff')
394 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
395 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
397 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
400 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
403 def test_proxy_get_timeout(self):
404 api_client = self.mock_keep_services(service_type='proxy', count=1)
405 force_timeout = socket.timeout("timed out")
406 with tutil.mock_keep_responses(force_timeout, 0) as mock:
407 keep_client = arvados.KeepClient(
408 api_client=api_client,
409 block_cache=self.make_block_cache(self.disk_cache),
412 with self.assertRaises(arvados.errors.KeepReadError):
413 keep_client.get('ffffffffffffffffffffffffffffffff')
415 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
416 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
418 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
419 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
421 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
422 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
424 def test_proxy_head_timeout(self):
425 api_client = self.mock_keep_services(service_type='proxy', count=1)
426 force_timeout = socket.timeout("timed out")
427 with tutil.mock_keep_responses(force_timeout, 0) as mock:
428 keep_client = arvados.KeepClient(
429 api_client=api_client,
430 block_cache=self.make_block_cache(self.disk_cache),
433 with self.assertRaises(arvados.errors.KeepReadError):
434 keep_client.head('ffffffffffffffffffffffffffffffff')
436 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
437 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
439 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
442 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
445 def test_proxy_put_timeout(self):
446 self.disk_cache_dir = None
447 api_client = self.mock_keep_services(service_type='proxy', count=1)
448 force_timeout = socket.timeout("timed out")
449 with tutil.mock_keep_responses(force_timeout, 0) as mock:
450 keep_client = arvados.KeepClient(
451 api_client=api_client,
454 with self.assertRaises(arvados.errors.KeepWriteError):
455 keep_client.put('foo')
457 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
458 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
460 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
461 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
463 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
464 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
466 def check_no_services_error(self, verb, exc_class):
467 api_client = mock.MagicMock(name='api_client')
468 api_client.keep_services().accessible().execute.side_effect = (
469 arvados.errors.ApiError)
470 keep_client = arvados.KeepClient(
471 api_client=api_client,
472 block_cache=self.make_block_cache(self.disk_cache),
475 with self.assertRaises(exc_class) as err_check:
476 getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
477 self.assertEqual(0, len(err_check.exception.request_errors()))
    def test_get_error_with_no_services(self):
        # get() with no accessible Keep services must raise KeepReadError.
        self.check_no_services_error('get', arvados.errors.KeepReadError)
    def test_head_error_with_no_services(self):
        # head() with no accessible Keep services must raise KeepReadError.
        self.check_no_services_error('head', arvados.errors.KeepReadError)
    def test_put_error_with_no_services(self):
        # put() with no accessible Keep services must raise KeepWriteError.
        self.check_no_services_error('put', arvados.errors.KeepWriteError)
488 def check_errors_from_last_retry(self, verb, exc_class):
489 api_client = self.mock_keep_services(count=2)
490 req_mock = tutil.mock_keep_responses(
491 "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
492 with req_mock, tutil.skip_sleep, \
493 self.assertRaises(exc_class) as err_check:
494 keep_client = arvados.KeepClient(
495 api_client=api_client,
496 block_cache=self.make_block_cache(self.disk_cache),
499 getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
501 self.assertEqual([502, 502], [
502 getattr(error, 'status_code', None)
503 for error in err_check.exception.request_errors().values()])
504 self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')
    def test_get_error_reflects_last_retry(self):
        # The KeepReadError raised by get() must report errors from the
        # final retry attempt only (see check_errors_from_last_retry).
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
    def test_head_error_reflects_last_retry(self):
        # The KeepReadError raised by head() must report errors from the
        # final retry attempt only (see check_errors_from_last_retry).
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)
    def test_put_error_reflects_last_retry(self):
        # The KeepWriteError raised by put() must report errors from the
        # final retry attempt only (see check_errors_from_last_retry).
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
515 def test_put_error_does_not_include_successful_puts(self):
516 data = 'partial failure test'
517 data_loc = tutil.str_keep_locator(data)
518 api_client = self.mock_keep_services(count=3)
519 with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
520 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
521 keep_client = arvados.KeepClient(
522 api_client=api_client,
523 block_cache=self.make_block_cache(self.disk_cache),
526 keep_client.put(data)
527 self.assertEqual(2, len(exc_check.exception.request_errors()))
529 def test_proxy_put_with_no_writable_services(self):
530 data = 'test with no writable services'
531 data_loc = tutil.str_keep_locator(data)
532 api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
533 with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
534 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
535 keep_client = arvados.KeepClient(
536 api_client=api_client,
537 block_cache=self.make_block_cache(self.disk_cache),
540 keep_client.put(data)
541 self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
542 self.assertEqual(0, len(exc_check.exception.request_errors()))
544 def test_oddball_service_get(self):
545 body = b'oddball service get'
546 api_client = self.mock_keep_services(service_type='fancynewblobstore')
547 with tutil.mock_keep_responses(body, 200):
548 keep_client = arvados.KeepClient(
549 api_client=api_client,
550 block_cache=self.make_block_cache(self.disk_cache),
553 actual = keep_client.get(tutil.str_keep_locator(body))
554 self.assertEqual(body, actual)
556 def test_oddball_service_put(self):
557 body = b'oddball service put'
558 pdh = tutil.str_keep_locator(body)
559 api_client = self.mock_keep_services(service_type='fancynewblobstore')
560 with tutil.mock_keep_responses(pdh, 200):
561 keep_client = arvados.KeepClient(
562 api_client=api_client,
563 block_cache=self.make_block_cache(self.disk_cache),
566 actual = keep_client.put(body, copies=1)
567 self.assertEqual(pdh, actual)
569 def test_oddball_service_writer_count(self):
570 body = b'oddball service writer count'
571 pdh = tutil.str_keep_locator(body)
572 api_client = self.mock_keep_services(service_type='fancynewblobstore',
574 headers = {'x-keep-replicas-stored': 3}
575 with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
576 **headers) as req_mock:
577 keep_client = arvados.KeepClient(
578 api_client=api_client,
579 block_cache=self.make_block_cache(self.disk_cache),
582 actual = keep_client.put(body, copies=2)
583 self.assertEqual(pdh, actual)
584 self.assertEqual(1, req_mock.call_count)
587 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
588 class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
592 self.api_client = self.mock_keep_services(count=2)
593 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
595 self.locator = '1271ed5ef305aadabc605b1609e24c52'
598 DiskCacheBase.tearDown(self)
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        """Repeated get() of the same locator must hit the block cache."""
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.get(self.locator)
            self.keep_client.get(self.locator)
            # Request already cached, don't require more than one request
            get_mock.assert_called_once()
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        """Repeated head() of the same locator must NOT be served from cache."""
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.head(self.locator)
            self.keep_client.head(self.locator)
            # Don't cache HEAD requests so that they're not confused with GET reqs
            self.assertEqual(2, get_mock.call_count)
616 @mock.patch('arvados.KeepClient.KeepService.get')
617 def test_head_and_then_get_return_different_responses(self, get_mock):
620 get_mock.side_effect = [b'first response', b'second response']
621 with tutil.mock_keep_responses(self.data, 200, 200):
622 head_resp = self.keep_client.head(self.locator)
623 get_resp = self.keep_client.get(self.locator)
624 self.assertEqual(b'first response', head_resp)
            # First response was not cached because it was from a HEAD request.
626 self.assertNotEqual(head_resp, get_resp)
633 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
634 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
638 self.api_client = self.mock_keep_services(count=2)
639 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
641 self.locator = '1271ed5ef305aadabc605b1609e24c52'
642 self.test_id = arvados.util.new_request_id()
643 self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
644 # If we don't set request_id to None explicitly here, it will
645 # return <MagicMock name='api_client_mock.request_id'
647 self.api_client.request_id = None
650 DiskCacheBase.tearDown(self)
    def test_default_to_api_client_request_id(self):
        """put/get/head must fall back to the api_client's request_id."""
        self.api_client.request_id = self.test_id
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        # Two services were mocked, so two requests carry the id.
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertProvidedRequestId(mock.responses[0])
    def test_explicit_request_id(self):
        """A request_id passed explicitly must appear on every request."""
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data, request_id=self.test_id)
        # Two services were mocked, so two requests carry the id.
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])
    def test_automatic_request_id(self):
        """Without an explicit id, each request gets an auto-generated one."""
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        # Two services were mocked, so two requests carry an id.
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertAutomaticRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])
    def test_request_id_in_exception(self):
        """Keep errors must mention the request id (explicit or automatic)."""
        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
                self.keep_client.head(self.locator, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
                self.keep_client.get(self.locator)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
                self.keep_client.put(self.data, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
                self.keep_client.put(self.data)
715 def assertAutomaticRequestId(self, resp):
716 hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
717 if x.startswith('X-Request-Id: ')][0]
718 self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
719 self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
721 def assertProvidedRequestId(self, resp):
722 self.assertIn('X-Request-Id: '+self.test_id,
723 resp.getopt(pycurl.HTTPHEADER))
727 #@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
728 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
732 # expected_order[i] is the probe order for
733 # hash=md5(sprintf("%064x",i)) where there are 16 services
734 # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
735 # the first probe for the block consisting of 64 "0"
736 # characters is the service whose uuid is
737 # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
739 self.expected_order = [
740 list('3eab2d5fc9681074'),
741 list('097dba52e648f1c3'),
742 list('c5b4e023f8a7d691'),
743 list('9d81c02e76a3bf54'),
746 "{:064x}".format(x).encode()
747 for x in range(len(self.expected_order))]
749 hashlib.md5(self.blocks[x]).hexdigest()
750 for x in range(len(self.expected_order))]
751 self.api_client = self.mock_keep_services(count=self.services)
752 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
755 DiskCacheBase.tearDown(self)
757 def test_weighted_service_roots_against_reference_set(self):
758 # Confirm weighted_service_roots() returns the correct order
759 for i, hash in enumerate(self.hashes):
760 roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
762 re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
764 self.assertEqual(self.expected_order[i], got_order)
    def test_get_probe_order_against_reference_set(self):
        # get() must contact services in the reference probe order.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))
    def test_head_probe_order_against_reference_set(self):
        # head() must contact services in the reference probe order.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))
    def test_put_probe_order_against_reference_set(self):
        # put() must contact services in the reference probe order.
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))
780 def _test_probe_order_against_reference_set(self, op):
781 for i in range(len(self.blocks)):
782 with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
783 self.assertRaises(arvados.errors.KeepRequestError):
786 re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
787 for resp in mock.responses]
788 self.assertEqual(self.expected_order[i]*2, got_order)
790 def test_put_probe_order_multiple_copies(self):
791 for copies in range(2, 4):
792 for i in range(len(self.blocks)):
793 with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
794 self.assertRaises(arvados.errors.KeepWriteError):
795 self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
797 re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
798 for resp in mock.responses]
799 # With T threads racing to make requests, the position
800 # of a given server in the sequence of HTTP requests
801 # (got_order) cannot be more than T-1 positions
802 # earlier than that server's position in the reference
803 # probe sequence (expected_order).
805 # Loop invariant: we have accounted for +pos+ expected
806 # probes, either by seeing them in +got_order+ or by
807 # putting them in +pending+ in the hope of seeing them
808 # later. As long as +len(pending)<T+, we haven't
809 # started a request too early.
811 for pos, expected in enumerate(self.expected_order[i]*3):
812 got = got_order[pos-len(pending)]
813 while got in pending:
814 del pending[pending.index(got)]
815 got = got_order[pos-len(pending)]
817 pending.append(expected)
819 len(pending), copies,
820 "pending={}, with copies={}, got {}, expected {}".format(
821 pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
823 def test_probe_waste_adding_one_server(self):
825 hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
826 initial_services = 12
827 self.api_client = self.mock_keep_services(count=initial_services)
828 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
830 self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
831 for added_services in range(1, 12):
832 api_client = self.mock_keep_services(count=initial_services+added_services)
833 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
835 for hash_index in range(len(hashes)):
836 probe_after = keep_client.weighted_service_roots(
837 arvados.KeepLocator(hashes[hash_index]))
838 penalty = probe_after.index(probes_before[hash_index][0])
839 self.assertLessEqual(penalty, added_services)
840 total_penalty += penalty
841 # Average penalty per block should not exceed
842 # N(added)/N(orig) by more than 20%, and should get closer
843 # to the ideal as we add data points.
846 len(hashes) / initial_services)
849 (120 - added_services)/100)
851 expect_penalty * 8/10)
853 min_penalty <= total_penalty <= max_penalty,
854 "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
862 def check_64_zeros_error_order(self, verb, exc_class):
865 data = tutil.str_keep_locator(data)
866 # Arbitrary port number:
867 aport = random.randint(1024,65535)
868 api_client = self.mock_keep_services(service_port=aport, count=self.services)
869 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
870 with mock.patch('pycurl.Curl') as curl_mock, \
871 self.assertRaises(exc_class) as err_check:
872 curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
873 getattr(keep_client, verb)(data)
874 urls = [urllib.parse.urlparse(url)
875 for url in err_check.exception.request_errors()]
876 self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
877 [(url.hostname, url.port) for url in urls])
    def test_get_error_shows_probe_order(self):
        # Failed get() must list attempted services in probe order.
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)
    def test_put_error_shows_probe_order(self):
        # Failed put() must list attempted services in probe order.
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    # Timing tests against the local stub Keep server: verify that connect,
    # response, and low-bandwidth timeouts fire when (and only when)
    # expected, for get/put/head.
    # NOTE(review): several source lines are elided from this view (e.g. the
    # DATA and TIMEOUT_TIME class attributes); gaps are marked where they
    # occur -- confirm against the full file before editing.

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    BANDWIDTH_LOW_LIM = 1024

    # NOTE(review): source elided -- presumably `def tearDown(self):` header
    # belongs immediately above the next line; confirm.
        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        # Context manager asserting the `with` body takes between tmin and
        # tmax seconds (inclusive).
        def __init__(self, tmin, tmax):
            # NOTE(review): source elided -- the tmin/tmax assignments and
            # the `__enter__` header are missing; the next line records the
            # start time on entry.
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        # Context manager asserting the `with` body takes at least tmin
        # seconds.
        def __init__(self, tmin):
            # NOTE(review): source elided -- tmin assignment and `__enter__`
            # header missing from view.
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        # Build a KeepClient with explicit (connect, read, low-speed-limit)
        # timeouts against the stub services.
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            service_host='240.0.0.0',
        # NOTE(review): source elided -- remaining mock_keep_services()
        # arguments and closing paren missing from view.
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # Twice the low-speed limit: transfers should succeed both ways.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError) as e:
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # NOTE(review): source elided -- comment continuation missing here.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # Verifies that +K@<uuid> locator hints route get/head requests to the
    # hinted gateway services (in hint order) before plain disk services,
    # and that a +K@<cluster-id> hint targets that cluster's keep proxy URL.
    # NOTE(review): several source lines are elided from this view; gaps are
    # marked where they occur.

    # NOTE(review): source elided -- presumably `def tearDown(self):` header
    # belongs immediately above the next line; confirm.
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        # Set up `gateways` fake gateway service records plus `disks` disk
        # services, and a KeepClient wired to the mocked API client.
        # NOTE(review): source elided -- the `self.gateways = [{` list/dict
        # opener is missing above the following entries.
            'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
            'owner_uuid': 'zzzzz-tpzed-000000000000000',
            'service_host': 'gatewayhost{}'.format(i),
            'service_port': 12345,
            'service_ssl_flag': True,
            'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        # A single gateway hint: both get() and head() should hit the
        # gateway root URL.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        # All servers 404: verify the request order is gateways (hint
        # order) first, then disk services.
        # NOTE(review): source elided -- the gateway/disk counts and the
        # `mocks = [` opener are missing above the following lines.
            tutil.FakeCurl.make(code=404, body='')
            for _ in range(gateways+disks)
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            # NOTE(review): source elided -- the assertion call wrapping the
            # next line (and its expected pattern) is missing from view.
            mocks[i].getopt(pycurl.URL).decode(),

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        # Same ordering check as above, for head().
        # NOTE(review): source elided -- the gateway/disk counts and the
        # `mocks = [` opener are missing above the following lines.
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            # NOTE(review): source elided -- the assertion call wrapping the
            # next line (and its expected pattern) is missing from view.
            mocks[i].getopt(pycurl.URL).decode(),

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        # +K@xyzzy (cluster-id hint) should resolve to that cluster's
        # public keep proxy hostname.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        # Same remote-proxy resolution check, for head().
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
class KeepClientRetryTestMixin(object):
    # Shared retry-behavior tests for get/head/put subclasses.
    # Testing with a local Keep store won't exercise the retry behavior.
    # Instead, our strategy is:
    # * Create a client with one proxy specified (pointed at a black
    # hole), so there's no need to instantiate an API client, and
    # all HTTP requests come from one place.
    # * Mock httplib's request method to provide simulated responses.
    # This lets us test the retry logic extensively without relying on any
    # supporting servers, and prevents side effects in case something hiccups.
    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
    # NOTE(review): source elided -- comment continuation missing here.
    # Test classes must define TEST_PATCHER to a method that mocks
    # out appropriate methods in the client.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    # NOTE(review): source elided -- presumably `def setUp(self):` header
    # belongs immediately above the next line; confirm.
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        # Build a KeepClient from the default kwargs, with per-test
        # overrides applied on top.
        kwargs = self.client_kwargs.copy()
        kwargs.update(caller_kwargs)
        kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        # Subclasses perform the operation under test (get/head/put) here.
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        # Assert run_method() returns `expected` (DEFAULT_EXPECT if omitted).
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        # Assert run_method() raises `error_class` (DEFAULT_EXCEPTION if
        # omitted) and hand the assertRaises context back to the caller.
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
        # NOTE(review): source elided -- the `return err` statement appears
        # to be missing from this view (callers below use the return value).

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        # 403 is a permanent failure: the later 200 must never be reached.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # Two 500s with num_retries=1: the 200 is out of reach.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries passed at construction applies when the call omits it.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    # Retry behavior for get(): mixin tests plus get-specific cases.
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    # NOTE(review): source elided -- presumably `def tearDown(self):` header
    # belongs immediately above the next line; confirm.
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
    # NOTE(review): source elided -- the `*args, **kwargs):` signature
    # continuation is missing from this view.
        return self.new_client().get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        with tutil.mock_keep_responses(
        # NOTE(review): source elided -- the first (bad-checksum) mock
        # response tuple is missing from this view.
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    # Retry behavior for head(): mixin tests plus head-specific cases.
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    # NOTE(review): source elided -- presumably `def tearDown(self):` header
    # belongs immediately above the next line; confirm.
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
    # NOTE(review): source elided -- the `*args, **kwargs):` signature
    # continuation is missing from this view.
        return self.new_client().head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    # Retry behavior for put(): mixin tests plus put-specific cases.
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    # NOTE(review): source elided -- presumably `def tearDown(self):` header
    # belongs immediately above the next line; confirm.
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        return self.new_client().put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # With a single mocked server, asking for 2 copies must fail even
        # with retries available.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    # Verifies KeepWriterThreadPool stops issuing writes once the requested
    # number of copies has been stored (no over-replication).
    # NOTE(review): several source lines are elided from this view
    # (setUp header, loop headers, pool.join calls); gaps are marked.

    class FakeKeepService(object):
        # Stand-in for a Keep service: put() sleeps `delay` seconds, then
        # either raises `will_raise` or returns `will_succeed`.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            # NOTE(review): source elided -- the `self.delay` assignment is
            # missing from this view (put() reads it below).
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            # NOTE(review): source elided -- the `self._result` dict
            # initialization is missing from this view.
            self._result['headers'] = {}
            self._result['headers']['x-keep-replicas-stored'] = str(replicas)
            self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
            self._result['body'] = 'foobar'

        def put(self, data_hash, data, timeout, headers):
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            if self.will_succeed:
                # NOTE(review): source elided -- the success-path return
                # statement is missing from this view.
            return {"status_code": 500, "body": "didn't succeed"}

    # NOTE(review): source elided -- the `def setUp(self):` header and the
    # `self.copies` assignment are missing above the next line.
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
        # NOTE(review): source elided -- the `data=` argument is missing
        # from this view.
            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas = self.copies,
            copies = self.copies
        # NOTE(review): source elided -- closing paren missing here.

    def test_only_write_enough_on_success(self):
        # NOTE(review): source elided -- the loop header above the next two
        # lines is missing from this view.
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        # NOTE(review): source elided -- presumably a pool.join() here.
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # NOTE(review): source elided -- the loop header above the next
        # lines is missing from this view.
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        # NOTE(review): source elided -- presumably a pool.join() here.
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # NOTE(review): source elided -- the loop header above the next
        # lines is missing from this view.
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        # NOTE(review): source elided -- presumably a pool.join() here.
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # More crashing services than copies: the pool can only confirm
        # copies-1 replicas.
        for i in range(self.copies+1):
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
        for i in range(self.copies-1):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        # NOTE(review): source elided -- presumably a pool.join() here.
        self.assertEqual(self.pool.done(), (self.copies-1, []))
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.

    # NOTE(review): source elided -- presumably `def setUp(self):` header
    # belongs immediately above the next line; confirm.
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    # NOTE(review): source elided -- presumably `def tearDown(self):` header
    # belongs immediately above the next line; confirm.
        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        # First attempt raises; the retry pass stores both copies.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        # First attempt gets a retryable 500; the retry pass succeeds.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    # Regression test: an API-client exception raised during get() must not
    # leave the client in a state that deadlocks a subsequent get() of the
    # same block.

    # NOTE(review): source elided -- presumably `def tearDown(self):` header
    # belongs immediately above the next line; confirm.
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        class ApiMock(object):
            # Fake API object: attribute lookups for api_token/insecure take
            # dedicated branches; any other attribute access ends up raising
            # KeepReadError.
            def __getattr__(self, r):
                if r == "api_token":
                # NOTE(review): source elided -- the return statement for
                # this branch is missing from this view.
                elif r == "insecure":
                # NOTE(review): source elided -- the return statement and
                # any intermediate branches are missing from this view.
                    raise arvados.errors.KeepReadError()
        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for is that if an API (not
        # keepstore) exception is thrown as part of a get(), the next
        # attempt to get that same block will result in a deadlock.
        # This is why there are two get()s in a row. Unfortunately,
        # the failure mode for this test is that the test suite
        # deadlocks, there isn't a good way to avoid that without
        # adding a special case that has no use except for this test.

        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    # Exercises the on-disk block cache: reads, writes, sharing between
    # cache instances, cleanup of stale tmp files, cache-size capping, and
    # recovery from disk/mmap errors.
    # NOTE(review): several source lines are elided from this view
    # (f.write bodies, cache-size arguments, mmap side-effect conditions);
    # gaps are marked where they occur.

    # NOTE(review): source elided -- presumably `def setUp(self):` header
    # belongs immediately above the next line; confirm.
        self.api_client = self.mock_keep_services(count=2)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.disk_cache_dir = tempfile.mkdtemp()

    # NOTE(review): source elided -- presumably `def tearDown(self):` header
    # belongs immediately above the next line; confirm.
        shutil.rmtree(self.disk_cache_dir)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_read(self, get_mock):
        # confirm it finds an existing cache block when the cache is
        # NOTE(review): source elided -- comment continuation missing here.
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
            # NOTE(review): source elided -- the f.write(...) body is
            # missing from this view.

        # block cache should have found the existing block
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        # The block came from disk, so no network fetch happened.
        get_mock.assert_not_called()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # NOTE(review): this method name is reused by a later test in this
        # class; under Python class semantics the later definition shadows
        # this one, so this test never runs. Consider renaming one of them.
        # confirm it finds a cache block written after the disk cache
        # NOTE(review): source elided -- comment continuation missing here.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
            # NOTE(review): source elided -- the f.write(...) body is
            # missing from this view.

        # when we try to get the block, it'll check the disk and find it.
        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        get_mock.assert_not_called()

    def test_disk_cache_write(self):
        # confirm the cache block was created
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        self.assertIsNotNone(keep_client.get_from_cache(self.locator))

        # The fetched block must now be present on disk.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))

    def test_disk_cache_clean(self):
        # confirm that a tmp file in the cache is cleaned up
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
            # NOTE(review): source elided -- f.write(...) body missing here.

        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
            # NOTE(review): source elided -- f.write(...) body missing here.

        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
            # NOTE(review): source elided -- f.write(...) body missing here.

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        # The tmp still hasn't been deleted because it was created in the last 60 seconds
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))

        # Set the mtime to 61s in the past
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))

        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir)

        # Tmp should be gone but the other ones are safe.
        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_cap(self, get_mock):
        # confirm that the cache is kept to the desired limit
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
            # NOTE(review): source elided -- f.write(...) body missing here.

        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
            # NOTE(review): source elided -- f.write(...) body missing here.

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir,
        # NOTE(review): source elided -- the cache size argument and closing
        # paren are missing from this view.

        # The older block was evicted; the newer one survives.
        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # confirm that a second cache doesn't delete files that belong to the first cache.
        # NOTE(review): duplicate method name -- this definition shadows the
        # earlier test_disk_cache_share above; consider renaming one so both
        # tests actually run.
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
            # NOTE(review): source elided -- f.write(...) body missing here.

        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
            # NOTE(review): source elided -- f.write(...) body missing here.

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir,
        # NOTE(review): source elided -- the cache size argument and closing
        # paren are missing from this view.

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir,
        # NOTE(review): source elided -- the cache size argument and closing
        # paren are missing from this view.

        # Both files survive the second cache instance.
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

    def test_disk_cache_error(self):
        # Make the cache dir unwritable (read-only for owner).
        os.chmod(self.disk_cache_dir, stat.S_IRUSR)

        # Fail during cache initialization.
        with self.assertRaises(OSError):
            block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                      disk_cache_dir=self.disk_cache_dir)

    def test_disk_cache_write_error(self):
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        # Make the cache dir read-only
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)

        # The fetch succeeds but the cache write fails -> KeepCacheError.
        with self.assertRaises(arvados.errors.KeepCacheError):
            with tutil.mock_keep_responses(self.data, 200) as mock:
                keep_client.get(self.locator)

    def test_disk_cache_retry_write_error(self):
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        # First mmap attempt fails with ENOSPC; later attempts use the real
        # mmap so the retry can succeed.
        realmmap = mmap.mmap
        def sideeffect_mmap(*args, **kwargs):
            # NOTE(review): source elided -- the condition guarding the
            # first-call failure is missing from this view.
                raise OSError(errno.ENOSPC, "no space")
            # NOTE(review): source elided -- alternate-branch header missing.
                return realmmap(*args, **kwargs)

        with patch('mmap.mmap') as mockmmap:
            mockmmap.side_effect = sideeffect_mmap

            cache_max_before = block_cache.cache_max

            with tutil.mock_keep_responses(self.data, 200) as mock:
                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

            self.assertIsNotNone(keep_client.get_from_cache(self.locator))

            with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
                self.assertTrue(tutil.binary_compare(f.read(), self.data))

            # shrank the cache in response to ENOSPC
            self.assertTrue(cache_max_before > block_cache.cache_max)

    def test_disk_cache_retry_write_error2(self):
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        # Same as above but the first mmap fails with ENOMEM, which should
        # shrink the slot count instead of the byte cap.
        realmmap = mmap.mmap
        def sideeffect_mmap(*args, **kwargs):
            # NOTE(review): source elided -- the condition guarding the
            # first-call failure is missing from this view.
                raise OSError(errno.ENOMEM, "no memory")
            # NOTE(review): source elided -- alternate-branch header missing.
                return realmmap(*args, **kwargs)

        with patch('mmap.mmap') as mockmmap:
            mockmmap.side_effect = sideeffect_mmap

            slots_before = block_cache._max_slots

            with tutil.mock_keep_responses(self.data, 200) as mock:
                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

            self.assertIsNotNone(keep_client.get_from_cache(self.locator))

            with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
                self.assertTrue(tutil.binary_compare(f.read(), self.data))

            # shrank the cache in response to ENOMEM
            self.assertTrue(slots_before > block_cache._max_slots)