15397: Remove Python2-only code.
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import hashlib
6 import os
7 import errno
8 import pycurl
9 import random
10 import re
11 import shutil
12 import socket
13 import sys
14 import stat
15 import tempfile
16 import time
17 import unittest
18 import urllib.parse
19 import mmap
20
21 from unittest import mock
22 from unittest.mock import patch
23
24 import parameterized
25
26 import arvados
27 import arvados.retry
28 import arvados.util
29 from . import arvados_testutil as tutil
30 from . import keepstub
31 from . import run_test_server
32
33 from .arvados_testutil import DiskCacheBase
34
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Round-trip put/get/head tests against a live test Keep server.

    Parameterized to run once with the disk-backed block cache and once
    with the in-memory cache (``disk_cache`` class attribute).
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    # Shared DiskCacheBase instance that owns the block cache used by
    # cls.keep_client; torn down in tearDownClass().
    block_cache_test = None

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.block_cache_test = DiskCacheBase()
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='',
                                             block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))

    @classmethod
    def tearDownClass(cls):
        # Bug fix: this previously called super().setUpClass() again,
        # which re-ran class setup during teardown and never invoked the
        # parent class's cleanup.
        super(KeepTestCase, cls).tearDownClass()
        cls.block_cache_test.tearDown()

    def test_KeepBasicRWTest(self):
        """put()/get() round trip; upload/download counters track bytes."""
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        # Raw strings below: '\+' is an invalid escape sequence in a
        # normal string literal (SyntaxWarning on modern Python).
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
                         b'foo'),
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        """Binary (non-UTF-8) data survives a put()/get() round trip."""
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        """A 64 MiB block (8 bytes doubled 23 times) round-trips intact."""
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        """put() with copies=1 still yields a retrievable block."""
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        """put('') returns the canonical empty-block locator (+0 size)."""
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_KeepPutDataType(self):
        """put() rejects objects that are not bytes-like/encodable."""
        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        """head() reports existence without altering the stored content."""
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
132
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Exercise blob-signing permission checks against a signing Keep server."""
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_KeepBasicRWTest(self):
        # An authorized user gets signed locators back and can read them.
        run_test_server.authorize_with('active')
        client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
        foo_locator = client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        self.assertEqual(client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => bad request
        bar_locator = client.put('bar')
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            bar_locator,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        with self.assertRaises(arvados.errors.KeepReadError):
            client.get(unsigned_bar_locator)

        # GET from a different user => bad request
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.KeepReadError):
            arvados.Keep.get(bar_locator)

        # Unauthenticated GET for a signed locator => bad request
        # Unauthenticated GET for an unsigned locator => bad request
        client.api_token = ''
        with self.assertRaises(arvados.errors.KeepReadError):
            client.get(bar_locator)
        with self.assertRaises(arvados.errors.KeepReadError):
            client.get(unsigned_bar_locator)
180
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Tests for KeepClient routed through a keepproxy service."""
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        super(KeepProxyTestCase, self).tearDown()
        DiskCacheBase.tearDown(self)

    def test_KeepProxyTest1(self):
        """put()/get() through the proxy works and sets using_proxy."""
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='', block_cache=self.make_block_cache(self.disk_cache))
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            # Raw string: '\+' is an invalid escape sequence in a normal
            # string literal (SyntaxWarning on modern Python).
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        """ARVADOS_KEEP_SERVICES may list several proxies, all honored."""
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        """A malformed service URI is rejected at construction time."""
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            # No need to bind the (never-created) client; the previous
            # unused `keep_client =` assignment was removed.
            arvados.KeepClient(api_client=self.api_client,
                               local_store='',
                               block_cache=self.make_block_cache(self.disk_cache))
