Merge branch '21815-trigrams-exclude-ids'
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import errno
6 import hashlib
7 import mmap
8 import os
9 import random
10 import re
11 import shutil
12 import socket
13 import stat
14 import sys
15 import tempfile
16 import time
17 import unittest
18 import urllib.parse
19
20 from pathlib import Path
21 from unittest import mock
22 from unittest.mock import patch
23
24 import parameterized
25 import pycurl
26
27 import arvados
28 import arvados.retry
29 import arvados.util
30 from . import arvados_testutil as tutil
31 from . import keepstub
32 from . import run_test_server
33
34 from .arvados_testutil import DiskCacheBase
35
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Round-trip put/get/head tests against a live Keep server.

    Runs twice via parameterization: once with the disk-backed block
    cache and once with the in-memory block cache.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    # DiskCacheBase instance that owns the block cache built in
    # setUpClass; kept so tearDownClass can clean up its temp directory.
    block_cache_test = None

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.block_cache_test = DiskCacheBase()
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='',
                                             block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))

    @classmethod
    def tearDownClass(cls):
        # Fix: this previously called super().setUpClass() by mistake,
        # which re-ran class setup and skipped the superclass teardown.
        super(KeepTestCase, cls).tearDownClass()
        cls.block_cache_test.tearDown()

    def test_KeepBasicRWTest(self):
        """put/get 'foo' and check the upload/download byte counters."""
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
                         b'foo'),
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        """Round-trip a small binary blob."""
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        """Round-trip a 64 MiB blob (8 bytes doubled 23 times)."""
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        """Round-trip a blob stored with copies=1."""
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        """put('') must return the locator of the empty block."""
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_KeepPutDataType(self):
        """put() rejects data that is not bytes and has no encode()."""
        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        """head() reports an existing block; get() still returns its data."""
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
133
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Verify blob-signing permission enforcement on Keep get/put."""
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))

        # A successful put returns a signed locator, which the same
        # authorized client can read back.
        foo_locator = client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        self.assertEqual(client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => bad request
        bar_locator = client.put('bar')
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            bar_locator,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        with self.assertRaises(arvados.errors.KeepReadError):
            client.get(unsigned_bar_locator)

        # GET from a different user => bad request
        run_test_server.authorize_with('spectator')
        other_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
        with self.assertRaises(arvados.errors.KeepReadError):
            other_client.get(bar_locator)

        # Unauthenticated GET for a signed locator => bad request
        # Unauthenticated GET for an unsigned locator => bad request
        client.api_token = ''
        with self.assertRaises(arvados.errors.KeepReadError):
            client.get(bar_locator)
        with self.assertRaises(arvados.errors.KeepReadError):
            client.get(unsigned_bar_locator)
182
183
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Tests for KeepClient behavior when routed through keepproxy."""
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        super(KeepProxyTestCase, self).tearDown()
        DiskCacheBase.tearDown(self)

    def test_KeepProxyTest1(self):
        """Round-trip a block through the configured proxy."""
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='', block_cache=self.make_block_cache(self.disk_cache))
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        """ARVADOS_KEEP_SERVICES may list several proxies; all are used."""
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        """A malformed proxy URI is rejected at client construction time."""
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            # Fix: the constructor's return value was previously bound to
            # an unused local; the call itself is what should raise.
            arvados.KeepClient(api_client=self.api_client,
                               local_store='',
                               block_cache=self.make_block_cache(self.disk_cache))
