1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
32 from . import arvados_testutil as tutil
33 from . import keepstub
34 from . import run_test_server
36 from .arvados_testutil import DiskCacheBase
# NOTE(review): this listing is a sampled excerpt — each line carries its
# original file line number and many lines are missing (e.g. the
# @classmethod / def setUpClass lines before the super() call below).
# Comments here are hedged accordingly; do not treat gaps as deletions.
# Round-trip tests against a real Keep server, run twice: once with the
# disk-backed block cache and once with the in-memory cache.
38 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
39 class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
# Shared DiskCacheBase instance created in setUpClass, torn down in tearDownClass.
43 block_cache_test = None
# setUpClass body (its `def` line is not visible in this excerpt): starts the
# servers, authorizes as "admin", and builds one KeepClient shared by all
# tests, with a block cache honoring the parameterized disk_cache flag.
47 super(KeepTestCase, cls).setUpClass()
48 run_test_server.authorize_with("admin")
49 cls.api_client = arvados.api('v1')
50 cls.block_cache_test = DiskCacheBase()
51 cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
52 proxy='', local_store='',
53 block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))
56 def tearDownClass(cls):
57 super(KeepTestCase, cls).setUpClass()
58 cls.block_cache_test.tearDown()
# Basic put/get round trip; also checks the upload/download byte counters.
# NOTE(review): several assertRegex(...) opener lines are missing from this
# excerpt — the bare regex/message argument lines below belong to them.
60 def test_KeepBasicRWTest(self):
61 self.assertEqual(0, self.keep_client.upload_counter.get())
62 foo_locator = self.keep_client.put('foo')
65 '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
66 'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
68 # 6 bytes because uploaded 2 copies
69 self.assertEqual(6, self.keep_client.upload_counter.get())
71 self.assertEqual(0, self.keep_client.download_counter.get())
72 self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
74 'wrong content from Keep.get(md5("foo"))')
75 self.assertEqual(3, self.keep_client.download_counter.get())
# Round trip for non-ASCII binary content.
77 def test_KeepBinaryRWTest(self):
78 blob_str = b'\xff\xfe\xf7\x00\x01\x02'
79 blob_locator = self.keep_client.put(blob_str)
82 '^7fc7c53b45e53926ba52821140fef396\+6',
83 ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
84 self.assertEqual(self.keep_client.get(blob_locator),
86 'wrong content from Keep.get(md5(<binarydata>))')
# Round trip for a large block: 8 bytes doubled 23 times = 64 MiB.
88 def test_KeepLongBinaryRWTest(self):
89 blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
90 for i in range(0, 23):
91 blob_data = blob_data + blob_data
92 blob_locator = self.keep_client.put(blob_data)
95 '^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
96 ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
97 self.assertEqual(self.keep_client.get(blob_locator),
99 'wrong content from Keep.get(md5(<binarydata>))')
101 @unittest.skip("unreliable test - please fix and close #8752")
102 def test_KeepSingleCopyRWTest(self):
103 blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
104 blob_locator = self.keep_client.put(blob_data, copies=1)
107 '^c902006bc98a3eb4a3663b65ab4a6fab\+8',
108 ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
109 self.assertEqual(self.keep_client.get(blob_locator),
111 'wrong content from Keep.get(md5(<binarydata>))')
# Empty input must yield the well-known locator for the empty block.
113 def test_KeepEmptyCollectionTest(self):
114 blob_locator = self.keep_client.put('', copies=1)
117 '^d41d8cd98f00b204e9800998ecf8427e\+0',
118 ('wrong locator from Keep.put(""): ' + blob_locator))
120 def test_unicode_must_be_ascii(self):
121 # If unicode type, must only consist of valid ASCII
122 foo_locator = self.keep_client.put(u'foo')
125 '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
126 'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
# Python 2 only: non-ASCII unicode cannot be implicitly encoded.
128 if sys.version_info < (3, 0):
129 with self.assertRaises(UnicodeEncodeError):
130 # Error if it is not ASCII
131 self.keep_client.put(u'\xe2')
133 with self.assertRaises(AttributeError):
134 # Must be bytes or have an encode() method
135 self.keep_client.put({})
# HEAD on an existing locator returns True; content still readable via GET.
137 def test_KeepHeadTest(self):
138 locator = self.keep_client.put('test_head')
141 '^b9a772c7049325feb7130fff1f8333e9\+9',
142 'wrong md5 hash from Keep.put for "test_head": ' + locator)
143 self.assertEqual(True, self.keep_client.head(locator))
144 self.assertEqual(self.keep_client.get(locator),
146 'wrong content from Keep.get for "test_head"')
# Permission checks with blob signing enabled: signed locators work, unsigned
# or foreign/unauthenticated reads raise NotFoundError.
# NOTE(review): several assertRegex/assertRaises opener and argument lines are
# missing from this excerpt; orphan argument lines below belong to them.
148 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
149 class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
152 KEEP_SERVER = {'blob_signing': True}
# tearDown tail: releases the parameterized block cache.
155 DiskCacheBase.tearDown(self)
157 def test_KeepBasicRWTest(self):
158 run_test_server.authorize_with('active')
159 keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
# Signed locator: hash+size plus +A<signature>@<expiry> hint.
160 foo_locator = keep_client.put('foo')
163 r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
164 'invalid locator from Keep.put("foo"): ' + foo_locator)
165 self.assertEqual(keep_client.get(foo_locator),
167 'wrong content from Keep.get(md5("foo"))')
169 # GET with an unsigned locator => NotFound
170 bar_locator = keep_client.put('bar')
171 unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
174 r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
175 'invalid locator from Keep.put("bar"): ' + bar_locator)
176 self.assertRaises(arvados.errors.NotFoundError,
178 unsigned_bar_locator)
180 # GET from a different user => NotFound
181 run_test_server.authorize_with('spectator')
182 self.assertRaises(arvados.errors.NotFoundError,
186 # Unauthenticated GET for a signed locator => NotFound
187 # Unauthenticated GET for an unsigned locator => NotFound
188 keep_client.api_token = ''
189 self.assertRaises(arvados.errors.NotFoundError,
192 self.assertRaises(arvados.errors.NotFoundError,
194 unsigned_bar_locator)
# Tests routing through keepproxy, driven by the ARVADOS_KEEP_SERVICES
# environment/config setting.
196 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
197 class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
201 KEEP_PROXY_SERVER = {}
# setUpClass tail (def line missing from this excerpt).
205 super(KeepProxyTestCase, cls).setUpClass()
206 run_test_server.authorize_with('active')
207 cls.api_client = arvados.api('v1')
# tearDown: chain to parent, then release the parameterized block cache.
210 super(KeepProxyTestCase, self).tearDown()
211 DiskCacheBase.tearDown(self)
213 def test_KeepProxyTest1(self):
214 # Will use ARVADOS_KEEP_SERVICES environment variable that
215 # is set by setUpClass().
216 keep_client = arvados.KeepClient(api_client=self.api_client,
217 local_store='', block_cache=self.make_block_cache(self.disk_cache))
218 baz_locator = keep_client.put('baz')
221 '^73feffa4b7f6bb68e44cf984c85f6e88\+3',
222 'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
223 self.assertEqual(keep_client.get(baz_locator),
225 'wrong content from Keep.get(md5("baz"))')
# A put/get routed through the proxy must set using_proxy.
226 self.assertTrue(keep_client.using_proxy)
228 def test_KeepProxyTestMultipleURIs(self):
229 # Test using ARVADOS_KEEP_SERVICES env var overriding any
230 # existing proxy setting and setting multiple proxies
231 arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
232 keep_client = arvados.KeepClient(api_client=self.api_client,
234 block_cache=self.make_block_cache(self.disk_cache))
# Service roots are normalized to have a trailing slash.
235 uris = [x['_service_root'] for x in keep_client._keep_services]
236 self.assertEqual(uris, ['http://10.0.0.1/',
237 'https://foo.example.org:1234/'])
# A malformed service URI must be rejected at client construction time.
239 def test_KeepProxyTestInvalidURI(self):
240 arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
241 with self.assertRaises(arvados.errors.ArgumentError):
242 keep_client = arvados.KeepClient(api_client=self.api_client,
244 block_cache=self.make_block_cache(self.disk_cache))
# Unit tests against mocked Keep services (no real server): service-root
# selection, TLS verification flags, timeouts, and error aggregation.
# NOTE(review): many self.assertEqual(...) opener lines are missing from this
# excerpt; the orphan getopt(...)/int(...) argument lines belong to them.
246 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
247 class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
251 DiskCacheBase.tearDown(self)
# Helper: parsed, sorted service roots the client would probe for a dummy hash.
253 def get_service_roots(self, api_client):
254 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
255 services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
256 return [urllib.parse.urlparse(url) for url in sorted(services)]
258 def test_ssl_flag_respected_in_roots(self):
259 for ssl_flag in [False, True]:
260 services = self.get_service_roots(self.mock_keep_services(
261 service_ssl_flag=ssl_flag))
263 ('https' if ssl_flag else 'http'), services[0].scheme)
# IPv6 hosts must keep hostname and port separable after URL parsing.
265 def test_correct_ports_with_ipv6_addresses(self):
266 service = self.get_service_roots(self.mock_keep_services(
267 service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
268 self.assertEqual('100::1', service.hostname)
269 self.assertEqual(10, service.port)
271 def test_recognize_proxy_services_in_controller_response(self):
272 keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
273 service_type='proxy', service_host='localhost', service_port=9, count=1),
274 block_cache=self.make_block_cache(self.disk_cache))
276 # this will fail, but it ensures we get the service
278 keep_client.put('baz2')
281 self.assertTrue(keep_client.using_proxy)
# api_client.insecure=True must disable pycurl's peer/host TLS checks.
283 def test_insecure_disables_tls_verify(self):
284 api_client = self.mock_keep_services(count=1)
285 force_timeout = socket.timeout("timed out")
287 api_client.insecure = True
288 with tutil.mock_keep_responses(b'foo', 200) as mock:
289 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
290 keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
292 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
295 mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
298 api_client.insecure = False
299 with tutil.mock_keep_responses(b'foo', 200) as mock:
300 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
301 keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
302 # getopt()==None here means we didn't change the
303 # default. If we were using real pycurl instead of a mock,
304 # it would return the default value 1.
306 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
309 mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
# refresh_signature() must exchange a +R remote hint for a +A local signature
# via a HEAD request carrying X-Keep-Signature.
312 def test_refresh_signature(self):
313 blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
314 blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
315 local_loc = blk_digest+'+A'+blk_sig
316 remote_loc = blk_digest+'+R'+blk_sig
317 api_client = self.mock_keep_services(count=1)
318 headers = {'X-Keep-Locator':local_loc}
319 with tutil.mock_keep_responses('', 200, **headers):
320 # Check that the translated locator gets returned
321 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
322 self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
323 # Check that refresh_signature() uses the correct method and headers
324 keep_client._get_or_head = mock.MagicMock()
325 keep_client.refresh_signature(remote_loc)
326 args, kwargs = keep_client._get_or_head.call_args_list[0]
327 self.assertIn(remote_loc, args)
328 self.assertEqual("HEAD", kwargs['method'])
329 self.assertIn('X-Keep-Signature', kwargs['headers'])
331 # test_*_timeout verify that KeepClient instructs pycurl to use
332 # the appropriate connection and read timeouts. They don't care
333 # whether pycurl actually exhibits the expected timeout behavior
334 # -- those tests are in the KeepClientTimeout test class.
336 def test_get_timeout(self):
337 api_client = self.mock_keep_services(count=1)
338 force_timeout = socket.timeout("timed out")
339 with tutil.mock_keep_responses(force_timeout, 0) as mock:
340 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
341 with self.assertRaises(arvados.errors.KeepReadError):
342 keep_client.get('ffffffffffffffffffffffffffffffff')
344 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
345 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
347 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
348 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
350 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
351 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
353 def test_put_timeout(self):
354 api_client = self.mock_keep_services(count=1)
355 force_timeout = socket.timeout("timed out")
356 with tutil.mock_keep_responses(force_timeout, 0) as mock:
357 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
358 with self.assertRaises(arvados.errors.KeepWriteError):
359 keep_client.put(b'foo')
361 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
362 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
364 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
365 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
367 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
368 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
370 def test_head_timeout(self):
371 api_client = self.mock_keep_services(count=1)
372 force_timeout = socket.timeout("timed out")
373 with tutil.mock_keep_responses(force_timeout, 0) as mock:
374 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
375 with self.assertRaises(arvados.errors.KeepReadError):
376 keep_client.head('ffffffffffffffffffffffffffffffff')
378 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
379 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
381 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
384 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
# Proxy services use the separate DEFAULT_PROXY_TIMEOUT tuple.
387 def test_proxy_get_timeout(self):
388 api_client = self.mock_keep_services(service_type='proxy', count=1)
389 force_timeout = socket.timeout("timed out")
390 with tutil.mock_keep_responses(force_timeout, 0) as mock:
391 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
392 with self.assertRaises(arvados.errors.KeepReadError):
393 keep_client.get('ffffffffffffffffffffffffffffffff')
395 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
396 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
398 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
399 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
401 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
402 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
404 def test_proxy_head_timeout(self):
405 api_client = self.mock_keep_services(service_type='proxy', count=1)
406 force_timeout = socket.timeout("timed out")
407 with tutil.mock_keep_responses(force_timeout, 0) as mock:
408 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
409 with self.assertRaises(arvados.errors.KeepReadError):
410 keep_client.head('ffffffffffffffffffffffffffffffff')
412 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
413 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
415 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
418 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
421 def test_proxy_put_timeout(self):
422 self.disk_cache_dir = None
423 api_client = self.mock_keep_services(service_type='proxy', count=1)
424 force_timeout = socket.timeout("timed out")
425 with tutil.mock_keep_responses(force_timeout, 0) as mock:
426 keep_client = arvados.KeepClient(api_client=api_client)
427 with self.assertRaises(arvados.errors.KeepWriteError):
428 keep_client.put('foo')
430 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
431 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
433 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
434 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
436 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
437 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
# Helper: when service discovery itself fails, the raised error must carry
# zero per-request errors.
439 def check_no_services_error(self, verb, exc_class):
440 api_client = mock.MagicMock(name='api_client')
441 api_client.keep_services().accessible().execute.side_effect = (
442 arvados.errors.ApiError)
443 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
444 with self.assertRaises(exc_class) as err_check:
445 getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
446 self.assertEqual(0, len(err_check.exception.request_errors()))
448 def test_get_error_with_no_services(self):
449 self.check_no_services_error('get', arvados.errors.KeepReadError)
451 def test_head_error_with_no_services(self):
452 self.check_no_services_error('head', arvados.errors.KeepReadError)
454 def test_put_error_with_no_services(self):
455 self.check_no_services_error('put', arvados.errors.KeepWriteError)
# Helper: the aggregated error must report only the final retry's statuses.
457 def check_errors_from_last_retry(self, verb, exc_class):
458 api_client = self.mock_keep_services(count=2)
459 req_mock = tutil.mock_keep_responses(
460 "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
461 with req_mock, tutil.skip_sleep, \
462 self.assertRaises(exc_class) as err_check:
463 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
464 getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
466 self.assertEqual([502, 502], [
467 getattr(error, 'status_code', None)
468 for error in err_check.exception.request_errors().values()])
469 self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')
471 def test_get_error_reflects_last_retry(self):
472 self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
474 def test_head_error_reflects_last_retry(self):
475 self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)
477 def test_put_error_reflects_last_retry(self):
478 self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
480 def test_put_error_does_not_include_successful_puts(self):
481 data = 'partial failure test'
482 data_loc = tutil.str_keep_locator(data)
483 api_client = self.mock_keep_services(count=3)
484 with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
485 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
486 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
487 keep_client.put(data)
488 self.assertEqual(2, len(exc_check.exception.request_errors()))
490 def test_proxy_put_with_no_writable_services(self):
491 data = 'test with no writable services'
492 data_loc = tutil.str_keep_locator(data)
493 api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
494 with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
495 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
496 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
497 keep_client.put(data)
498 self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
499 self.assertEqual(0, len(exc_check.exception.request_errors()))
# Unknown service types must still be usable for get/put.
501 def test_oddball_service_get(self):
502 body = b'oddball service get'
503 api_client = self.mock_keep_services(service_type='fancynewblobstore')
504 with tutil.mock_keep_responses(body, 200):
505 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
506 actual = keep_client.get(tutil.str_keep_locator(body))
507 self.assertEqual(body, actual)
509 def test_oddball_service_put(self):
510 body = b'oddball service put'
511 pdh = tutil.str_keep_locator(body)
512 api_client = self.mock_keep_services(service_type='fancynewblobstore')
513 with tutil.mock_keep_responses(pdh, 200):
514 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
515 actual = keep_client.put(body, copies=1)
516 self.assertEqual(pdh, actual)
# x-keep-replicas-stored=3 from one response should satisfy copies=2 in one call.
518 def test_oddball_service_writer_count(self):
519 body = b'oddball service writer count'
520 pdh = tutil.str_keep_locator(body)
521 api_client = self.mock_keep_services(service_type='fancynewblobstore',
523 headers = {'x-keep-replicas-stored': 3}
524 with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
525 **headers) as req_mock:
526 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
527 actual = keep_client.put(body, copies=2)
528 self.assertEqual(pdh, actual)
529 self.assertEqual(1, req_mock.call_count)
# Block-cache behavior: GET responses are cached, HEAD responses are not.
532 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
533 class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
# setUp tail (def line missing from this excerpt): mocked services plus a
# client using the parameterized cache; self.data assignment not visible.
537 self.api_client = self.mock_keep_services(count=2)
538 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
540 self.locator = '1271ed5ef305aadabc605b1609e24c52'
543 DiskCacheBase.tearDown(self)
545 @mock.patch('arvados.KeepClient.KeepService.get')
546 def test_get_request_cache(self, get_mock):
547 with tutil.mock_keep_responses(self.data, 200, 200):
548 self.keep_client.get(self.locator)
549 self.keep_client.get(self.locator)
550 # Request already cached, don't require more than one request
551 get_mock.assert_called_once()
553 @mock.patch('arvados.KeepClient.KeepService.get')
554 def test_head_request_cache(self, get_mock):
555 with tutil.mock_keep_responses(self.data, 200, 200):
556 self.keep_client.head(self.locator)
557 self.keep_client.head(self.locator)
558 # Don't cache HEAD requests so that they're not confused with GET reqs
559 self.assertEqual(2, get_mock.call_count)
561 @mock.patch('arvados.KeepClient.KeepService.get')
562 def test_head_and_then_get_return_different_responses(self, get_mock):
565 get_mock.side_effect = [b'first response', b'second response']
566 with tutil.mock_keep_responses(self.data, 200, 200):
567 head_resp = self.keep_client.head(self.locator)
568 get_resp = self.keep_client.get(self.locator)
569 self.assertEqual(b'first response', head_resp)
570 # First response was not cached because it was from a HEAD request.
571 self.assertNotEqual(head_resp, get_resp)
# X-Keep-Storage-Classes request/response header handling on put().
# NOTE(review): several statement opener lines (assertEqual(, cases = [,
# resp_hdr = {, headers = {) are missing from this excerpt; the orphan
# literal lines below belong to them.
577 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
578 class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
582 self.api_client = self.mock_keep_services(count=2)
583 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
585 self.locator = '1271ed5ef305aadabc605b1609e24c52'
588 DiskCacheBase.tearDown(self)
# With several Default storage classes configured, the request header must
# list all of them (sorted).
590 def test_multiple_default_storage_classes_req_header(self):
591 api_mock = self.api_client_mock()
592 api_mock.config.return_value = {
594 'foo': { 'Default': True },
595 'bar': { 'Default': True },
596 'baz': { 'Default': False }
599 api_client = self.mock_keep_services(api_mock=api_mock, count=2)
600 keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
602 'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
603 'x-keep-replicas-stored': 1
605 with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
606 keep_client.put(self.data, copies=1)
607 req_hdr = mock.responses[0]
609 'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
# Explicitly-requested classes (or the 'default' fallback) drive the header.
611 def test_storage_classes_req_header(self):
613 self.api_client.config()['StorageClasses'],
614 {'default': {'Default': True}})
616 # requested, expected
617 [['foo'], 'X-Keep-Storage-Classes: foo'],
618 [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
619 [[], 'X-Keep-Storage-Classes: default'],
620 [None, 'X-Keep-Storage-Classes: default'],
622 for req_classes, expected_header in cases:
623 headers = {'x-keep-replicas-stored': 1}
624 if req_classes is None or len(req_classes) == 0:
625 confirmed_hdr = 'default=1'
626 elif len(req_classes) > 0:
627 confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
628 headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
629 with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
630 self.keep_client.put(self.data, copies=1, classes=req_classes)
631 req_hdr = mock.responses[0]
632 self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
# When only part of the requested classes are confirmed, the retry should
# ask only for the still-unsatisfied classes.
634 def test_partial_storage_classes_put(self):
636 'x-keep-replicas-stored': 1,
637 'x-keep-storage-classes-confirmed': 'foo=1'}
638 with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
639 with self.assertRaises(arvados.errors.KeepWriteError):
640 self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
641 # 1st request, both classes pending
642 req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
643 self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
644 # 2nd try, 'foo' class already satisfied
645 req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
646 self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
648 def test_successful_storage_classes_put_requests(self):
650 # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
651 [ 1, ['foo'], 1, 'foo=1', 1],
652 [ 1, ['foo'], 2, 'foo=2', 1],
653 [ 2, ['foo'], 2, 'foo=2', 1],
654 [ 2, ['foo'], 1, 'foo=1', 2],
655 [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
656 [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
657 [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
658 [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
659 [ 1, ['foo', 'bar'], 1, None, 1],
660 [ 1, ['foo'], 1, None, 1],
661 [ 2, ['foo'], 2, None, 1],
662 [ 2, ['foo'], 1, None, 2],
664 for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
665 headers = {'x-keep-replicas-stored': c_copies}
666 if c_classes is not None:
667 headers.update({'x-keep-storage-classes-confirmed': c_classes})
668 with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
669 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
670 self.assertEqual(self.locator,
671 self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
673 self.assertEqual(e_reqs, mock.call_count, case_desc)
675 def test_failed_storage_classes_put_requests(self):
677 # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
678 [ 1, ['foo'], 1, 'bar=1', 200],
679 [ 1, ['foo'], 1, None, 503],
680 [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
681 [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
682 [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
684 for w_copies, w_classes, c_copies, c_classes, return_code in cases:
685 headers = {'x-keep-replicas-stored': c_copies}
686 if c_classes is not None:
687 headers.update({'x-keep-storage-classes-confirmed': c_classes})
688 with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
689 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
690 with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
691 self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
# X-Request-Id propagation: api_client default, explicit per-call id,
# auto-generated ids, and ids surfacing in raised exceptions.
694 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
695 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
# setUp tail (def line and self.data assignment not visible in this excerpt).
699 self.api_client = self.mock_keep_services(count=2)
700 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
702 self.locator = '1271ed5ef305aadabc605b1609e24c52'
703 self.test_id = arvados.util.new_request_id()
704 self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
705 # If we don't set request_id to None explicitly here, it will
706 # return <MagicMock name='api_client_mock.request_id'
708 self.api_client.request_id = None
711 DiskCacheBase.tearDown(self)
713 def test_default_to_api_client_request_id(self):
714 self.api_client.request_id = self.test_id
715 with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
716 self.keep_client.put(self.data)
717 self.assertEqual(2, len(mock.responses))
718 for resp in mock.responses:
719 self.assertProvidedRequestId(resp)
721 with tutil.mock_keep_responses(self.data, 200) as mock:
722 self.keep_client.get(self.locator)
723 self.assertProvidedRequestId(mock.responses[0])
725 with tutil.mock_keep_responses(b'', 200) as mock:
726 self.keep_client.head(self.locator)
727 self.assertProvidedRequestId(mock.responses[0])
729 def test_explicit_request_id(self):
730 with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
731 self.keep_client.put(self.data, request_id=self.test_id)
732 self.assertEqual(2, len(mock.responses))
733 for resp in mock.responses:
734 self.assertProvidedRequestId(resp)
736 with tutil.mock_keep_responses(self.data, 200) as mock:
737 self.keep_client.get(self.locator, request_id=self.test_id)
738 self.assertProvidedRequestId(mock.responses[0])
740 with tutil.mock_keep_responses(b'', 200) as mock:
741 self.keep_client.head(self.locator, request_id=self.test_id)
742 self.assertProvidedRequestId(mock.responses[0])
744 def test_automatic_request_id(self):
745 with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
746 self.keep_client.put(self.data)
747 self.assertEqual(2, len(mock.responses))
748 for resp in mock.responses:
749 self.assertAutomaticRequestId(resp)
751 with tutil.mock_keep_responses(self.data, 200) as mock:
752 self.keep_client.get(self.locator)
753 self.assertAutomaticRequestId(mock.responses[0])
755 with tutil.mock_keep_responses(b'', 200) as mock:
756 self.keep_client.head(self.locator)
757 self.assertAutomaticRequestId(mock.responses[0])
# Failures must include the request id (explicit or generated) in the message.
759 def test_request_id_in_exception(self):
760 with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
761 with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
762 self.keep_client.head(self.locator, request_id=self.test_id)
764 with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
765 with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
766 self.keep_client.get(self.locator)
768 with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
769 with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
770 self.keep_client.put(self.data, request_id=self.test_id)
772 with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
773 with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
774 self.keep_client.put(self.data)
# Assertion helper: id header present, auto-generated (not self.test_id).
776 def assertAutomaticRequestId(self, resp):
777 hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
778 if x.startswith('X-Request-Id: ')][0]
779 self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
780 self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
# Assertion helper: the caller-supplied id was sent verbatim.
782 def assertProvidedRequestId(self, resp):
783 self.assertIn('X-Request-Id: '+self.test_id,
784 resp.getopt(pycurl.HTTPHEADER))
788 #@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
789 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
793 # expected_order[i] is the probe order for
794 # hash=md5(sprintf("%064x",i)) where there are 16 services
795 # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
796 # the first probe for the block consisting of 64 "0"
797 # characters is the service whose uuid is
798 # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
800 self.expected_order = [
801 list('3eab2d5fc9681074'),
802 list('097dba52e648f1c3'),
803 list('c5b4e023f8a7d691'),
804 list('9d81c02e76a3bf54'),
807 "{:064x}".format(x).encode()
808 for x in range(len(self.expected_order))]
810 hashlib.md5(self.blocks[x]).hexdigest()
811 for x in range(len(self.expected_order))]
812 self.api_client = self.mock_keep_services(count=self.services)
813 self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
816 DiskCacheBase.tearDown(self)
818 def test_weighted_service_roots_against_reference_set(self):
819 # Confirm weighted_service_roots() returns the correct order
820 for i, hash in enumerate(self.hashes):
821 roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
823 re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
825 self.assertEqual(self.expected_order[i], got_order)
827 def test_get_probe_order_against_reference_set(self):
828 self._test_probe_order_against_reference_set(
829 lambda i: self.keep_client.get(self.hashes[i], num_retries=1))
831 def test_head_probe_order_against_reference_set(self):
832 self._test_probe_order_against_reference_set(
833 lambda i: self.keep_client.head(self.hashes[i], num_retries=1))
835 def test_put_probe_order_against_reference_set(self):
836 # copies=1 prevents the test from being sensitive to races
837 # between writer threads.
838 self._test_probe_order_against_reference_set(
839 lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))
    def _test_probe_order_against_reference_set(self, op):
        # Run op(i) for each block while every service answers 500, then
        # confirm the failed requests hit services in the reference probe
        # order, twice through (num_retries=1 means two passes).
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)
    def test_put_probe_order_multiple_copies(self):
        # With copies>1, multiple writer threads race; verify the observed
        # request order never runs ahead of the reference probe order by
        # more than the thread count allows.
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                        pending.append(expected)
                    len(pending), copies,
                    "pending={}, with copies={}, got {}, expected {}".format(
                        pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
    def test_probe_waste_adding_one_server(self):
        # Rendezvous hashing property: adding a server should only shift
        # each block's first-choice server a bounded amount down the probe
        # list ("penalty"), averaging near N(added)/N(orig).
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                # Penalty = how far the old first-choice server moved.
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
                len(hashes) / initial_services)
                (120 - added_services)/100)
                expect_penalty * 8/10)
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
    def check_64_zeros_error_order(self, verb, exc_class):
        # Force every request to fail with a 500, then confirm the error
        # report lists servers in the reference probe order for the
        # all-zeros block ('3eab2d5fc9681074').
        data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])
940 def test_get_error_shows_probe_order(self):
941 self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)
943 def test_put_error_shows_probe_order(self):
944 self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
946 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
947 class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
950 # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
951 # 1s worth of data and then trigger bandwidth errors before running
954 BANDWIDTH_LOW_LIM = 1024
958 DiskCacheBase.tearDown(self)
    class assertTakesBetween(unittest.TestCase):
        # Context manager asserting the with-block's wall-clock duration
        # falls within [tmin, tmax].
        def __init__(self, tmin, tmax):
            # NOTE(review): this line appears to belong to __enter__ in the
            # elided portion — records the entry timestamp.
            self.t0 = time.time()
        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)
    class assertTakesGreater(unittest.TestCase):
        # Context manager asserting the with-block took at least tmin seconds.
        def __init__(self, tmin):
            # NOTE(review): this line appears to belong to __enter__ in the
            # elided portion — records the entry timestamp.
            self.t0 = time.time()
        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