230
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """KeepClient unit tests using mocked API and Keep responses.

    Covers service-root discovery, TLS verification flags, pycurl
    timeout configuration, and error aggregation/reporting.  No real
    servers are involved; requests are intercepted by
    tutil.mock_keep_responses().
    """
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def get_service_roots(self, api_client):
        # Helper: build a client against the mocked API and return its
        # weighted service roots as parsed URLs, in sorted order.
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        # The service_ssl_flag from discovery decides http vs https.
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        # IPv6 hosts must not confuse host:port parsing.
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
            service_type='proxy', service_host='localhost', service_port=9, count=1),
                                         block_cache=self.make_block_cache(self.disk_cache))
        try:
            # this will fail, but it ensures we get the service
            # discovery response
            keep_client.put('baz2', num_retries=0)
        except:
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)
        # NOTE(review): force_timeout appears unused in this test
        # (responses below succeed with 200) -- candidate for removal.
        force_timeout = socket.timeout("timed out")

        # insecure=True must turn off both peer and host verification.
        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        # A +R (remote-signed) locator should be re-signed via a HEAD
        # request carrying an X-Keep-Signature header.
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            # DEFAULT_TIMEOUT is (connect, low-speed-time, low-speed-limit).
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_head_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            # HEAD requests should not set the low-speed options at all.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_get_timeout(self):
        # Proxy services use the (longer) DEFAULT_PROXY_TIMEOUT tuple.
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            # HEAD requests should not set the low-speed options at all.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_put_timeout(self):
        # NOTE(review): this test builds its KeepClient without an
        # explicit block_cache, unlike its siblings -- presumably
        # intentional (disk_cache_dir cleared first); confirm.
        self.disk_cache_dir = None
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def check_no_services_error(self, verb, exc_class):
        # Helper: when service discovery itself fails, the resulting
        # Keep error should carry zero per-request errors.
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=self.make_block_cache(self.disk_cache),
            num_retries=0,
        )
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        # Helper: only the errors from the final retry round (the 502s)
        # should be reported, and the message counts all attempts.
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        # One service succeeds (200) and two fail: only the two
        # failures should appear in request_errors().
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        # A read-only proxy offers no writable services, so put() fails
        # before issuing any request.
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        # Unknown service types should still be usable for get().
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        # Unknown service types should still be usable for put().
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        # A single response claiming x-keep-replicas-stored: 3 satisfies
        # copies=2, so only one request should be issued.
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
566
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify block-cache behavior: GET responses are cached, HEADs are not."""
    disk_cache = False

    def setUp(self):
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    @patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        """Two GETs of the same locator hit the backend only once."""
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.get(self.locator)
            self.keep_client.get(self.locator)
        # Request already cached, don't require more than one request
        get_mock.assert_called_once()

    @patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        """HEAD requests bypass the cache entirely."""
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.head(self.locator)
            self.keep_client.head(self.locator)
        # Don't cache HEAD requests so that they're not confused with GET reqs
        self.assertEqual(2, get_mock.call_count)

    @patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        """A HEAD response must not be cached and served for a later GET."""
        get_mock.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual(b'first response', head_resp)
        # First response was not cached because it was from a HEAD request.
        self.assertNotEqual(head_resp, get_resp)
608
609
610
611
612
613 @tutil.skip_sleep
614 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
615 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
616     disk_cache = False
617
    def setUp(self):
        # Build a KeepClient against two mocked services, plus a known
        # request id for the X-Request-Id header assertions below.
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None
629
    def tearDown(self):
        # Delegate cleanup (block-cache state) to DiskCacheBase.
        DiskCacheBase.tearDown(self)
632
633     def test_default_to_api_client_request_id(self):
634         self.api_client.request_id = self.test_id
635         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
636             self.keep_client.put(self.data)
637         self.assertEqual(2, len(mock.responses))
638         for resp in mock.responses:
639             self.assertProvidedRequestId(resp)
640
641         with tutil.mock_keep_responses(self.data, 200) as mock:
642             self.keep_client.get(self.locator)
643         self.assertProvidedRequestId(mock.responses[0])
644
645         with tutil.mock_keep_responses(b'', 200) as mock:
646             self.keep_client.head(self.locator)
647         self.assertProvidedRequestId(mock.responses[0])
648
649     def test_explicit_request_id(self):
650         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
651             self.keep_client.put(self.data, request_id=self.test_id)
652         self.assertEqual(2, len(mock.responses))
653         for resp in mock.responses:
654             self.assertProvidedRequestId(resp)
655
656         with tutil.mock_keep_responses(self.data, 200) as mock:
657             self.keep_client.get(self.locator, request_id=self.test_id)
658         self.assertProvidedRequestId(mock.responses[0])
659
660         with tutil.mock_keep_responses(b'', 200) as mock:
661             self.keep_client.head(self.locator, request_id=self.test_id)
662         self.assertProvidedRequestId(mock.responses[0])
663
664     def test_automatic_request_id(self):
665         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
666             self.keep_client.put(self.data)
667         self.assertEqual(2, len(mock.responses))
668         for resp in mock.responses:
669             self.assertAutomaticRequestId(resp)
670
671         with tutil.mock_keep_responses(self.data, 200) as mock:
672             self.keep_client.get(self.locator)
673         self.assertAutomaticRequestId(mock.responses[0])
674
675         with tutil.mock_keep_responses(b'', 200) as mock:
676             self.keep_client.head(self.locator)
677         self.assertAutomaticRequestId(mock.responses[0])
678
679     def test_request_id_in_exception(self):
680         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
681             with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
682                 self.keep_client.head(self.locator, request_id=self.test_id)
683
684         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
685             with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
686                 self.keep_client.get(self.locator)
687
688         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
689             with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
690                 self.keep_client.put(self.data, request_id=self.test_id)
691
692         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
693             with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
694                 self.keep_client.put(self.data)
695
696     def assertAutomaticRequestId(self, resp):
697         hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
698                if x.startswith('X-Request-Id: ')][0]
699         self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
700         self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
701
702     def assertProvidedRequestId(self, resp):
703         self.assertIn('X-Request-Id: '+self.test_id,
704                       resp.getopt(pycurl.HTTPHEADER))
705
706
@tutil.skip_sleep
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify that KeepClient probes services in rendezvous-hash order.

    The expected probe orders below are a fixed reference set; these tests
    guard the rendezvous hashing against accidental changes.
    """
    disk_cache = False

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order.
        # (Loop variable renamed from `hash` to avoid shadowing the builtin.)
        for i, block_hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        # Run op(i) for each test block while every service returns 500, so
        # the sequence of mocked requests is the full probe order.  Each
        # caller passes num_retries=1, hence expected_order[i]*2 below.
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        # Adding services should displace each block's first probe by only a
        # bounded, statistically predictable amount (rendezvous property).
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash)) for block_hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            total_penalty = 0
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        # Run `verb` against servers that all return 500 and confirm the
        # resulting exception reports servers in the reference probe order.
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024, 65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
865
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    """Timing tests against the stub Keep server's tunable delays/bandwidth."""
    disk_cache = False

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        """Context manager: assert the with-body takes tmin..tmax seconds.

        Subclasses TestCase only to borrow its assert* helpers.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager: assert the with-body takes at least tmin seconds."""
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Build a KeepClient with (connect, read, low-speed-limit) timeouts."""
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            # NOTE(review): head is asserted to complete (slowly) rather than
            # raise — presumably because there is no body to transfer, so the
            # low-speed limit never trips; confirm against keepstub behavior.
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
1012
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check that +K@ locator hints route requests to gateways/remote proxies."""
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Configure self.keepClient with `disks` disk services plus
        `gateways` gateway services, recording gateway root URLs for
        later assertions."""
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        # Bytes body, consistent with the other tests in this class.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1114
class KeepClientRetryTestMixin:
    """Shared retry tests for get/head/put; see subclass requirements below."""
    disk_cache = False

    # Testing with a local Keep store won't exercise the retry behavior.
    # Instead, our strategy is:
    # * Create a client with one proxy specified (pointed at a black
    #   hole), so there's no need to instantiate an API client, and
    #   all HTTP requests come from one place.
    # * Mock httplib's request method to provide simulated responses.
    # This lets us test the retry logic extensively without relying on any
    # supporting servers, and prevents side effects in case something hiccups.
    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
    # run_method().
    #
    # Test classes must define TEST_PATCHER to a method that mocks
    # out appropriate methods in the client.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Build a KeepClient, merging caller_kwargs over the defaults."""
        kwargs = self.client_kwargs.copy()
        kwargs.update(caller_kwargs)
        kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method(...) returns `expected` (DEFAULT_EXPECT if None)."""
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method(...) raises `error_class` (DEFAULT_EXCEPTION if
        None); return the assertRaises context for further inspection."""
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
        return err

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1185
1186
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Exercise KeepClient.get() retry behavior via the retry mixin."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        # The mixin drives every shared test through this entry point.
        return self.new_client().get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        """A 404 response must surface as NotFoundError, even with retries."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        """Server disagreement must not become NotFoundError.

        get should raise NotFoundError only when a high threshold of
        servers report the block is not found.  This rigs up 50/50
        disagreement (404 vs 500) and checks the generic KeepReadError
        is raised instead.
        """
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as caught:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                caught.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        """After one server times out, the next one is still tried."""
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        """Data failing verification is rejected and fetched elsewhere."""
        with tutil.mock_keep_responses(
                ('baddata', 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1234
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Exercise KeepClient.head() retry behavior via the retry mixin."""

    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        # The mixin drives every shared test through this entry point.
        return self.new_client().head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        """A 404 response must surface as NotFoundError, even with retries."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        """Server disagreement must not become NotFoundError.

        head should raise NotFoundError only when a high threshold of
        servers report the block is not found.  This rigs up 50/50
        disagreement (404 vs 500) and checks the generic KeepReadError
        is raised instead.
        """
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as caught:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                caught.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        """After one server times out, the next one is still tried."""
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1276
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Exercise KeepClient.put() retry behavior via the retry mixin."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        # The mixin drives every shared test through this entry point.
        return self.new_client().put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        """With a single (proxy) server, a 2-copy put must fail rather than
        write the same block to the same server twice."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1294
