Merge branch '22204-group-sharing' refs #22204
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import errno
6 import hashlib
7 import mmap
8 import os
9 import random
10 import re
11 import shutil
12 import socket
13 import stat
14 import sys
15 import tempfile
16 import time
17 import unittest
18 import urllib.parse
19
20 from pathlib import Path
21 from unittest import mock
22 from unittest.mock import patch
23
24 import parameterized
25 import pycurl
26
27 import arvados
28 import arvados.retry
29 import arvados.util
30
31 from . import arvados_testutil as tutil
32 from . import keepstub
33 from . import run_test_server
34
35 from .arvados_testutil import DiskCacheBase
36
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Round-trip put/get/head tests against a live test Keep server.

    Parameterized to run once with the disk-backed block cache and once
    with the in-memory block cache.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    # DiskCacheBase instance whose block cache the shared KeepClient
    # uses; cleaned up in tearDownClass().
    block_cache_test = None

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.block_cache_test = DiskCacheBase()
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='',
                                             block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))

    @classmethod
    def tearDownClass(cls):
        # BUG FIX: this previously called super().setUpClass(), which
        # re-ran class setup during teardown instead of releasing the
        # test servers.
        super(KeepTestCase, cls).tearDownClass()
        cls.block_cache_test.tearDown()

    def test_KeepBasicRWTest(self):
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
                         b'foo'),
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # Doubling 23 times yields a 64 MiB block (the Keep block size).
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_KeepPutDataType(self):
        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
134
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Blob-signing permission checks against a live test Keep server."""
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))

        # A signed PUT followed by GET round-trips for the writing user.
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        self.assertEqual(
            keep_client.get(foo_locator),
            b'foo',
            'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => bad request
        bar_locator = keep_client.put('bar')
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            bar_locator,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get(unsigned_bar_locator)

        # GET from a different user => bad request
        run_test_server.authorize_with('spectator')
        keep_client2 = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client2.get(bar_locator)

        # Unauthenticated GET for a signed locator => bad request
        # Unauthenticated GET for an unsigned locator => bad request
        keep_client.api_token = ''
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get(bar_locator)
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get(unsigned_bar_locator)
183
184
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Tests of KeepClient routed through a keepproxy service."""
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        super(KeepProxyTestCase, self).tearDown()
        DiskCacheBase.tearDown(self)

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='', block_cache=self.make_block_cache(self.disk_cache))
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            # FIX: dropped the unused `keep_client = ...` binding; the
            # constructor call itself is what must raise.
            arvados.KeepClient(api_client=self.api_client,
                               local_store='',
                               block_cache=self.make_block_cache(self.disk_cache))
