1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
33 from . import arvados_testutil as tutil
34 from . import keepstub
35 from . import run_test_server
37 from .arvados_testutil import DiskCacheBase
39 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
40 class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
44 block_cache_test = None
48 super(KeepTestCase, cls).setUpClass()
49 run_test_server.authorize_with("admin")
50 cls.api_client = arvados.api('v1')
51 cls.block_cache_test = DiskCacheBase()
52 cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
53 proxy='', local_store='',
54 block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))
57 def tearDownClass(cls):
58 super(KeepTestCase, cls).setUpClass()
59 cls.block_cache_test.tearDown()
61 def test_KeepBasicRWTest(self):
62 self.assertEqual(0, self.keep_client.upload_counter.get())
63 foo_locator = self.keep_client.put('foo')
66 '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
67 'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
69 # 6 bytes because uploaded 2 copies
70 self.assertEqual(6, self.keep_client.upload_counter.get())
72 self.assertEqual(0, self.keep_client.download_counter.get())
73 self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
75 'wrong content from Keep.get(md5("foo"))')
76 self.assertEqual(3, self.keep_client.download_counter.get())
78 def test_KeepBinaryRWTest(self):
79 blob_str = b'\xff\xfe\xf7\x00\x01\x02'
80 blob_locator = self.keep_client.put(blob_str)
83 '^7fc7c53b45e53926ba52821140fef396\+6',
84 ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
85 self.assertEqual(self.keep_client.get(blob_locator),
87 'wrong content from Keep.get(md5(<binarydata>))')
89 def test_KeepLongBinaryRWTest(self):
90 blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
91 for i in range(0, 23):
92 blob_data = blob_data + blob_data
93 blob_locator = self.keep_client.put(blob_data)
96 '^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
97 ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
98 self.assertEqual(self.keep_client.get(blob_locator),
100 'wrong content from Keep.get(md5(<binarydata>))')
102 @unittest.skip("unreliable test - please fix and close #8752")
103 def test_KeepSingleCopyRWTest(self):
104 blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
105 blob_locator = self.keep_client.put(blob_data, copies=1)
108 '^c902006bc98a3eb4a3663b65ab4a6fab\+8',
109 ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
110 self.assertEqual(self.keep_client.get(blob_locator),
112 'wrong content from Keep.get(md5(<binarydata>))')
114 def test_KeepEmptyCollectionTest(self):
115 blob_locator = self.keep_client.put('', copies=1)
118 '^d41d8cd98f00b204e9800998ecf8427e\+0',
119 ('wrong locator from Keep.put(""): ' + blob_locator))
121 def test_unicode_must_be_ascii(self):
122 # If unicode type, must only consist of valid ASCII
123 foo_locator = self.keep_client.put(u'foo')
126 '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
127 'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
129 if sys.version_info < (3, 0):
130 with self.assertRaises(UnicodeEncodeError):
131 # Error if it is not ASCII
132 self.keep_client.put(u'\xe2')
134 with self.assertRaises(AttributeError):
135 # Must be bytes or have an encode() method
136 self.keep_client.put({})
138 def test_KeepHeadTest(self):
139 locator = self.keep_client.put('test_head')
142 '^b9a772c7049325feb7130fff1f8333e9\+9',
143 'wrong md5 hash from Keep.put for "test_head": ' + locator)
144 self.assertEqual(True, self.keep_client.head(locator))
145 self.assertEqual(self.keep_client.get(locator),
147 'wrong content from Keep.get for "test_head"')
149 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
150 class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
# Permission/signature tests against a Keep server with blob signing
# enabled: signed locators work for the writer, unsigned or
# foreign/unauthenticated reads raise NotFoundError.
# NOTE(review): some original lines (assertRegex openers, get() calls in
# the assertRaises argument lists) are omitted from this listing.
153 KEEP_SERVER = {'blob_signing': True}
156 DiskCacheBase.tearDown(self)
158 def test_KeepBasicRWTest(self):
159 run_test_server.authorize_with('active')
160 keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
161 foo_locator = keep_client.put('foo')
# With blob signing on, the locator carries a +A<sig>@<expiry> hint.
164 r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
165 'invalid locator from Keep.put("foo"): ' + foo_locator)
166 self.assertEqual(keep_client.get(foo_locator),
168 'wrong content from Keep.get(md5("foo"))')
170 # GET with an unsigned locator => NotFound
171 bar_locator = keep_client.put('bar')
172 unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
175 r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
176 'invalid locator from Keep.put("bar"): ' + bar_locator)
177 self.assertRaises(arvados.errors.NotFoundError,
179 unsigned_bar_locator)
181 # GET from a different user => NotFound
182 run_test_server.authorize_with('spectator')
183 self.assertRaises(arvados.errors.NotFoundError,
187 # Unauthenticated GET for a signed locator => NotFound
188 # Unauthenticated GET for an unsigned locator => NotFound
189 keep_client.api_token = ''
190 self.assertRaises(arvados.errors.NotFoundError,
193 self.assertRaises(arvados.errors.NotFoundError,
195 unsigned_bar_locator)
197 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
198 class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
# Tests for KeepClient behavior when routed through keepproxy,
# including service discovery via the ARVADOS_KEEP_SERVICES setting.
202 KEEP_PROXY_SERVER = {}
206 super(KeepProxyTestCase, cls).setUpClass()
207 run_test_server.authorize_with('active')
208 cls.api_client = arvados.api('v1')
211 super(KeepProxyTestCase, self).tearDown()
212 DiskCacheBase.tearDown(self)
214 def test_KeepProxyTest1(self):
215 # Will use ARVADOS_KEEP_SERVICES environment variable that
216 # is set by setUpClass().
217 keep_client = arvados.KeepClient(api_client=self.api_client,
218 local_store='', block_cache=self.make_block_cache(self.disk_cache))
219 baz_locator = keep_client.put('baz')
222 '^73feffa4b7f6bb68e44cf984c85f6e88\+3',
223 'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
224 self.assertEqual(keep_client.get(baz_locator),
226 'wrong content from Keep.get(md5("baz"))')
227 self.assertTrue(keep_client.using_proxy)
229 def test_KeepProxyTestMultipleURIs(self):
230 # Test using ARVADOS_KEEP_SERVICES env var overriding any
231 # existing proxy setting and setting multiple proxies
232 arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
233 keep_client = arvados.KeepClient(api_client=self.api_client,
235 block_cache=self.make_block_cache(self.disk_cache))
# Service roots are normalized to end with a trailing slash.
236 uris = [x['_service_root'] for x in keep_client._keep_services]
237 self.assertEqual(uris, ['http://10.0.0.1/',
238 'https://foo.example.org:1234/'])
240 def test_KeepProxyTestInvalidURI(self):
# A bare hostname with no scheme is rejected at client construction.
241 arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
242 with self.assertRaises(arvados.errors.ArgumentError):
243 keep_client = arvados.KeepClient(api_client=self.api_client,
245 block_cache=self.make_block_cache(self.disk_cache))
247 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
248 class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
# Unit tests for KeepClient behavior against fully mocked Keep services
# (no live servers): service-root construction, TLS verification flags,
# pycurl timeout options, retry/error aggregation, and nonstandard
# ("oddball") service types. Uses tutil.mock_keep_responses to script
# HTTP responses and inspects the pycurl options that were set.
# NOTE(review): this listing omits several original lines (assertEqual
# openers, some keyword-argument lines of KeepClient(...)); only visible
# lines are reproduced.
252 DiskCacheBase.tearDown(self)
254 def get_service_roots(self, api_client):
# Helper: return parsed (urlparse) service roots for a dummy locator.
255 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
256 services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
257 return [urllib.parse.urlparse(url) for url in sorted(services)]
259 def test_ssl_flag_respected_in_roots(self):
260 for ssl_flag in [False, True]:
261 services = self.get_service_roots(self.mock_keep_services(
262 service_ssl_flag=ssl_flag))
264 ('https' if ssl_flag else 'http'), services[0].scheme)
266 def test_correct_ports_with_ipv6_addresses(self):
# IPv6 hosts must be bracketed in URLs so the port parses correctly.
267 service = self.get_service_roots(self.mock_keep_services(
268 service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
269 self.assertEqual('100::1', service.hostname)
270 self.assertEqual(10, service.port)
272 def test_recognize_proxy_services_in_controller_response(self):
273 keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
274 service_type='proxy', service_host='localhost', service_port=9, count=1),
275 block_cache=self.make_block_cache(self.disk_cache))
277 # this will fail, but it ensures we get the service
279 keep_client.put('baz2', num_retries=0)
282 self.assertTrue(keep_client.using_proxy)
284 def test_insecure_disables_tls_verify(self):
285 api_client = self.mock_keep_services(count=1)
286 force_timeout = socket.timeout("timed out")
288 api_client.insecure = True
289 with tutil.mock_keep_responses(b'foo', 200) as mock:
290 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
291 keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
# insecure=True => SSL_VERIFYPEER/SSL_VERIFYHOST explicitly disabled.
293 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
296 mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
299 api_client.insecure = False
300 with tutil.mock_keep_responses(b'foo', 200) as mock:
301 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
302 keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
303 # getopt()==None here means we didn't change the
304 # default. If we were using real pycurl instead of a mock,
305 # it would return the default value 1.
307 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
310 mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
313 def test_refresh_signature(self):
# refresh_signature() should HEAD the block with an X-Keep-Signature
# request header and return the locator from X-Keep-Locator.
314 blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
315 blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
316 local_loc = blk_digest+'+A'+blk_sig
317 remote_loc = blk_digest+'+R'+blk_sig
318 api_client = self.mock_keep_services(count=1)
319 headers = {'X-Keep-Locator':local_loc}
320 with tutil.mock_keep_responses('', 200, **headers):
321 # Check that the translated locator gets returned
322 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
323 self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
324 # Check that refresh_signature() uses the correct method and headers
325 keep_client._get_or_head = mock.MagicMock()
326 keep_client.refresh_signature(remote_loc)
327 args, kwargs = keep_client._get_or_head.call_args_list[0]
328 self.assertIn(remote_loc, args)
329 self.assertEqual("HEAD", kwargs['method'])
330 self.assertIn('X-Keep-Signature', kwargs['headers'])
332 # test_*_timeout verify that KeepClient instructs pycurl to use
333 # the appropriate connection and read timeouts. They don't care
334 # whether pycurl actually exhibits the expected timeout behavior
335 # -- those tests are in the KeepClientTimeout test class.
337 def test_get_timeout(self):
338 api_client = self.mock_keep_services(count=1)
339 force_timeout = socket.timeout("timed out")
340 with tutil.mock_keep_responses(force_timeout, 0) as mock:
341 keep_client = arvados.KeepClient(
342 api_client=api_client,
343 block_cache=self.make_block_cache(self.disk_cache),
346 with self.assertRaises(arvados.errors.KeepReadError):
347 keep_client.get('ffffffffffffffffffffffffffffffff')
# DEFAULT_TIMEOUT = (connect secs, low-speed secs, low-speed bytes/sec).
349 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
350 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
352 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
353 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
355 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
356 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
358 def test_put_timeout(self):
359 api_client = self.mock_keep_services(count=1)
360 force_timeout = socket.timeout("timed out")
361 with tutil.mock_keep_responses(force_timeout, 0) as mock:
362 keep_client = arvados.KeepClient(
363 api_client=api_client,
364 block_cache=self.make_block_cache(self.disk_cache),
367 with self.assertRaises(arvados.errors.KeepWriteError):
368 keep_client.put(b'foo')
370 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
371 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
373 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
374 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
376 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
377 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
379 def test_head_timeout(self):
380 api_client = self.mock_keep_services(count=1)
381 force_timeout = socket.timeout("timed out")
382 with tutil.mock_keep_responses(force_timeout, 0) as mock:
383 keep_client = arvados.KeepClient(
384 api_client=api_client,
385 block_cache=self.make_block_cache(self.disk_cache),
388 with self.assertRaises(arvados.errors.KeepReadError):
389 keep_client.head('ffffffffffffffffffffffffffffffff')
391 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
392 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
394 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
397 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
400 def test_proxy_get_timeout(self):
# Proxy services use the (longer) DEFAULT_PROXY_TIMEOUT tuple.
401 api_client = self.mock_keep_services(service_type='proxy', count=1)
402 force_timeout = socket.timeout("timed out")
403 with tutil.mock_keep_responses(force_timeout, 0) as mock:
404 keep_client = arvados.KeepClient(
405 api_client=api_client,
406 block_cache=self.make_block_cache(self.disk_cache),
409 with self.assertRaises(arvados.errors.KeepReadError):
410 keep_client.get('ffffffffffffffffffffffffffffffff')
412 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
413 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
415 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
416 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
418 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
419 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
421 def test_proxy_head_timeout(self):
422 api_client = self.mock_keep_services(service_type='proxy', count=1)
423 force_timeout = socket.timeout("timed out")
424 with tutil.mock_keep_responses(force_timeout, 0) as mock:
425 keep_client = arvados.KeepClient(
426 api_client=api_client,
427 block_cache=self.make_block_cache(self.disk_cache),
430 with self.assertRaises(arvados.errors.KeepReadError):
431 keep_client.head('ffffffffffffffffffffffffffffffff')
433 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
434 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
436 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
439 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
442 def test_proxy_put_timeout(self):
443 self.disk_cache_dir = None
444 api_client = self.mock_keep_services(service_type='proxy', count=1)
445 force_timeout = socket.timeout("timed out")
446 with tutil.mock_keep_responses(force_timeout, 0) as mock:
447 keep_client = arvados.KeepClient(
448 api_client=api_client,
451 with self.assertRaises(arvados.errors.KeepWriteError):
452 keep_client.put('foo')
454 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
455 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
457 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
458 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
460 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
461 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
463 def check_no_services_error(self, verb, exc_class):
# Helper: when service discovery itself fails, the raised error should
# contain zero per-request errors.
464 api_client = mock.MagicMock(name='api_client')
465 api_client.keep_services().accessible().execute.side_effect = (
466 arvados.errors.ApiError)
467 keep_client = arvados.KeepClient(
468 api_client=api_client,
469 block_cache=self.make_block_cache(self.disk_cache),
472 with self.assertRaises(exc_class) as err_check:
473 getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
474 self.assertEqual(0, len(err_check.exception.request_errors()))
476 def test_get_error_with_no_services(self):
477 self.check_no_services_error('get', arvados.errors.KeepReadError)
479 def test_head_error_with_no_services(self):
480 self.check_no_services_error('head', arvados.errors.KeepReadError)
482 def test_put_error_with_no_services(self):
483 self.check_no_services_error('put', arvados.errors.KeepWriteError)
485 def check_errors_from_last_retry(self, verb, exc_class):
# Helper: only the final round of responses (the 502s) should be
# reported in the aggregated exception.
486 api_client = self.mock_keep_services(count=2)
487 req_mock = tutil.mock_keep_responses(
488 "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
489 with req_mock, tutil.skip_sleep, \
490 self.assertRaises(exc_class) as err_check:
491 keep_client = arvados.KeepClient(
492 api_client=api_client,
493 block_cache=self.make_block_cache(self.disk_cache),
496 getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
498 self.assertEqual([502, 502], [
499 getattr(error, 'status_code', None)
500 for error in err_check.exception.request_errors().values()])
501 self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')
503 def test_get_error_reflects_last_retry(self):
504 self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
506 def test_head_error_reflects_last_retry(self):
507 self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)
509 def test_put_error_reflects_last_retry(self):
510 self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
512 def test_put_error_does_not_include_successful_puts(self):
513 data = 'partial failure test'
514 data_loc = tutil.str_keep_locator(data)
515 api_client = self.mock_keep_services(count=3)
516 with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
517 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
518 keep_client = arvados.KeepClient(
519 api_client=api_client,
520 block_cache=self.make_block_cache(self.disk_cache),
523 keep_client.put(data)
# Only the two failed (500) requests appear as request errors.
524 self.assertEqual(2, len(exc_check.exception.request_errors()))
526 def test_proxy_put_with_no_writable_services(self):
527 data = 'test with no writable services'
528 data_loc = tutil.str_keep_locator(data)
529 api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
530 with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
531 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
532 keep_client = arvados.KeepClient(
533 api_client=api_client,
534 block_cache=self.make_block_cache(self.disk_cache),
537 keep_client.put(data)
538 self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
539 self.assertEqual(0, len(exc_check.exception.request_errors()))
541 def test_oddball_service_get(self):
# Unknown service types should still be usable for reads.
542 body = b'oddball service get'
543 api_client = self.mock_keep_services(service_type='fancynewblobstore')
544 with tutil.mock_keep_responses(body, 200):
545 keep_client = arvados.KeepClient(
546 api_client=api_client,
547 block_cache=self.make_block_cache(self.disk_cache),
550 actual = keep_client.get(tutil.str_keep_locator(body))
551 self.assertEqual(body, actual)
553 def test_oddball_service_put(self):
554 body = b'oddball service put'
555 pdh = tutil.str_keep_locator(body)
556 api_client = self.mock_keep_services(service_type='fancynewblobstore')
557 with tutil.mock_keep_responses(pdh, 200):
558 keep_client = arvados.KeepClient(
559 api_client=api_client,
560 block_cache=self.make_block_cache(self.disk_cache),
563 actual = keep_client.put(body, copies=1)
564 self.assertEqual(pdh, actual)
566 def test_oddball_service_writer_count(self):
# A single response reporting 3 stored replicas satisfies copies=2
# with only one request.
567 body = b'oddball service writer count'
568 pdh = tutil.str_keep_locator(body)
569 api_client = self.mock_keep_services(service_type='fancynewblobstore',
571 headers = {'x-keep-replicas-stored': 3}
572 with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
573 **headers) as req_mock:
574 keep_client = arvados.KeepClient(
575 api_client=api_client,
576 block_cache=self.make_block_cache(self.disk_cache),
579 actual = keep_client.put(body, copies=2)
580 self.assertEqual(pdh, actual)
581 self.assertEqual(1, req_mock.call_count)
584 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
585 class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
# Block-cache behavior: GET responses are cached and reused; HEAD
# responses are never cached (so a later GET still hits the service).
589 self.api_client = self.mock_keep_services(count=2)
590 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
592 self.locator = '1271ed5ef305aadabc605b1609e24c52'
595 DiskCacheBase.tearDown(self)
597 @mock.patch('arvados.KeepClient.KeepService.get')
598 def test_get_request_cache(self, get_mock):
599 with tutil.mock_keep_responses(self.data, 200, 200):
600 self.keep_client.get(self.locator)
601 self.keep_client.get(self.locator)
602 # Request already cached, don't require more than one request
603 get_mock.assert_called_once()
605 @mock.patch('arvados.KeepClient.KeepService.get')
606 def test_head_request_cache(self, get_mock):
607 with tutil.mock_keep_responses(self.data, 200, 200):
608 self.keep_client.head(self.locator)
609 self.keep_client.head(self.locator)
610 # Don't cache HEAD requests so that they're not confused with GET reqs
611 self.assertEqual(2, get_mock.call_count)
613 @mock.patch('arvados.KeepClient.KeepService.get')
614 def test_head_and_then_get_return_different_responses(self, get_mock):
617 get_mock.side_effect = [b'first response', b'second response']
618 with tutil.mock_keep_responses(self.data, 200, 200):
619 head_resp = self.keep_client.head(self.locator)
620 get_resp = self.keep_client.get(self.locator)
621 self.assertEqual(b'first response', head_resp)
622 # First response was not cached because it was from a HEAD request.
623 self.assertNotEqual(head_resp, get_resp)
629 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
630 class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
# Storage-class support on put(): the X-Keep-Storage-Classes request
# header, the x-keep-storage-classes-confirmed response header, and how
# confirmed copies/classes control success and retry counts.
# NOTE(review): this listing omits a few original lines (some dict/list
# openers such as "cases = ["); only visible lines are reproduced.
634 self.api_client = self.mock_keep_services(count=2)
635 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
637 self.locator = '1271ed5ef305aadabc605b1609e24c52'
640 DiskCacheBase.tearDown(self)
642 def test_multiple_default_storage_classes_req_header(self):
# When the cluster config declares several default classes, put()
# requests all of them (sorted) when no classes are specified.
643 api_mock = self.api_client_mock()
644 api_mock.config.return_value = {
646 'foo': { 'Default': True },
647 'bar': { 'Default': True },
648 'baz': { 'Default': False }
651 api_client = self.mock_keep_services(api_mock=api_mock, count=2)
652 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
654 'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
655 'x-keep-replicas-stored': 1
657 with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
658 keep_client.put(self.data, copies=1)
659 req_hdr = mock.responses[0]
661 'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
663 def test_storage_classes_req_header(self):
665 self.api_client.config()['StorageClasses'],
666 {'default': {'Default': True}})
668 # requested, expected
669 [['foo'], 'X-Keep-Storage-Classes: foo'],
670 [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
671 [[], 'X-Keep-Storage-Classes: default'],
672 [None, 'X-Keep-Storage-Classes: default'],
674 for req_classes, expected_header in cases:
675 headers = {'x-keep-replicas-stored': 1}
676 if req_classes is None or len(req_classes) == 0:
677 confirmed_hdr = 'default=1'
678 elif len(req_classes) > 0:
679 confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
680 headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
681 with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
682 self.keep_client.put(self.data, copies=1, classes=req_classes)
683 req_hdr = mock.responses[0]
684 self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
686 def test_partial_storage_classes_put(self):
# If only some requested classes are confirmed, the retry should only
# request the still-unsatisfied classes.
688 'x-keep-replicas-stored': 1,
689 'x-keep-storage-classes-confirmed': 'foo=1'}
690 with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
691 with self.assertRaises(arvados.errors.KeepWriteError):
692 self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'], num_retries=0)
693 # 1st request, both classes pending
694 req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
695 self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
696 # 2nd try, 'foo' class already satisfied
697 req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
698 self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
700 def test_successful_storage_classes_put_requests(self):
702 # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
703 [ 1, ['foo'], 1, 'foo=1', 1],
704 [ 1, ['foo'], 2, 'foo=2', 1],
705 [ 2, ['foo'], 2, 'foo=2', 1],
706 [ 2, ['foo'], 1, 'foo=1', 2],
707 [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
708 [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
709 [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
710 [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
711 [ 1, ['foo', 'bar'], 1, None, 1],
712 [ 1, ['foo'], 1, None, 1],
713 [ 2, ['foo'], 2, None, 1],
714 [ 2, ['foo'], 1, None, 2],
716 for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
717 headers = {'x-keep-replicas-stored': c_copies}
718 if c_classes is not None:
719 headers.update({'x-keep-storage-classes-confirmed': c_classes})
720 with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
721 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
722 self.assertEqual(self.locator,
723 self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
725 self.assertEqual(e_reqs, mock.call_count, case_desc)
727 def test_failed_storage_classes_put_requests(self):
729 # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
730 [ 1, ['foo'], 1, 'bar=1', 200],
731 [ 1, ['foo'], 1, None, 503],
732 [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
733 [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
734 [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
736 for w_copies, w_classes, c_copies, c_classes, return_code in cases:
737 headers = {'x-keep-replicas-stored': c_copies}
738 if c_classes is not None:
739 headers.update({'x-keep-storage-classes-confirmed': c_classes})
740 with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
741 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
742 with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
743 self.keep_client.put(self.data, copies=w_copies, classes=w_classes, num_retries=0)
746 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
747 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
# X-Request-Id propagation: the header can come from the api_client,
# be passed explicitly per call, or be generated automatically; the
# request id must also appear in raised Keep errors.
751 self.api_client = self.mock_keep_services(count=2)
752 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
754 self.locator = '1271ed5ef305aadabc605b1609e24c52'
755 self.test_id = arvados.util.new_request_id()
756 self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
757 # If we don't set request_id to None explicitly here, it will
758 # return <MagicMock name='api_client_mock.request_id'
760 self.api_client.request_id = None
763 DiskCacheBase.tearDown(self)
765 def test_default_to_api_client_request_id(self):
766 self.api_client.request_id = self.test_id
767 with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
768 self.keep_client.put(self.data)
769 self.assertEqual(2, len(mock.responses))
770 for resp in mock.responses:
771 self.assertProvidedRequestId(resp)
773 with tutil.mock_keep_responses(self.data, 200) as mock:
774 self.keep_client.get(self.locator)
775 self.assertProvidedRequestId(mock.responses[0])
777 with tutil.mock_keep_responses(b'', 200) as mock:
778 self.keep_client.head(self.locator)
779 self.assertProvidedRequestId(mock.responses[0])
781 def test_explicit_request_id(self):
782 with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
783 self.keep_client.put(self.data, request_id=self.test_id)
784 self.assertEqual(2, len(mock.responses))
785 for resp in mock.responses:
786 self.assertProvidedRequestId(resp)
788 with tutil.mock_keep_responses(self.data, 200) as mock:
789 self.keep_client.get(self.locator, request_id=self.test_id)
790 self.assertProvidedRequestId(mock.responses[0])
792 with tutil.mock_keep_responses(b'', 200) as mock:
793 self.keep_client.head(self.locator, request_id=self.test_id)
794 self.assertProvidedRequestId(mock.responses[0])
796 def test_automatic_request_id(self):
# With no id from the api_client or caller, one is generated per call.
797 with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
798 self.keep_client.put(self.data)
799 self.assertEqual(2, len(mock.responses))
800 for resp in mock.responses:
801 self.assertAutomaticRequestId(resp)
803 with tutil.mock_keep_responses(self.data, 200) as mock:
804 self.keep_client.get(self.locator)
805 self.assertAutomaticRequestId(mock.responses[0])
807 with tutil.mock_keep_responses(b'', 200) as mock:
808 self.keep_client.head(self.locator)
809 self.assertAutomaticRequestId(mock.responses[0])
811 def test_request_id_in_exception(self):
812 with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
813 with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
814 self.keep_client.head(self.locator, request_id=self.test_id)
816 with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
817 with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
818 self.keep_client.get(self.locator)
820 with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
821 with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
822 self.keep_client.put(self.data, request_id=self.test_id)
824 with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
825 with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
826 self.keep_client.put(self.data)
828 def assertAutomaticRequestId(self, resp):
# The sent header must be a fresh req-... id, not our fixed test id.
829 hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
830 if x.startswith('X-Request-Id: ')][0]
831 self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
832 self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
834 def assertProvidedRequestId(self, resp):
835 self.assertIn('X-Request-Id: '+self.test_id,
836 resp.getopt(pycurl.HTTPHEADER))
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify KeepClient's rendezvous-hash probe order against a reference set.

    NOTE(review): this source chunk is missing several interior lines of this
    class (e.g. the setUp/tearDown 'def' headers, list-literal openings such
    as 'self.blocks = [' and 'got_order = [', and the 'op(i)' call). Only the
    visible lines are reproduced below, unchanged.
    """

        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        DiskCacheBase.tearDown(self)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        # Every service returns 500, so `op` must fail; the probe order is
        # then read back out of the mock's recorded request URLs.
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                        pending.append(expected)
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        # Adding services should displace the old first-choice service by at
        # most `added_services` positions in the probe order.
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
                len(hashes) / initial_services)
                (120 - added_services)/100)
                expect_penalty * 8/10)
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(

    def check_64_zeros_error_order(self, verb, exc_class):
        # Drive `verb` ('get' or 'put') into total failure and check that the
        # per-service errors are reported in reference probe order.
        data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    """Exercise connect/response/bandwidth timeout handling in KeepClient.

    NOTE(review): this chunk is missing some interior lines (e.g. the DATA
    and TIMEOUT_TIME class constants, tearDown 'def' header, and the bodies
    of the helper __init__ methods). Visible lines are reproduced unchanged.
    """
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    BANDWIDTH_LOW_LIM = 1024

        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        # Context manager: body must take between tmin and tmax seconds.
        def __init__(self, tmin, tmax):

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        # Context manager: body must take at least tmin seconds.
        def __init__(self, tmin):

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        # timeouts = (connect, response, low-speed limit) — see KeepClient.
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            service_host='240.0.0.0',
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError) as e:
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check that K@... locator hints route requests to gateways first.

    NOTE(review): this chunk is missing a few interior lines (tearDown 'def'
    header, the 'self.gateways = [{' opening, local gateway/disk counts, and
    trailing assertion wrappers). Visible lines are reproduced unchanged.
    """
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        # Build `gateways` fake gateway services plus `disks` disk services,
        # and point self.keepClient at the combined service list.
            'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
            'owner_uuid': 'zzzzz-tpzed-000000000000000',
            'service_host': 'gatewayhost{}'.format(i),
            'service_port': 12345,
            'service_ssl_flag': True,
            'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
            tutil.FakeCurl.make(code=404, body='')
            for _ in range(gateways+disks)
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
                mocks[i].getopt(pycurl.URL).decode(),

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
                mocks[i].getopt(pycurl.URL).decode(),

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        # K@<cluster-id> (no trailing hyphen) means "fetch via the remote
        # cluster's keep proxy".
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
class KeepClientRetryTestMixin(object):
    """Shared retry-behavior tests for get/head/put test cases.

    NOTE(review): this chunk is missing a few interior lines (the setUp 'def'
    header, and apparently a 'return err' at the end of check_exception that
    test_error_after_retries_exhausted relies on — confirm against the full
    file). Visible lines are reproduced unchanged.
    """
    # Testing with a local Keep store won't exercise the retry behavior.
    # Instead, our strategy is:
    # * Create a client with one proxy specified (pointed at a black
    #   hole), so there's no need to instantiate an API client, and
    #   all HTTP requests come from one place.
    # * Mock httplib's request method to provide simulated responses.
    # This lets us test the retry logic extensively without relying on any
    # supporting servers, and prevents side effects in case something hiccups.
    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
    # Test classes must define TEST_PATCHER to a method that mocks
    # out appropriate methods in the client.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        kwargs = self.client_kwargs.copy()
        kwargs.update(caller_kwargs)
        kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        # 403 is permanent: no retry should occur even with retries allowed.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries given at construction time applies when the call
        # itself doesn't pass one.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry-behavior tests specialized for KeepClient.get().

    NOTE(review): the tearDown 'def' header and one signature-continuation
    line of run_method are missing from this chunk; visible lines are
    reproduced unchanged.
    """
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
        return self.new_client().get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        with tutil.mock_keep_responses(
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry-behavior tests specialized for KeepClient.head().

    NOTE(review): the tearDown 'def' header and a signature-continuation
    line of run_method are missing from this chunk; visible lines are
    reproduced unchanged.
    """
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
        return self.new_client().head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry-behavior tests specialized for KeepClient.put().

    NOTE(review): the tearDown 'def' header is missing from this chunk;
    visible lines are reproduced unchanged.
    """
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        return self.new_client().put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Only one (proxy) server is available, so requesting 2 copies
        # must fail rather than double-store on the same server.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """Check KeepWriterThreadPool stops once enough copies are written.

    NOTE(review): this chunk is missing several interior lines (setUp 'def'
    header and self.copies assignment, 'for i in range(...)' loop headers in
    the tests, and parts of FakeKeepService.__init__/last_result). Visible
    lines are reproduced unchanged.
    """
    class FakeKeepService(object):
        # Stand-in for a Keep service: sleeps `delay`, then either raises
        # `will_raise`, or reports success per `will_succeed`.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result['headers'] = {}
            self._result['headers']['x-keep-replicas-stored'] = str(replicas)
            self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
            self._result['body'] = 'foobar'

        def put(self, data_hash, data, timeout, headers):
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            if self.will_succeed:
                return {"status_code": 500, "body": "didn't succeed"}

        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas = self.copies,
            copies = self.copies

    def test_only_write_enough_on_success(self):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        for i in range(self.copies+1):
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
        for i in range(self.copies-1):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies-1, []))
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Retry behavior for put()s that need two distinct servers.

    NOTE(review): the setUp/tearDown 'def' headers are missing from this
    chunk; visible lines are reproduced unchanged.
    """
    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.

        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        # 3 requests: one failed attempt plus two successful stores.
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    """Regression test: an API-client error during get() must not deadlock.

    NOTE(review): this chunk is missing interior lines of ApiMock.__getattr__
    (the return values for "api_token"/"insecure" and any other branches);
    visible lines are reproduced unchanged.
    """
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        class ApiMock(object):
            # Minimal fake API object: attribute access raises for
            # everything except the attributes KeepClient needs at
            # construction time.
            def __getattr__(self, r):
                if r == "api_token":
                elif r == "insecure":
                    raise arvados.errors.KeepReadError()
        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for is that if an API (not
        # keepstore) exception is thrown as part of a get(), the next
        # attempt to get that same block will result in a deadlock.
        # This is why there are two get()s in a row. Unfortunately,
        # the failure mode for this test is that the test suite
        # deadlocks, there isn't a good way to avoid that without
        # adding a special case that has no use except for this test.

        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Tests for the on-disk block cache (KeepBlockCache with disk_cache=True).

    NOTE(review): this chunk is missing interior lines (setUp/tearDown 'def'
    headers, several 'f.write(self.data)' bodies, and the max_slots=...
    arguments to KeepBlockCache). Visible lines are reproduced unchanged.

    NOTE(review): test_disk_cache_share is defined TWICE in this class (once
    near the top, once at the bottom); the second definition shadows the
    first, so the first never runs. One of them should be renamed.
    """
        self.api_client = self.mock_keep_services(count=2)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.disk_cache_dir = tempfile.mkdtemp()

        shutil.rmtree(self.disk_cache_dir)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_read(self, get_mock):
        # confirm it finds an existing cache block when the cache is
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:

        # block cache should have found the existing block
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        get_mock.assert_not_called()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # confirm it finds a cache block written after the disk cache
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:

        # when we try to get the block, it'll check the disk and find it.
        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        get_mock.assert_not_called()

    def test_disk_cache_write(self):
        # confirm the cache block was created
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        self.assertIsNotNone(keep_client.get_from_cache(self.locator))

        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))

    def test_disk_cache_clean(self):
        # confirm that a tmp file in the cache is cleaned up
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:

        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:

        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        # The tmp still hasn't been deleted because it was created in the last 60 seconds
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))

        # Set the mtime to 61s in the past
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))

        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir)

        # Tmp should be gone but the other ones are safe.
        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_cap(self, get_mock):
        # confirm that the cache is kept to the desired limit
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:

        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir,

        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

    # NOTE(review): redefinition — shadows test_disk_cache_share above.
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # confirm that a second cache doesn't delete files that belong to the first cache.
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:

        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir,

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir,

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1735 def test_disk_cache_error(self):
1736 os.chmod(self.disk_cache_dir, stat.S_IRUSR)
1738 # Fail during cache initialization.
1739 with self.assertRaises(OSError):
1740 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1741 disk_cache_dir=self.disk_cache_dir)
1744 def test_disk_cache_write_error(self):
1745 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1746 disk_cache_dir=self.disk_cache_dir)
1748 keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1750 # Make the cache dir read-only
1751 os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1752 os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
1755 with self.assertRaises(arvados.errors.KeepCacheError):
1756 with tutil.mock_keep_responses(self.data, 200) as mock:
1757 keep_client.get(self.locator)
1760 @mock.patch('mmap.mmap')
1761 def test_disk_cache_retry_write_error(self, mockmmap):
1762 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1763 disk_cache_dir=self.disk_cache_dir)
1765 keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1767 mockmmap.side_effect = (OSError(errno.ENOSPC, "no space"), self.data)
1769 cache_max_before = block_cache.cache_max
1771 with tutil.mock_keep_responses(self.data, 200) as mock:
1772 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1774 self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1776 with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1777 self.assertTrue(tutil.binary_compare(f.read(), self.data))
1779 # shrank the cache in response to ENOSPC
1780 self.assertTrue(cache_max_before > block_cache.cache_max)
1783 @mock.patch('mmap.mmap')
1784 def test_disk_cache_retry_write_error2(self, mockmmap):
1785 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1786 disk_cache_dir=self.disk_cache_dir)
1788 keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1790 mockmmap.side_effect = (OSError(errno.ENOMEM, "no memory"), self.data)
1792 slots_before = block_cache._max_slots
1794 with tutil.mock_keep_responses(self.data, 200) as mock:
1795 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1797 self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1799 with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1800 self.assertTrue(tutil.binary_compare(f.read(), self.data))
1802 # shrank the cache in response to ENOMEM
1803 self.assertTrue(slots_before > block_cache._max_slots)