986 def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
987 return arvados.KeepClient(
988 api_client=self.api_client,
989 timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))
    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            service_host='240.0.0.0',
        # Connect timeout is 0.1s (see keepClient default), so the put
        # should fail quickly.
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)
1003 def test_low_bandwidth_no_delays_success(self):
1004 self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
1005 kc = self.keepClient()
1006 loc = kc.put(self.DATA, copies=1, num_retries=0)
1007 self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
1009 def test_too_low_bandwidth_no_delays_failure(self):
1010 # Check that lessening bandwidth corresponds to failing
1011 kc = self.keepClient()
1012 loc = kc.put(self.DATA, copies=1, num_retries=0)
1013 self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
1014 with self.assertTakesGreater(self.TIMEOUT_TIME):
1015 with self.assertRaises(arvados.errors.KeepReadError):
1016 kc.get(loc, num_retries=0)
1017 with self.assertTakesGreater(self.TIMEOUT_TIME):
1018 with self.assertRaises(arvados.errors.KeepWriteError):
1019 kc.put(self.DATA, copies=1, num_retries=0)
1021 def test_low_bandwidth_with_server_response_delay_failure(self):
1022 kc = self.keepClient()
1023 loc = kc.put(self.DATA, copies=1, num_retries=0)
1024 self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
1025 # Note the actual delay must be 1s longer than the low speed
1026 # limit interval in order for curl to detect it reliably.
1027 self.server.setdelays(response=self.TIMEOUT_TIME+1)
1028 with self.assertTakesGreater(self.TIMEOUT_TIME):
1029 with self.assertRaises(arvados.errors.KeepReadError):
1030 kc.get(loc, num_retries=0)
1031 with self.assertTakesGreater(self.TIMEOUT_TIME):
1032 with self.assertRaises(arvados.errors.KeepWriteError):
1033 kc.put(self.DATA, copies=1, num_retries=0)
1034 with self.assertTakesGreater(self.TIMEOUT_TIME):
1035 kc.head(loc, num_retries=0)
1037 def test_low_bandwidth_with_server_mid_delay_failure(self):
1038 kc = self.keepClient()
1039 loc = kc.put(self.DATA, copies=1, num_retries=0)
1040 self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
1041 # Note the actual delay must be 1s longer than the low speed
1042 # limit interval in order for curl to detect it reliably.
1043 self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
1044 with self.assertTakesGreater(self.TIMEOUT_TIME):
1045 with self.assertRaises(arvados.errors.KeepReadError) as e:
1046 kc.get(loc, num_retries=0)
1047 with self.assertTakesGreater(self.TIMEOUT_TIME):
1048 with self.assertRaises(arvados.errors.KeepWriteError):
1049 kc.put(self.DATA, copies=1, num_retries=0)
1051 def test_timeout_slow_request(self):
1052 loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
1053 self.server.setdelays(request=.2)
1054 self._test_connect_timeout_under_200ms(loc)
1055 self.server.setdelays(request=2)
1056 self._test_response_timeout_under_2s(loc)
1058 def test_timeout_slow_response(self):
1059 loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
1060 self.server.setdelays(response=.2)
1061 self._test_connect_timeout_under_200ms(loc)
1062 self.server.setdelays(response=2)
1063 self._test_response_timeout_under_2s(loc)
1065 def test_timeout_slow_response_body(self):
1066 loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
1067 self.server.setdelays(response_body=.2)
1068 self._test_connect_timeout_under_200ms(loc)
1069 self.server.setdelays(response_body=2)
1070 self._test_response_timeout_under_2s(loc)
    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