233
234
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Unit tests for KeepClient service discovery, timeouts, and errors.

    All API and Keep responses are mocked; no real servers are contacted.
    """
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def get_service_roots(self, api_client):
        """Return parsed, sorted weighted service roots for a mock API client."""
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        """Service roots use https exactly when the service advertises SSL."""
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        """IPv6 service hosts keep their port distinct from the address."""
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        """using_proxy is set after discovering a proxy-type service."""
        keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
            service_type='proxy', service_host='localhost', service_port=9, count=1),
                                         block_cache=self.make_block_cache(self.disk_cache))
        try:
            # this will fail, but it ensures we get the service
            # discovery response
            keep_client.put('baz2', num_retries=0)
        except Exception:
            # Fix: was a bare `except:`, which would also swallow
            # SystemExit/KeyboardInterrupt. Only the discovery side
            # effect matters here; the put is expected to fail.
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        """api_client.insecure toggles pycurl's peer/host verification."""
        api_client = self.mock_keep_services(count=1)
        # (removed an unused `force_timeout` local left over from the
        # timeout tests below)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        """refresh_signature() turns a +R remote locator into a +A local one."""
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_head_timeout(self):
        # HEAD requests set no low-speed limits (no body to transfer).
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_get_timeout(self):
        # Proxy services get the (longer) DEFAULT_PROXY_TIMEOUT values.
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_put_timeout(self):
        # NOTE(review): this test builds the client without an explicit
        # block_cache, so disk_cache_dir is cleared to keep tearDown happy.
        self.disk_cache_dir = None
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def check_no_services_error(self, verb, exc_class):
        """Shared body: API discovery failure yields exc_class with no request errors."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=self.make_block_cache(self.disk_cache),
            num_retries=0,
        )
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Shared body: the raised error reports only the final retry's failures."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        """Only the two failing services appear in the write error."""
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        """A read-only proxy leaves no writable services and no request errors."""
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        # assertIn is clearer than assertEqual(True, ... in ...).
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        """Unknown service types are still usable for get()."""
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        """Unknown service types are still usable for put()."""
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        """A reported replica count >= copies satisfies put() in one request."""
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
570
571
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check which request types are served from the block cache."""
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        """A repeated GET for the same locator is served from the cache."""
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.get(self.locator)
        # Only the first request should reach a Keep service.
        self.assertEqual(1, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        """HEAD requests are never cached (to avoid confusion with GETs)."""
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.head(self.locator)
        # Both HEADs hit a service because HEAD responses are not cached.
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        """A HEAD response is not recycled to satisfy a later GET."""
        get_mock.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual(b'first response', head_resp)
        # The first response was not cached (it came from a HEAD), so the
        # GET performs a second service call and sees the second response.
        self.assertNotEqual(head_resp, get_resp)
613
614
615 @tutil.skip_sleep
616 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
617 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
618     disk_cache = False
619
    def setUp(self):
        """Build a mocked KeepClient plus a known-good X-Request-Id value."""
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None
631
    def tearDown(self):
        # Remove the disk cache directory created by make_block_cache().
        DiskCacheBase.tearDown(self)
634
635     def test_default_to_api_client_request_id(self):
636         self.api_client.request_id = self.test_id
637         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
638             self.keep_client.put(self.data)
639         self.assertEqual(2, len(mock.responses))
640         for resp in mock.responses:
641             self.assertProvidedRequestId(resp)
642
643         with tutil.mock_keep_responses(self.data, 200) as mock:
644             self.keep_client.get(self.locator)
645         self.assertProvidedRequestId(mock.responses[0])
646
647         with tutil.mock_keep_responses(b'', 200) as mock:
648             self.keep_client.head(self.locator)
649         self.assertProvidedRequestId(mock.responses[0])
650
651     def test_explicit_request_id(self):
652         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
653             self.keep_client.put(self.data, request_id=self.test_id)
654         self.assertEqual(2, len(mock.responses))
655         for resp in mock.responses:
656             self.assertProvidedRequestId(resp)
657
658         with tutil.mock_keep_responses(self.data, 200) as mock:
659             self.keep_client.get(self.locator, request_id=self.test_id)
660         self.assertProvidedRequestId(mock.responses[0])
661
662         with tutil.mock_keep_responses(b'', 200) as mock:
663             self.keep_client.head(self.locator, request_id=self.test_id)
664         self.assertProvidedRequestId(mock.responses[0])
665
666     def test_automatic_request_id(self):
667         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
668             self.keep_client.put(self.data)
669         self.assertEqual(2, len(mock.responses))
670         for resp in mock.responses:
671             self.assertAutomaticRequestId(resp)
672
673         with tutil.mock_keep_responses(self.data, 200) as mock:
674             self.keep_client.get(self.locator)
675         self.assertAutomaticRequestId(mock.responses[0])
676
677         with tutil.mock_keep_responses(b'', 200) as mock:
678             self.keep_client.head(self.locator)
679         self.assertAutomaticRequestId(mock.responses[0])
680
681     def test_request_id_in_exception(self):
682         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
683             with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
684                 self.keep_client.head(self.locator, request_id=self.test_id)
685
686         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
687             with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
688                 self.keep_client.get(self.locator)
689
690         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
691             with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
692                 self.keep_client.put(self.data, request_id=self.test_id)
693
694         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
695             with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
696                 self.keep_client.put(self.data)
697
698     def assertAutomaticRequestId(self, resp):
699         hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
700                if x.startswith('X-Request-Id: ')][0]
701         self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
702         self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
703
704     def assertProvidedRequestId(self, resp):
705         self.assertIn('X-Request-Id: '+self.test_id,
706                       resp.getopt(pycurl.HTTPHEADER))
707
708
@tutil.skip_sleep
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify the client's rendezvous-hashing probe order against a
    precomputed reference set.

    NOTE(review): the disk-cache parameterization decorator is
    commented out above — confirm that is intentional for this case.
    """
    disk_cache = False

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        # One 64-hex-digit test block per reference probe order above.
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
            # Each root URL embeds the service's hex digit as
            # "keep0x<j>"; extract it to compare against the reference.
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        # Every request fails with 500, so with num_retries=1 the
        # client should walk the full reference probe order twice.
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        # Growing the service pool should displace existing first-probe
        # choices only in proportion to the growth.
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            total_penalty = 0
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                # penalty = how far the old first choice slid down the
                # new probe order.
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Run `verb` against all-failing services; check that the
        raised error lists the probed services in reference order."""
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
867
868
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    """Exercise KeepClient connect/response timeouts against a stub
    keep server with configurable bandwidth limits and delays.

    Fix applied: removed an unused exception alias (`as e`) in
    test_low_bandwidth_with_server_mid_delay_failure.
    """
    disk_cache = False

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        """Context manager asserting the body takes tmin..tmax seconds.

        NOTE(review): inherits unittest.TestCase (without calling its
        __init__) only to borrow the assert* helpers — confirm this
        stays compatible with future unittest releases.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager asserting the body takes at least tmin seconds."""
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient using the given timeout tuple
        (connect, response[, low-speed limit])."""
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # Twice the low-speed limit: transfers should succeed.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            # head is expected to succeed here (note: no assertRaises),
            # just slowly.
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
1015
1016
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check that locator hints (+K@...) route get/head requests to
    gateway services before regular disk services.

    Fix applied: two FakeCurl stubs used str bodies ('foo', '') while
    their sibling tests use bytes (b'foo', b''); normalized to bytes
    for consistency.
    """
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Mock an API with `disks` disk services plus `gateways`
        gateway services, and point self.keepClient at it."""
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        # The single request must have gone to the hinted gateway.
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        gateways = 4
        disks = 3
        # Every service answers 404 so the client walks the full list.
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        gateways = 4
        disks = 3
        # Every service answers 404 so the client walks the full list.
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        # A +K@<cluster-id> hint routes to that cluster's keep proxy.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1118
1119
class KeepClientRetryTestMixin(object):
    """Shared scaffolding and tests for the KeepClientRetry*TestCase
    classes below."""
    disk_cache = False

    # Testing with a local Keep store won't exercise the retry behavior.
    # Instead, our strategy is:
    # * Create a client with one proxy specified (pointed at a black
    #   hole), so there's no need to instantiate an API client, and
    #   all HTTP requests come from one place.
    # * Mock httplib's request method to provide simulated responses.
    # This lets us test the retry logic extensively without relying on any
    # supporting servers, and prevents side effects in case something hiccups.
    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
    # run_method().
    #
    # Test classes must define TEST_PATCHER to a method that mocks
    # out appropriate methods in the client.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Build a KeepClient from self.client_kwargs plus overrides."""
        kwargs = self.client_kwargs.copy()
        kwargs.update(caller_kwargs)
        kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        """Perform the client operation under test; see subclasses."""
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method() returns `expected` (DEFAULT_EXPECT when
        not given)."""
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method() raises `error_class` (DEFAULT_EXCEPTION
        when not given); return the assertRaises context for further
        inspection by the caller."""
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
        return err

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        # 403 is permanent: the client must fail without reaching the 200.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # num_retries=1 allows two attempts; the 200 comes too late.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries set at construction applies when the call site
        # doesn't pass one.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1190