1295
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """Check that KeepWriterThreadPool stops once enough copies are stored.

    Each test queues a mix of fake Keep services with staggered delays
    and verifies the pool reports exactly the requested replication,
    never writing more copies than asked for.
    """

    class FakeKeepService:
        # Stand-in for KeepClient.KeepService with a configurable
        # outcome: sleep `delay` seconds, then raise `will_raise` if
        # set, otherwise report success per `will_succeed`.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {
                    'x-keep-replicas-stored': str(replicas),
                    'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
                },
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout, headers):
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            if self.will_succeed:
                return self._result
            else:
                return {"status_code": 500, "body": "didn't succeed"}

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def test_only_write_enough_on_success(self):
        # Plenty of willing servers: the pool should stop at `copies`.
        for i in range(10):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Alternating failures don't prevent reaching full replication.
        for i in range(5):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Raising services are tolerated as long as enough succeed.
        for i in range(5):
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # Only copies-1 services can succeed, so the pool must come up
        # one copy short.
        for i in range(self.copies+1):
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
        for i in range(self.copies-1):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies-1, []))
1367
1368
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Test put()s that need two distinct servers to succeed, possibly
    requiring multiple passes through the retry loop.
    """
    # Default for the parameterized attribute (overridden by the
    # decorator above).  This was previously misspelled `block_cache`,
    # leaving this class without the `disk_cache` default that its
    # sibling test classes declare.
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        # One exception burns a retry, then both servers accept a copy.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        # A 500 is retryable; the retry pass reaches the second server.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1408
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        class ApiMock:
            # Serve a fixed set of attributes; any other attribute
            # access simulates an API-layer failure by raising
            # KeepReadError.
            def __getattr__(self, name):
                canned = {
                    "api_token": "abc",
                    "insecure": False,
                    "config": lambda: {},
                }
                if name in canned:
                    return canned[name]
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for is that if an API (not
        # keepstore) exception is thrown as part of a get(), the next
        # attempt to get that same block will result in a deadlock.
        # This is why there are two get()s in a row.  Unfortunately,
        # the failure mode for this test is that the test suite
        # deadlocks, there isn't a good way to avoid that without
        # adding a special case that has no use except for this test.

        for _ in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1443
1444
1445 class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
1446     def setUp(self):
1447         self.api_client = self.mock_keep_services(count=2)
1448         self.data = b'xyzzy'
1449         self.locator = '1271ed5ef305aadabc605b1609e24c52'
1450         self.disk_cache_dir = tempfile.mkdtemp()
1451
    def tearDown(self):
        # Remove the temporary disk cache directory created in setUp().
        shutil.rmtree(self.disk_cache_dir)
1454
1455
1456     @mock.patch('arvados.KeepClient.KeepService.get')
1457     def test_disk_cache_read(self, get_mock):
1458         # confirm it finds an existing cache block when the cache is
1459         # initialized.
1460
1461         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1462         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1463             f.write(self.data)
1464
1465         # block cache should have found the existing block
1466         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1467                                                   disk_cache_dir=self.disk_cache_dir)
1468         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1469
1470         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1471
1472         get_mock.assert_not_called()
1473
1474
1475     @mock.patch('arvados.KeepClient.KeepService.get')
1476     def test_disk_cache_share(self, get_mock):
1477         # confirm it finds a cache block written after the disk cache
1478         # was initialized.
1479
1480         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1481                                                   disk_cache_dir=self.disk_cache_dir)
1482         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1483
1484         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1485         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1486             f.write(self.data)
1487
1488         # when we try to get the block, it'll check the disk and find it.
1489         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1490
1491         get_mock.assert_not_called()
1492
1493
1494     def test_disk_cache_write(self):
1495         # confirm the cache block was created
1496
1497         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1498                                                   disk_cache_dir=self.disk_cache_dir)
1499         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1500
1501         with tutil.mock_keep_responses(self.data, 200) as mock:
1502             self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1503
1504         self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1505
1506         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1507             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1508
1509
1510     def test_disk_cache_clean(self):
1511         # confirm that a tmp file in the cache is cleaned up
1512
1513         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1514         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
1515             f.write(b"abc1")
1516
1517         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
1518             f.write(b"abc2")
1519
1520         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
1521             f.write(b"abc3")
1522
1523         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1524         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1525         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1526
1527         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1528                                                   disk_cache_dir=self.disk_cache_dir)
1529
1530         # The tmp still hasn't been deleted because it was created in the last 60 seconds
1531         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1532         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1533         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1534
1535         # Set the mtime to 61s in the past
1536         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
1537         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
1538         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))
1539
1540         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1541                                                    disk_cache_dir=self.disk_cache_dir)
1542
1543         # Tmp should be gone but the other ones are safe.
1544         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1545         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1546         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1547
1548
1549     @mock.patch('arvados.KeepClient.KeepService.get')
1550     def test_disk_cache_cap(self, get_mock):
1551         # confirm that the cache is kept to the desired limit
1552
1553         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1554         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1555             f.write(self.data)
1556
1557         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1558         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1559             f.write(b"foo")
1560
1561         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1562         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1563
1564         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1565                                                    disk_cache_dir=self.disk_cache_dir,
1566                                                    max_slots=1)
1567
1568         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1569         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1570
1571
1572     @mock.patch('arvados.KeepClient.KeepService.get')
1573     def test_disk_cache_share(self, get_mock):
1574         # confirm that a second cache doesn't delete files that belong to the first cache.
1575
1576         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1577         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1578             f.write(self.data)
1579
1580         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1581         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1582             f.write(b"foo")
1583
1584         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1585         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1586
1587         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1588                                                    disk_cache_dir=self.disk_cache_dir,
1589                                                    max_slots=2)
1590
1591         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1592         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1593
1594         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1595                                                    disk_cache_dir=self.disk_cache_dir,
1596                                                    max_slots=1)
1597
1598         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1599         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1600
1601
1602
1603     def test_disk_cache_error(self):
1604         os.chmod(self.disk_cache_dir, stat.S_IRUSR)
1605
1606         # Fail during cache initialization.
1607         with self.assertRaises(OSError):
1608             block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1609                                                       disk_cache_dir=self.disk_cache_dir)
1610
1611
1612     def test_disk_cache_write_error(self):
1613         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1614                                                   disk_cache_dir=self.disk_cache_dir)
1615
1616         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1617
1618         # Make the cache dir read-only
1619         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1620         os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
1621
1622         # Cache fails
1623         with self.assertRaises(arvados.errors.KeepCacheError):
1624             with tutil.mock_keep_responses(self.data, 200) as mock:
1625                 keep_client.get(self.locator)
1626
1627
1628     def test_disk_cache_retry_write_error(self):
1629         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1630                                                   disk_cache_dir=self.disk_cache_dir)
1631
1632         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1633
1634         called = False
1635         realmmap = mmap.mmap
1636         def sideeffect_mmap(*args, **kwargs):
1637             nonlocal called
1638             if not called:
1639                 called = True
1640                 raise OSError(errno.ENOSPC, "no space")
1641             else:
1642                 return realmmap(*args, **kwargs)
1643
1644         with patch('mmap.mmap') as mockmmap:
1645             mockmmap.side_effect = sideeffect_mmap
1646
1647             cache_max_before = block_cache.cache_max
1648
1649             with tutil.mock_keep_responses(self.data, 200) as mock:
1650                 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1651
1652             self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1653
1654         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1655             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1656
1657         # shrank the cache in response to ENOSPC
1658         self.assertTrue(cache_max_before > block_cache.cache_max)
1659
1660
1661     def test_disk_cache_retry_write_error2(self):
1662         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1663                                                   disk_cache_dir=self.disk_cache_dir)
1664
1665         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1666
1667         called = False
1668         realmmap = mmap.mmap
1669         def sideeffect_mmap(*args, **kwargs):
1670             nonlocal called
1671             if not called:
1672                 called = True
1673                 raise OSError(errno.ENOMEM, "no memory")
1674             else:
1675                 return realmmap(*args, **kwargs)
1676
1677         with patch('mmap.mmap') as mockmmap:
1678             mockmmap.side_effect = sideeffect_mmap
1679
1680             slots_before = block_cache._max_slots
1681
1682             with tutil.mock_keep_responses(self.data, 200) as mock:
1683                 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1684
1685             self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1686
1687         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1688             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1689
1690         # shrank the cache in response to ENOMEM
1691         self.assertTrue(slots_before > block_cache._max_slots)