1082 def _test_response_timeout_under_2s(self, loc):
1083 # Allow 10s to connect, then 1s for response. Nothing should
1084 # work, and everything should take at least 1s to return.
1085 kc = self.keepClient(timeouts=(10, 1))
1086 with self.assertTakesBetween(1, 9):
1087 with self.assertRaises(arvados.errors.KeepReadError):
1088 kc.get(loc, num_retries=0)
1089 with self.assertTakesBetween(1, 9):
1090 with self.assertRaises(arvados.errors.KeepWriteError):
1091 kc.put(self.DATA, copies=1, num_retries=0)
1093 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
1094 class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
1098 DiskCacheBase.tearDown(self)
    def mock_disks_and_gateways(self, disks=3, gateways=1):
        # Build `gateways` fake gateway service records plus `disks` disk
        # services, and point self.keepClient at the mocked service list.
            'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
            'owner_uuid': 'zzzzz-tpzed-000000000000000',
            'service_host': 'gatewayhost{}'.format(i),
            'service_port': 12345,
            'service_ssl_flag': True,
            'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
1116 @mock.patch('pycurl.Curl')
1117 def test_get_with_gateway_hint_first(self, MockCurl):
1118 MockCurl.return_value = tutil.FakeCurl.make(
1119 code=200, body='foo', headers={'Content-Length': 3})
1120 self.mock_disks_and_gateways()
1121 locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
1122 self.assertEqual(b'foo', self.keepClient.get(locator))
1123 self.assertEqual(self.gateway_roots[0]+locator,
1124 MockCurl.return_value.getopt(pycurl.URL).decode())
1125 self.assertEqual(True, self.keepClient.head(locator))
    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        # All services 404, so get() fails; verify gateways named in the
        # locator hints are probed first, in hint order, then disks.
            tutil.FakeCurl.make(code=404, body='')
            for _ in range(gateways+disks)
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
                mocks[i].getopt(pycurl.URL).decode(),
    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        # Same as the get() variant above, but for head(): gateways named
        # in the locator hints are probed first, then disk services.
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
                mocks[i].getopt(pycurl.URL).decode(),