235
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """KeepClient service-selection, timeout, and error-reporting tests.

    All Keep traffic is mocked via tutil.mock_keep_responses; no real
    servers are started.  Runs once with the disk-backed block cache and
    once with the in-memory cache.
    """
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def get_service_roots(self, api_client):
        """Return the parsed, sorted service-root URLs a KeepClient would use."""
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
            service_type='proxy', service_host='localhost', service_port=9, count=1),
                                         block_cache=self.make_block_cache(self.disk_cache))
        try:
            # this will fail, but it ensures we get the service
            # discovery response
            keep_client.put('baz2', num_retries=0)
        except Exception:
            # FIX: was a bare `except:`, which would also swallow
            # SystemExit/KeyboardInterrupt.
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        # FIX: removed an unused `force_timeout` local; this test never
        # injects a timeout.
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_head_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            # HEAD requests do not set low-speed limits.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_get_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            # HEAD requests do not set low-speed limits.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_put_timeout(self):
        # NOTE(review): unlike the sibling timeout tests, this one builds
        # the KeepClient without an explicit block_cache and nulls
        # disk_cache_dir — presumably deliberate; confirm before changing.
        self.disk_cache_dir = None
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def check_no_services_error(self, verb, exc_class):
        """Assert `verb` raises `exc_class` with no request errors when
        service discovery itself fails."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=self.make_block_cache(self.disk_cache),
            num_retries=0,
        )
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Assert the raised error reports only the final retry's failures."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
571
572
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify the block cache serves repeated GETs but never caches HEADs."""
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    @mock.patch('arvados.KeepClient._KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.get(self.locator)
            self.keep_client.get(self.locator)
        # Request already cached, don't require more than one request
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient._KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.head(self.locator)
            self.keep_client.head(self.locator)
        # Don't cache HEAD requests so that they're not confused with GET reqs
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient._KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        # FIX: removed dead `head_resp = None` / `get_resp = None`
        # initializers; both names are assigned unconditionally below.
        get_mock.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual(b'first response', head_resp)
        # First response was not cached because it was from a HEAD request.
        self.assertNotEqual(head_resp, get_resp)
614
615
616 @tutil.skip_sleep
617 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
618 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
619     disk_cache = False
620
621     def setUp(self):
622         self.api_client = self.mock_keep_services(count=2)
623         self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
624         self.data = b'xyzzy'
625         self.locator = '1271ed5ef305aadabc605b1609e24c52'
626         self.test_id = arvados.util.new_request_id()
627         self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
628         # If we don't set request_id to None explicitly here, it will
629         # return <MagicMock name='api_client_mock.request_id'
630         # id='123456789'>:
631         self.api_client.request_id = None
632
633     def tearDown(self):
634         DiskCacheBase.tearDown(self)
635
636     def test_default_to_api_client_request_id(self):
637         self.api_client.request_id = self.test_id
638         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
639             self.keep_client.put(self.data)
640         self.assertEqual(2, len(mock.responses))
641         for resp in mock.responses:
642             self.assertProvidedRequestId(resp)
643
644         with tutil.mock_keep_responses(self.data, 200) as mock:
645             self.keep_client.get(self.locator)
646         self.assertProvidedRequestId(mock.responses[0])
647
648         with tutil.mock_keep_responses(b'', 200) as mock:
649             self.keep_client.head(self.locator)
650         self.assertProvidedRequestId(mock.responses[0])
651
652     def test_explicit_request_id(self):
653         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
654             self.keep_client.put(self.data, request_id=self.test_id)
655         self.assertEqual(2, len(mock.responses))
656         for resp in mock.responses:
657             self.assertProvidedRequestId(resp)
658
659         with tutil.mock_keep_responses(self.data, 200) as mock:
660             self.keep_client.get(self.locator, request_id=self.test_id)
661         self.assertProvidedRequestId(mock.responses[0])
662
663         with tutil.mock_keep_responses(b'', 200) as mock:
664             self.keep_client.head(self.locator, request_id=self.test_id)
665         self.assertProvidedRequestId(mock.responses[0])
666
667     def test_automatic_request_id(self):
668         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
669             self.keep_client.put(self.data)
670         self.assertEqual(2, len(mock.responses))
671         for resp in mock.responses:
672             self.assertAutomaticRequestId(resp)
673
674         with tutil.mock_keep_responses(self.data, 200) as mock:
675             self.keep_client.get(self.locator)
676         self.assertAutomaticRequestId(mock.responses[0])
677
678         with tutil.mock_keep_responses(b'', 200) as mock:
679             self.keep_client.head(self.locator)
680         self.assertAutomaticRequestId(mock.responses[0])
681
682     def test_request_id_in_exception(self):
683         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
684             with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
685                 self.keep_client.head(self.locator, request_id=self.test_id)
686
687         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
688             with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
689                 self.keep_client.get(self.locator)
690
691         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
692             with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
693                 self.keep_client.put(self.data, request_id=self.test_id)
694
695         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
696             with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
697                 self.keep_client.put(self.data)
698
699     def assertAutomaticRequestId(self, resp):
700         hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
701                if x.startswith('X-Request-Id: ')][0]
702         self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
703         self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
704
705     def assertProvidedRequestId(self, resp):
706         self.assertIn('X-Request-Id: '+self.test_id,
707                       resp.getopt(pycurl.HTTPHEADER))
708
709
@tutil.skip_sleep
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify KeepClient's rendezvous-hash probe ordering.

    Each test compares the order in which the client probes (or would
    probe) 16 mocked Keep services against a hard-coded reference
    ordering for a few known block hashes.
    """

    # NOTE(review): the disk_cache parameterization is commented out
    # above, so these tests currently run with the in-memory cache only.
    disk_cache = False

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        # One block of data (and its md5 hash) per reference ordering.
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        # Run op(i) against servers that always fail (500), then verify
        # the failed requests hit every service in reference order.  The
        # final assertion expects each service probed exactly twice
        # (initial attempt + 1 retry).
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        # With copies>1, multiple writer threads race, so the observed
        # request order may deviate slightly from the reference order;
        # the checker below bounds how far it may deviate.
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        # Rendezvous hashing property: adding services should disturb
        # existing probe orders as little as possible.
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            total_penalty = 0
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                # penalty: how far the old first-choice server moved
                # down the new probe order.
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        # With every service failing, the raised error should list the
        # attempted services in reference probe order for the all-zeros
        # block ('3eab2d5fc9681074', per setUp's comment).
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
868
869
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    """Exercise KeepClient connect/response/bandwidth timeouts against a
    stub Keep server whose delays and bandwidth are adjustable.

    Fix over previous version: an unused exception binding (``as e``)
    in test_low_bandwidth_with_server_mid_delay_failure was removed.
    """

    disk_cache = False

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        """Context manager: assert its body takes tmin..tmax seconds."""

        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager: assert its body takes at least tmin seconds."""

        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient built with the given timeout tuple.

        timeouts is passed through to arvados.KeepClient(timeout=...);
        here it is (connect, response[, low-speed limit]).
        """
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # Twice the low-speed limit is enough bandwidth to succeed.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            # NOTE(review): head is only checked for slowness, not for
            # failure -- presumably it can still succeed because no
            # response body is transferred; confirm against keepstub.
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
1016
1017
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify that K@ locator hints route requests to gateway services
    (or remote-cluster keep proxies) before regular disk services.
    """

    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Point self.keepClient at a mocked API that advertises
        `disks` disk services plus `gateways` gateway services.

        Also records self.gateways (service dicts) and
        self.gateway_roots (their https:// base URLs) for assertions.
        """
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        # A K@<service uuid> hint: the hinted gateway is contacted first.
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        gateways = 4
        disks = 3
        # Every service 404s, so the client should walk all of them.
        mocks = [
            tutil.FakeCurl.make(code=404, body='')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        # Same scenario as the get test above, exercising head().
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        # A K@<cluster id> hint maps to that cluster's keep proxy URL.
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1119
1120
class KeepClientRetryTestMixin(object):
    """Shared machinery for exercising KeepClient retry behavior.

    A local Keep store would never trigger retries, so instead:

    * The client is built with a single proxy pointed at a black hole,
      which avoids instantiating an API client and funnels every HTTP
      request through one place.
    * Simulated responses are injected by mocking the HTTP transport.

    This exercises the retry logic thoroughly without any supporting
    servers, and prevents side effects in case something hiccups.

    Concrete test classes must define DEFAULT_EXPECT,
    DEFAULT_EXCEPTION, run_method(), and TEST_PATCHER (a callable that
    mocks out the appropriate methods in the client).
    """

    disk_cache = False

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Build a KeepClient from the default kwargs plus overrides."""
        kwargs = dict(self.client_kwargs, **caller_kwargs)
        kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) returns the expected value."""
        if expected is None:
            expected = self.DEFAULT_EXPECT
        result = self.run_method(*args, **kwargs)
        self.assertEqual(expected, result)

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method raises error_class; return the catcher."""
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as caught:
            self.run_method(*args, **kwargs)
        return caught

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        # 403 is permanent: retries must not be attempted.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # Two 500s against num_retries=1 exhausts the retry budget.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            caught = self.check_exception(num_retries=1)
        self.assertRegex(str(caught.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries given at construction time applies to every call.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1191
1192
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.get, via KeepClientRetryTestMixin."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        """A definitive 404 becomes NotFoundError even with retries left."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        """A 404/500 split must not collapse into NotFoundError.

        get should raise NotFoundError only when a high threshold of
        servers positively report the block is not found.  This rigs
        up a 50/50 disagreement between two servers and checks that
        the result stays a generic read error.
        """
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as caught:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                caught.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        """A timeout on one server falls through to the next."""
        responses = tutil.mock_keep_responses(
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200))
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        """Corrupt data from one server is rejected and fetched elsewhere."""
        responses = tutil.mock_keep_responses(
            ('baddata', 200),
            (self.DEFAULT_EXPECT, 200))
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)
1240
1241
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.head, via KeepClientRetryTestMixin."""

    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        """A definitive 404 becomes NotFoundError even with retries left."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        """A 404/500 split must not collapse into NotFoundError.

        head should raise NotFoundError only when a high threshold of
        servers positively report the block is not found.  This rigs
        up a 50/50 disagreement between two servers and checks that
        the result stays a generic read error.
        """
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as caught:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                caught.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        """A timeout on one server falls through to the next."""
        responses = tutil.mock_keep_responses(
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200))
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)
1283
1284
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.put, via KeepClientRetryTestMixin."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        """With one server available, a 2-copy put must fail rather than
        write both copies to the same server."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1302
1303
1304 class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
1305     class FakeKeepService(object):
1306         def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
1307             self.delay = delay
1308             self.will_succeed = will_succeed
1309             self.will_raise = will_raise
1310             self._result = {}
1311             self._result['headers'] = {}
1312             self._result['headers']['x-keep-replicas-stored'] = str(replicas)
1313             self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
1314             self._result['body'] = 'foobar'
1315
1316         def put(self, data_hash, data, timeout, headers):
1317             time.sleep(self.delay)
1318             if self.will_raise is not None:
1319                 raise self.will_raise
1320             return self.will_succeed
1321
1322         def last_result(self):
1323             if self.will_succeed:
1324                 return self._result
1325             else:
1326                 return {"status_code": 500, "body": "didn't succeed"}
1327
1328         def finished(self):
1329             return False
1330
1331
1332     def setUp(self):
1333         self.copies = 3
1334         self.pool = arvados.KeepClient._KeepWriterThreadPool(
1335             data = 'foo',
1336             data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1337             max_service_replicas = self.copies,
1338             copies = self.copies
1339         )
1340
1341     def test_only_write_enough_on_success(self):
1342         for i in range(10):
1343             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1344             self.pool.add_task(ks, None)
1345         self.pool.join()
1346         self.assertEqual(self.pool.done(), (self.copies, []))
1347
1348     def test_only_write_enough_on_partial_success(self):
1349         for i in range(5):
1350             ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
1351             self.pool.add_task(ks, None)
1352             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1353             self.pool.add_task(ks, None)
1354         self.pool.join()
1355         self.assertEqual(self.pool.done(), (self.copies, []))
1356
1357     def test_only_write_enough_when_some_crash(self):
1358         for i in range(5):
1359             ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
1360             self.pool.add_task(ks, None)
1361             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1362             self.pool.add_task(ks, None)
1363         self.pool.join()
1364         self.assertEqual(self.pool.done(), (self.copies, []))
1365
1366     def test_fail_when_too_many_crash(self):
1367         for i in range(self.copies+1):
1368             ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
1369             self.pool.add_task(ks, None)
1370         for i in range(self.copies-1):
1371             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1372             self.pool.add_task(ks, None)
1373         self.pool.join()
1374         self.assertEqual(self.pool.done(), (self.copies-1, []))
1375
1376
1377 @tutil.skip_sleep
1378 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
1379 class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
1380     block_cache = False
1381
1382     # Test put()s that need two distinct servers to succeed, possibly
1383     # requiring multiple passes through the retry loop.
1384
1385     def setUp(self):
1386         self.api_client = self.mock_keep_services(count=2)
1387         self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
1388
1389     def tearDown(self):
1390         DiskCacheBase.tearDown(self)
1391
1392     def test_success_after_exception(self):
1393         with tutil.mock_keep_responses(
1394                 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1395                 Exception('mock err'), 200, 200) as req_mock:
1396             self.keep_client.put('foo', num_retries=1, copies=2)
1397         self.assertEqual(3, req_mock.call_count)
1398
1399     def test_success_after_retryable_error(self):
1400         with tutil.mock_keep_responses(
1401                 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1402                 500, 200, 200) as req_mock:
1403             self.keep_client.put('foo', num_retries=1, copies=2)
1404         self.assertEqual(3, req_mock.call_count)
1405
1406     def test_fail_after_final_error(self):
1407         # First retry loop gets a 200 (can't achieve replication by
1408         # storing again on that server) and a 400 (can't retry that
1409         # server at all), so we shouldn't try a third request.
1410         with tutil.mock_keep_responses(
1411                 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1412                 200, 400, 200) as req_mock:
1413             with self.assertRaises(arvados.errors.KeepWriteError):
1414                 self.keep_client.put('foo', num_retries=1, copies=2)
1415         self.assertEqual(2, req_mock.call_count)
1416
1417
1418 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
1419 class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
1420     disk_cache = False
1421
1422     def tearDown(self):
1423         DiskCacheBase.tearDown(self)
1424
1425     def test_api_fail(self):
1426         class ApiMock(object):
1427             def __getattr__(self, r):
1428                 if r == "api_token":
1429                     return "abc"
1430                 elif r == "insecure":
1431                     return False
1432                 elif r == "config":
1433                     return lambda: {}
1434                 else:
1435                     raise arvados.errors.KeepReadError()
1436         keep_client = arvados.KeepClient(api_client=ApiMock(),
1437                                          proxy='', local_store='',
1438                                          block_cache=self.make_block_cache(self.disk_cache))
1439
1440         # The bug this is testing for is that if an API (not
1441         # keepstore) exception is thrown as part of a get(), the next
1442         # attempt to get that same block will result in a deadlock.
1443         # This is why there are two get()s in a row.  Unfortunately,
1444         # the failure mode for this test is that the test suite
1445         # deadlocks, there isn't a good way to avoid that without
1446         # adding a special case that has no use except for this test.
1447
1448         with self.assertRaises(arvados.errors.KeepReadError):
1449             keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1450         with self.assertRaises(arvados.errors.KeepReadError):
1451             keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1452
1453
1454 class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
1455     def setUp(self):
1456         self.api_client = self.mock_keep_services(count=2)
1457         self.data = b'xyzzy'
1458         self.locator = '1271ed5ef305aadabc605b1609e24c52'
1459         self.disk_cache_dir = tempfile.mkdtemp()
1460
1461     def tearDown(self):
1462         shutil.rmtree(self.disk_cache_dir)
1463
1464     @mock.patch('arvados._internal.basedirs.BaseDirectories.storage_path')
1465     def test_default_disk_cache_dir(self, storage_path):
1466         expected = Path(self.disk_cache_dir)
1467         storage_path.return_value = expected
1468         cache = arvados.keep.KeepBlockCache(disk_cache=True)
1469         storage_path.assert_called_with('keep')
1470         self.assertEqual(cache._disk_cache_dir, str(expected))
1471
1472     @mock.patch('arvados.KeepClient._KeepService.get')
1473     def test_disk_cache_read(self, get_mock):
1474         # confirm it finds an existing cache block when the cache is
1475         # initialized.
1476
1477         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1478         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1479             f.write(self.data)
1480
1481         # block cache should have found the existing block
1482         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1483                                                   disk_cache_dir=self.disk_cache_dir)
1484         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1485
1486         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1487
1488         get_mock.assert_not_called()
1489
1490     @mock.patch('arvados.KeepClient._KeepService.get')
1491     def test_disk_cache_share(self, get_mock):
1492         # confirm it finds a cache block written after the disk cache
1493         # was initialized.
1494
1495         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1496                                                   disk_cache_dir=self.disk_cache_dir)
1497         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1498
1499         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1500         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1501             f.write(self.data)
1502
1503         # when we try to get the block, it'll check the disk and find it.
1504         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1505
1506         get_mock.assert_not_called()
1507
1508     def test_disk_cache_write(self):
1509         # confirm the cache block was created
1510
1511         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1512                                                   disk_cache_dir=self.disk_cache_dir)
1513         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1514
1515         with tutil.mock_keep_responses(self.data, 200) as mock:
1516             self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1517
1518         self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1519
1520         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1521             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1522
1523     def test_disk_cache_clean(self):
1524         # confirm that a tmp file in the cache is cleaned up
1525
1526         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1527         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
1528             f.write(b"abc1")
1529
1530         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
1531             f.write(b"abc2")
1532
1533         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
1534             f.write(b"abc3")
1535
1536         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1537         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1538         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1539
1540         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1541                                                   disk_cache_dir=self.disk_cache_dir)
1542
1543         # The tmp still hasn't been deleted because it was created in the last 60 seconds
1544         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1545         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1546         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1547
1548         # Set the mtime to 61s in the past
1549         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
1550         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
1551         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))
1552
1553         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1554                                                    disk_cache_dir=self.disk_cache_dir)
1555
1556         # Tmp should be gone but the other ones are safe.
1557         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1558         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1559         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1560
1561     @mock.patch('arvados.KeepClient._KeepService.get')
1562     def test_disk_cache_cap(self, get_mock):
1563         # confirm that the cache is kept to the desired limit
1564
1565         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1566         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1567             f.write(self.data)
1568
1569         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1570         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1571             f.write(b"foo")
1572
1573         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1574         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1575
1576         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1577                                                    disk_cache_dir=self.disk_cache_dir,
1578                                                    max_slots=1)
1579
1580         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1581         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1582
1583     @mock.patch('arvados.KeepClient._KeepService.get')
1584     def test_disk_cache_share(self, get_mock):
1585         # confirm that a second cache doesn't delete files that belong to the first cache.
1586
1587         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1588         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1589             f.write(self.data)
1590
1591         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1592         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1593             f.write(b"foo")
1594
1595         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1596         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1597
1598         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1599                                                    disk_cache_dir=self.disk_cache_dir,
1600                                                    max_slots=2)
1601
1602         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1603         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1604
1605         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1606                                                    disk_cache_dir=self.disk_cache_dir,
1607                                                    max_slots=1)
1608
1609         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1610         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1611
1612     def test_disk_cache_error(self):
1613         os.chmod(self.disk_cache_dir, stat.S_IRUSR)
1614
1615         # Fail during cache initialization.
1616         with self.assertRaises(OSError):
1617             block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1618                                                       disk_cache_dir=self.disk_cache_dir)
1619
1620     def test_disk_cache_write_error(self):
1621         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1622                                                   disk_cache_dir=self.disk_cache_dir)
1623
1624         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1625
1626         # Make the cache dir read-only
1627         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1628         os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
1629
1630         # Cache fails
1631         with self.assertRaises(arvados.errors.KeepCacheError):
1632             with tutil.mock_keep_responses(self.data, 200) as mock:
1633                 keep_client.get(self.locator)
1634
1635     def test_disk_cache_retry_write_error(self):
1636         cache_max_before = 512 * 1024 * 1024
1637         block_cache = arvados.keep.KeepBlockCache(
1638             cache_max=cache_max_before,
1639             disk_cache=True,
1640             disk_cache_dir=self.disk_cache_dir,
1641         )
1642         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1643
1644         called = False
1645         realmmap = mmap.mmap
1646         def sideeffect_mmap(*args, **kwargs):
1647             nonlocal called
1648             if not called:
1649                 called = True
1650                 raise OSError(errno.ENOSPC, "no space")
1651             else:
1652                 return realmmap(*args, **kwargs)
1653
1654         with patch('mmap.mmap', autospec=True, side_effect=sideeffect_mmap) as mockmmap:
1655             with tutil.mock_keep_responses(self.data, 200) as mock:
1656                 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1657
1658             self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1659
1660         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1661             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1662
1663         # shrank the cache in response to ENOSPC
1664         self.assertGreater(cache_max_before, block_cache.cache_max)
1665
1666     def test_disk_cache_retry_write_error2(self):
1667         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1668                                                   disk_cache_dir=self.disk_cache_dir)
1669
1670         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1671
1672         called = False
1673         realmmap = mmap.mmap
1674         def sideeffect_mmap(*args, **kwargs):
1675             nonlocal called
1676             if not called:
1677                 called = True
1678                 raise OSError(errno.ENOMEM, "no memory")
1679             else:
1680                 return realmmap(*args, **kwargs)
1681
1682         with patch('mmap.mmap', autospec=True, side_effect=sideeffect_mmap) as mockmmap:
1683             slots_before = block_cache._max_slots
1684
1685             with tutil.mock_keep_responses(self.data, 200) as mock:
1686                 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1687
1688             self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1689
1690         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1691             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1692
1693         # shrank the cache in response to ENOMEM
1694         self.assertGreater(slots_before, block_cache._max_slots)