1191
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.get() (see KeepClientRetryTestMixin)."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        return self.new_client().get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # The 200 arrives on the second response; it must be reached
        # without consuming any retries.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # A body that doesn't match the locator's checksum must be
        # discarded and the next server tried.
        with tutil.mock_keep_responses(
                ('baddata', 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1239
1240
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.head() (see KeepClientRetryTestMixin)."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        return self.new_client().head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # The 200 arrives on the second response; it must be reached
        # without consuming any retries.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1282
1283
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.put() (see KeepClientRetryTestMixin)."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        """Write `data` through a fresh client; returns the locator."""
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        """With only one server responding, a 2-copy put must fail."""
        patcher = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200)
        with patcher:
            self.check_exception(copies=2, num_retries=3)
1301
1302
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    class FakeKeepService(object):
        """Stub keep service that sleeps, then succeeds, fails, or raises."""

        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            # Canned response that last_result() returns on success.
            self._result = {
                'headers': {
                    'x-keep-replicas-stored': str(replicas),
                    'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
                },
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout, headers):
            # Simulated latency lets the pool schedule around slow services.
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            if not self.will_succeed:
                return {"status_code": 500, "body": "didn't succeed"}
            return self._result

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data = 'foo',
            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas = self.copies,
            copies = self.copies
        )

    def test_only_write_enough_on_success(self):
        # Ten willing services, but only self.copies writes should happen.
        for tick in range(10):
            self.pool.add_task(self.FakeKeepService(delay=tick/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Alternate failing and succeeding services; failures don't count
        # toward the replication goal.
        for tick in range(5):
            self.pool.add_task(self.FakeKeepService(delay=tick/10.0, will_succeed=False), None)
            self.pool.add_task(self.FakeKeepService(delay=tick/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Services that raise must not prevent reaching the goal.
        for tick in range(5):
            self.pool.add_task(self.FakeKeepService(delay=tick/10.0, will_raise=Exception()), None)
            self.pool.add_task(self.FakeKeepService(delay=tick/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # With only copies-1 healthy services the goal can't be met.
        for tick in range(self.copies + 1):
            self.pool.add_task(self.FakeKeepService(delay=tick/10.0, will_raise=Exception()), None)
        for tick in range(self.copies - 1):
            self.pool.add_task(self.FakeKeepService(delay=tick/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies - 1, []))
1374
1375
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Test put()s that need two distinct servers to succeed, possibly
    requiring multiple passes through the retry loop.
    """
    # NOTE(review): sibling test classes declare `disk_cache = False` here;
    # `block_cache` looks like it may be a typo for `disk_cache` — confirm
    # before renaming (the parameterized_class decorator always supplies
    # disk_cache, so this attribute appears unused).
    block_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        # One server raises outright; the retry pass reaches the second.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        # A 500 is retryable, so the second pass can still reach 2 copies.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1415
1416
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    """Regression test: an API-level error during get() must not deadlock."""
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        class ApiMock(object):
            # Provide just enough of the API client surface to construct a
            # KeepClient; any other attribute access fails like a broken API.
            def __getattr__(self, name):
                canned = {
                    "api_token": "abc",
                    "insecure": False,
                    "config": lambda: {},
                }
                if name in canned:
                    return canned[name]
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for is that if an API (not
        # keepstore) exception is thrown as part of a get(), the next
        # attempt to get that same block will result in a deadlock.
        # This is why there are two get()s in a row.  Unfortunately,
        # the failure mode for this test is that the test suite
        # deadlocks, there isn't a good way to avoid that without
        # adding a special case that has no use except for this test.
        for _ in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1451
1452
1453 class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
1454     def setUp(self):
1455         self.api_client = self.mock_keep_services(count=2)
1456         self.data = b'xyzzy'
1457         self.locator = '1271ed5ef305aadabc605b1609e24c52'
1458         self.disk_cache_dir = tempfile.mkdtemp()
1459
1460     def tearDown(self):
1461         shutil.rmtree(self.disk_cache_dir)
1462
1463     @mock.patch('arvados.util._BaseDirectories.storage_path')
1464     def test_default_disk_cache_dir(self, storage_path):
1465         expected = Path(self.disk_cache_dir)
1466         storage_path.return_value = expected
1467         cache = arvados.keep.KeepBlockCache(disk_cache=True)
1468         storage_path.assert_called_with('keep')
1469         self.assertEqual(cache._disk_cache_dir, str(expected))
1470
1471     @mock.patch('arvados.KeepClient.KeepService.get')
1472     def test_disk_cache_read(self, get_mock):
1473         # confirm it finds an existing cache block when the cache is
1474         # initialized.
1475
1476         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1477         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1478             f.write(self.data)
1479
1480         # block cache should have found the existing block
1481         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1482                                                   disk_cache_dir=self.disk_cache_dir)
1483         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1484
1485         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1486
1487         get_mock.assert_not_called()
1488
1489     @mock.patch('arvados.KeepClient.KeepService.get')
1490     def test_disk_cache_share(self, get_mock):
1491         # confirm it finds a cache block written after the disk cache
1492         # was initialized.
1493
1494         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1495                                                   disk_cache_dir=self.disk_cache_dir)
1496         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1497
1498         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1499         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1500             f.write(self.data)
1501
1502         # when we try to get the block, it'll check the disk and find it.
1503         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1504
1505         get_mock.assert_not_called()
1506
1507     def test_disk_cache_write(self):
1508         # confirm the cache block was created
1509
1510         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1511                                                   disk_cache_dir=self.disk_cache_dir)
1512         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1513
1514         with tutil.mock_keep_responses(self.data, 200) as mock:
1515             self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1516
1517         self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1518
1519         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1520             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1521
1522     def test_disk_cache_clean(self):
1523         # confirm that a tmp file in the cache is cleaned up
1524
1525         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1526         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
1527             f.write(b"abc1")
1528
1529         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
1530             f.write(b"abc2")
1531
1532         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
1533             f.write(b"abc3")
1534
1535         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1536         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1537         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1538
1539         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1540                                                   disk_cache_dir=self.disk_cache_dir)
1541
1542         # The tmp still hasn't been deleted because it was created in the last 60 seconds
1543         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1544         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1545         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1546
1547         # Set the mtime to 61s in the past
1548         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
1549         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
1550         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))
1551
1552         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1553                                                    disk_cache_dir=self.disk_cache_dir)
1554
1555         # Tmp should be gone but the other ones are safe.
1556         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1557         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1558         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1559
1560     @mock.patch('arvados.KeepClient.KeepService.get')
1561     def test_disk_cache_cap(self, get_mock):
1562         # confirm that the cache is kept to the desired limit
1563
1564         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1565         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1566             f.write(self.data)
1567
1568         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1569         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1570             f.write(b"foo")
1571
1572         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1573         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1574
1575         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1576                                                    disk_cache_dir=self.disk_cache_dir,
1577                                                    max_slots=1)
1578
1579         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1580         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1581
1582     @mock.patch('arvados.KeepClient.KeepService.get')
1583     def test_disk_cache_share(self, get_mock):
1584         # confirm that a second cache doesn't delete files that belong to the first cache.
1585
1586         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1587         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1588             f.write(self.data)
1589
1590         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1591         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1592             f.write(b"foo")
1593
1594         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1595         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1596
1597         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1598                                                    disk_cache_dir=self.disk_cache_dir,
1599                                                    max_slots=2)
1600
1601         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1602         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1603
1604         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1605                                                    disk_cache_dir=self.disk_cache_dir,
1606                                                    max_slots=1)
1607
1608         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1609         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1610
1611     def test_disk_cache_error(self):
1612         os.chmod(self.disk_cache_dir, stat.S_IRUSR)
1613
1614         # Fail during cache initialization.
1615         with self.assertRaises(OSError):
1616             block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1617                                                       disk_cache_dir=self.disk_cache_dir)
1618
1619     def test_disk_cache_write_error(self):
1620         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1621                                                   disk_cache_dir=self.disk_cache_dir)
1622
1623         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1624
1625         # Make the cache dir read-only
1626         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1627         os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
1628
1629         # Cache fails
1630         with self.assertRaises(arvados.errors.KeepCacheError):
1631             with tutil.mock_keep_responses(self.data, 200) as mock:
1632                 keep_client.get(self.locator)
1633
1634     def test_disk_cache_retry_write_error(self):
1635         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1636                                                   disk_cache_dir=self.disk_cache_dir)
1637
1638         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1639
1640         called = False
1641         realmmap = mmap.mmap
1642         def sideeffect_mmap(*args, **kwargs):
1643             nonlocal called
1644             if not called:
1645                 called = True
1646                 raise OSError(errno.ENOSPC, "no space")
1647             else:
1648                 return realmmap(*args, **kwargs)
1649
1650         with patch('mmap.mmap') as mockmmap:
1651             mockmmap.side_effect = sideeffect_mmap
1652
1653             cache_max_before = block_cache.cache_max
1654
1655             with tutil.mock_keep_responses(self.data, 200) as mock:
1656                 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1657
1658             self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1659
1660         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1661             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1662
1663         # shrank the cache in response to ENOSPC
1664         self.assertTrue(cache_max_before > block_cache.cache_max)
1665
    def test_disk_cache_retry_write_error2(self):
        # Like test_disk_cache_retry_write_error, but the first mmap fails
        # with ENOMEM: the get() should still succeed, and the cache should
        # shrink its slot count in response.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        # Raise ENOMEM on the first mmap call only; afterwards delegate to
        # the real mmap so the retried write can succeed.
        called = False
        realmmap = mmap.mmap
        def sideeffect_mmap(*args, **kwargs):
            nonlocal called
            if not called:
                called = True
                raise OSError(errno.ENOMEM, "no memory")
            else:
                return realmmap(*args, **kwargs)

        with patch('mmap.mmap') as mockmmap:
            mockmmap.side_effect = sideeffect_mmap

            # Slot count before the failure, for comparison below.
            slots_before = block_cache._max_slots

            with tutil.mock_keep_responses(self.data, 200) as mock:
                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

            self.assertIsNotNone(keep_client.get_from_cache(self.locator))

        # The block was still written to the disk cache despite the failure.
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))

        # shrank the cache in response to ENOMEM
        self.assertTrue(slots_before > block_cache._max_slots)