1175 @mock.patch('pycurl.Curl')
1176 def test_get_with_remote_proxy_hint(self, MockCurl):
1177 MockCurl.return_value = tutil.FakeCurl.make(
1178 code=200, body=b'foo', headers={'Content-Length': 3})
1179 self.mock_disks_and_gateways()
1180 locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
1181 self.assertEqual(b'foo', self.keepClient.get(locator))
1182 self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
1183 MockCurl.return_value.getopt(pycurl.URL).decode())
1185 @mock.patch('pycurl.Curl')
1186 def test_head_with_remote_proxy_hint(self, MockCurl):
1187 MockCurl.return_value = tutil.FakeCurl.make(
1188 code=200, body=b'foo', headers={'Content-Length': 3})
1189 self.mock_disks_and_gateways()
1190 locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
1191 self.assertEqual(True, self.keepClient.head(locator))
1192 self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
1193 MockCurl.return_value.getopt(pycurl.URL).decode())
class KeepClientRetryTestMixin(object):
    """Shared retry-behavior tests for get/head/put test cases below."""
    # Testing with a local Keep store won't exercise the retry behavior.
    # Instead, our strategy is:
    # * Create a client with one proxy specified (pointed at a black
    #   hole), so there's no need to instantiate an API client, and
    #   all HTTP requests come from one place.
    # * Mock httplib's request method to provide simulated responses.
    # This lets us test the retry logic extensively without relying on any
    # supporting servers, and prevents side effects in case something hiccups.
    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
    # Test classes must define TEST_PATCHER to a method that mocks
    # out appropriate methods in the client.
    # PROXY_ADDR points at TEST_HOST, an unreachable "black hole" address.
    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
        # (setUp body) default kwargs used by new_client().
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}
    def new_client(self, **caller_kwargs):
        # Build a KeepClient from the default kwargs, letting the caller
        # override any of them.
        kwargs = self.client_kwargs.copy()
        kwargs.update(caller_kwargs)
        kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**kwargs)
    def run_method(self, *args, **kwargs):
        # Subclasses wrap the client operation under test (get/head/put).
        raise NotImplementedError("test subclasses must define run_method")
    def check_success(self, expected=None, *args, **kwargs):
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))
    def check_exception(self, error_class=None, *args, **kwargs):
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()
    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)
    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)
    def test_no_default_retry(self):
        # Without num_retries, a transient 500 is not retried.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()
    def test_no_retry_after_permanent_error(self):
        # 403 is permanent: retries must not mask it.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)
    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')
    def test_num_retries_instance_fallback(self):
        # num_retries given at construction time applies when the call
        # itself doesn't specify one.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry-behavior tests for KeepClient.get()."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
        DiskCacheBase.tearDown(self)
    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
        return self.new_client().get(locator, *args, **kwargs)
    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)
    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")
    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)
    def test_try_next_server_after_timeout(self):
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
    def test_retry_data_with_wrong_checksum(self):
        # First response body (elided here) has a bad checksum; the client
        # must retry the next server rather than return corrupt data.
        with tutil.mock_keep_responses(
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry-behavior tests for KeepClient.head()."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
        DiskCacheBase.tearDown(self)
    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
        return self.new_client().head(locator, *args, **kwargs)
    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)
    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")
    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)
    def test_try_next_server_after_timeout(self):
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry-behavior tests for KeepClient.put()."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
        DiskCacheBase.tearDown(self)
    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        return self.new_client().put(data, copies, *args, **kwargs)
    def test_do_not_send_multiple_copies_to_same_server(self):
        # Only one mocked server exists, so copies=2 cannot be satisfied
        # and must fail even with retries.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """Verify KeepWriterThreadPool stops once enough copies are stored."""
    class FakeKeepService(object):
        # Minimal stand-in for a Keep service: put() sleeps `delay`
        # seconds, then succeeds, fails, or raises as configured.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result['headers'] = {}
            self._result['headers']['x-keep-replicas-stored'] = str(replicas)
            self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
            self._result['body'] = 'foobar'
        def put(self, data_hash, data, timeout, headers):
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed
        def last_result(self):
            if self.will_succeed:
                # NOTE(review): the success branch is elided here; this 500
                # result appears to be the failure branch.
                return {"status_code": 500, "body": "didn't succeed"}
        # (setUp body) a writer pool asked for self.copies replicas.
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas = self.copies,
            copies = self.copies
    def test_only_write_enough_on_success(self):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))
    def test_only_write_enough_on_partial_success(self):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))
    def test_only_write_enough_when_some_crash(self):
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies, []))
    def test_fail_when_too_many_crash(self):
        # More crashing services than copies: the pool can only achieve
        # copies-1 replicas.
        for i in range(self.copies+1):
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
        for i in range(self.copies-1):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.assertEqual(self.pool.done(), (self.copies-1, []))
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.
        # (setUp body) two mocked services so copies=2 is achievable.
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        DiskCacheBase.tearDown(self)
    def test_success_after_exception(self):
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)
    def test_success_after_retryable_error(self):
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)
    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
            self.assertEqual(2, req_mock.call_count)
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    """Regression test: an API-client error must not wedge the block cache."""
        DiskCacheBase.tearDown(self)
    def test_api_fail(self):
        # ApiMock raises KeepReadError for every attribute access except
        # the ones KeepClient probes explicitly (branch bodies elided).
        class ApiMock(object):
            def __getattr__(self, r):
                if r == "api_token":
                elif r == "insecure":
                    raise arvados.errors.KeepReadError()
        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))
        # The bug this is testing for is that if an API (not
        # keepstore) exception is thrown as part of a get(), the next
        # attempt to get that same block will result in a deadlock.
        # This is why there are two get()s in a row. Unfortunately,
        # the failure mode for this test is that the test suite
        # deadlocks, there isn't a good way to avoid that without
        # adding a special case that has no use except for this test.
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1529 class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
1531 self.api_client = self.mock_keep_services(count=2)
1532 self.data = b'xyzzy'
1533 self.locator = '1271ed5ef305aadabc605b1609e24c52'
1534 self.disk_cache_dir = tempfile.mkdtemp()
1537 shutil.rmtree(self.disk_cache_dir)
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_read(self, get_mock):
        # confirm it finds an existing cache block when the cache is
        # Pre-populate the on-disk cache layout (<dir>/<first 3 hex>/<md5>.keepcacheblock).
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
        # block cache should have found the existing block
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
        # A disk-cache hit means no network fetch was attempted.
        get_mock.assert_not_called()
    # NOTE(review): another method named test_disk_cache_share is defined
    # later in this class; Python keeps only the later definition, so this
    # test is silently shadowed and never runs. One of the two should be
    # renamed.
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # confirm it finds a cache block written after the disk cache
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
        # when we try to get the block, it'll check the disk and find it.
        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
        get_mock.assert_not_called()
    def test_disk_cache_write(self):
        # confirm the cache block was created
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
        self.assertIsNotNone(keep_client.get_from_cache(self.locator))
        # The fetched block must have been persisted to the disk cache.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))
    def test_disk_cache_clean(self):
        # confirm that a tmp file in the cache is cleaned up
        # Only stale "tmp*.keepcacheblock" files should be removed; other
        # file names must be left alone.
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        # The tmp still hasn't been deleted because it was created in the last 60 seconds
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
        # Set the mtime to 61s in the past
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))
        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir)
        # Tmp should be gone but the other ones are safe.
        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_cap(self, get_mock):
        # confirm that the cache is kept to the desired limit
        # Seed two cached blocks, then open a capped cache and confirm the
        # older block was evicted while the newer one survives.
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir,
        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1656 @mock.patch('arvados.KeepClient.KeepService.get')
1657 def test_disk_cache_share(self, get_mock):
1658 # confirm that a second cache doesn't delete files that belong to the first cache.
1660 os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1661 with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1664 os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1665 with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1668 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1669 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1671 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1672 disk_cache_dir=self.disk_cache_dir,
1675 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1676 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1678 block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1679 disk_cache_dir=self.disk_cache_dir,
1682 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1683 self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1687 def test_disk_cache_error(self):
1688 os.chmod(self.disk_cache_dir, stat.S_IRUSR)
1690 # Fail during cache initialization.
1691 with self.assertRaises(OSError):
1692 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1693 disk_cache_dir=self.disk_cache_dir)
1696 def test_disk_cache_write_error(self):
1697 block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1698 disk_cache_dir=self.disk_cache_dir)
1700 keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1702 # Make the cache dir read-only
1703 os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1704 os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
1707 with self.assertRaises(arvados.errors.KeepCacheError):
1708 with tutil.mock_keep_responses(self.data, 200) as mock:
1709 keep_client.get(self.locator)