1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import shutil
19 import socket
20 import sys
21 import time
22 import unittest
23 import urllib.parse
24
25 import parameterized
26
27 import arvados
28 import arvados.retry
29 import arvados.util
30 from . import arvados_testutil as tutil
31 from . import keepstub
32 from . import run_test_server
33
34 from .arvados_testutil import DiskCacheBase
35
36 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
37 class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
38     disk_cache = False
39     MAIN_SERVER = {}
40     KEEP_SERVER = {}
41     block_cache_test = None
42
43     @classmethod
44     def setUpClass(cls):
45         super(KeepTestCase, cls).setUpClass()
46         run_test_server.authorize_with("admin")
47         cls.api_client = arvados.api('v1')
48         cls.block_cache_test = DiskCacheBase()
49         cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
50                                              proxy='', local_store='',
51                                              block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))
52
53     @classmethod
54     def tearDownClass(cls):
55         super(KeepTestCase, cls).tearDownClass()
56         cls.block_cache_test.tearDown()
57
58     def test_KeepBasicRWTest(self):
59         self.assertEqual(0, self.keep_client.upload_counter.get())
60         foo_locator = self.keep_client.put('foo')
61         self.assertRegex(
62             foo_locator,
63             r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
64             'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
65
66         # 6 bytes expected: 'foo' is 3 bytes, uploaded as 2 copies
67         self.assertEqual(6, self.keep_client.upload_counter.get())
68
69         self.assertEqual(0, self.keep_client.download_counter.get())
70         self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
71                          b'foo'),
72                          'wrong content from Keep.get(md5("foo"))')
73         self.assertEqual(3, self.keep_client.download_counter.get())
74
75     def test_KeepBinaryRWTest(self):
76         blob_str = b'\xff\xfe\xf7\x00\x01\x02'
77         blob_locator = self.keep_client.put(blob_str)
78         self.assertRegex(
79             blob_locator,
80             r'^7fc7c53b45e53926ba52821140fef396\+6',
81             ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
82         self.assertEqual(self.keep_client.get(blob_locator),
83                          blob_str,
84                          'wrong content from Keep.get(md5(<binarydata>))')
85
86     def test_KeepLongBinaryRWTest(self):
87         blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
88         for i in range(0, 23):
89             blob_data = blob_data + blob_data
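        # Doubling the 8-byte seed 23 times yields 8 * 2**23 = 67108864
        # bytes (64 MiB), matching the size hint asserted below.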
90         blob_locator = self.keep_client.put(blob_data)
91         self.assertRegex(
92             blob_locator,
93             r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
94             ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
95         self.assertEqual(self.keep_client.get(blob_locator),
96                          blob_data,
97                          'wrong content from Keep.get(md5(<binarydata>))')
98
99     @unittest.skip("unreliable test - please fix and close #8752")
100     def test_KeepSingleCopyRWTest(self):
101         blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
102         blob_locator = self.keep_client.put(blob_data, copies=1)
103         self.assertRegex(
104             blob_locator,
105             r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
106             ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
107         self.assertEqual(self.keep_client.get(blob_locator),
108                          blob_data,
109                          'wrong content from Keep.get(md5(<binarydata>))')
110
111     def test_KeepEmptyCollectionTest(self):
112         blob_locator = self.keep_client.put('', copies=1)
113         self.assertRegex(
114             blob_locator,
115             r'^d41d8cd98f00b204e9800998ecf8427e\+0',
116             ('wrong locator from Keep.put(""): ' + blob_locator))
117
118     def test_unicode_must_be_ascii(self):
119         # If unicode type, must only consist of valid ASCII
120         foo_locator = self.keep_client.put(u'foo')
121         self.assertRegex(
122             foo_locator,
123             r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
124             'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
125
126         if sys.version_info < (3, 0):
127             with self.assertRaises(UnicodeEncodeError):
128                 # Error if it is not ASCII
129                 self.keep_client.put(u'\xe2')
130
131         with self.assertRaises(AttributeError):
132             # Must be bytes or have an encode() method
133             self.keep_client.put({})
134
135     def test_KeepHeadTest(self):
136         locator = self.keep_client.put('test_head')
137         self.assertRegex(
138             locator,
139             r'^b9a772c7049325feb7130fff1f8333e9\+9',
140             'wrong md5 hash from Keep.put for "test_head": ' + locator)
141         self.assertEqual(True, self.keep_client.head(locator))
142         self.assertEqual(self.keep_client.get(locator),
143                          b'test_head',
144                          'wrong content from Keep.get for "test_head"')
145
146 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
147 class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
148     disk_cache = False
149     MAIN_SERVER = {}
150     KEEP_SERVER = {'blob_signing': True}
151
152     def tearDown(self):
153         DiskCacheBase.tearDown(self)
154
155     def test_KeepBasicRWTest(self):
156         run_test_server.authorize_with('active')
157         keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
158         foo_locator = keep_client.put('foo')
159         self.assertRegex(
160             foo_locator,
161             r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
162             'invalid locator from Keep.put("foo"): ' + foo_locator)
163         self.assertEqual(keep_client.get(foo_locator),
164                          b'foo',
165                          'wrong content from Keep.get(md5("foo"))')
166
167         # GET with an unsigned locator => NotFound
168         bar_locator = keep_client.put('bar')
169         unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
170         self.assertRegex(
171             bar_locator,
172             r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
173             'invalid locator from Keep.put("bar"): ' + bar_locator)
174         self.assertRaises(arvados.errors.NotFoundError,
175                           keep_client.get,
176                           unsigned_bar_locator)
177
178         # GET from a different user => NotFound
179         run_test_server.authorize_with('spectator')
180         self.assertRaises(arvados.errors.NotFoundError,
181                           arvados.Keep.get,
182                           bar_locator)
183
184         # Unauthenticated GET for a signed locator => NotFound
185         # Unauthenticated GET for an unsigned locator => NotFound
186         keep_client.api_token = ''
187         self.assertRaises(arvados.errors.NotFoundError,
188                           keep_client.get,
189                           bar_locator)
190         self.assertRaises(arvados.errors.NotFoundError,
191                           keep_client.get,
192                           unsigned_bar_locator)
193
194 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
195 class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
196     disk_cache = False
197     MAIN_SERVER = {}
198     KEEP_SERVER = {}
199     KEEP_PROXY_SERVER = {}
200
201     @classmethod
202     def setUpClass(cls):
203         super(KeepProxyTestCase, cls).setUpClass()
204         run_test_server.authorize_with('active')
205         cls.api_client = arvados.api('v1')
206
207     def tearDown(self):
208         super(KeepProxyTestCase, self).tearDown()
209         DiskCacheBase.tearDown(self)
210
211     def test_KeepProxyTest1(self):
212         # Will use ARVADOS_KEEP_SERVICES environment variable that
213         # is set by setUpClass().
214         keep_client = arvados.KeepClient(api_client=self.api_client,
215                                          local_store='', block_cache=self.make_block_cache(self.disk_cache))
216         baz_locator = keep_client.put('baz')
217         self.assertRegex(
218             baz_locator,
219             r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
220             'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
221         self.assertEqual(keep_client.get(baz_locator),
222                          b'baz',
223                          'wrong content from Keep.get(md5("baz"))')
224         self.assertTrue(keep_client.using_proxy)
225
226     def test_KeepProxyTestMultipleURIs(self):
227         # Test using ARVADOS_KEEP_SERVICES env var overriding any
228         # existing proxy setting and setting multiple proxies
229         arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
230         keep_client = arvados.KeepClient(api_client=self.api_client,
231                                          local_store='',
232                                          block_cache=self.make_block_cache(self.disk_cache))
233         uris = [x['_service_root'] for x in keep_client._keep_services]
234         self.assertEqual(uris, ['http://10.0.0.1/',
235                                 'https://foo.example.org:1234/'])
236
237     def test_KeepProxyTestInvalidURI(self):
238         arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
239         with self.assertRaises(arvados.errors.ArgumentError):
240             keep_client = arvados.KeepClient(api_client=self.api_client,
241                                              local_store='',
242                                              block_cache=self.make_block_cache(self.disk_cache))
243
244 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
245 class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
246     disk_cache = False
247
248     def tearDown(self):
249         DiskCacheBase.tearDown(self)
250
251     def get_service_roots(self, api_client):
252         keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
253         services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
254         return [urllib.parse.urlparse(url) for url in sorted(services)]
255
256     def test_ssl_flag_respected_in_roots(self):
257         for ssl_flag in [False, True]:
258             services = self.get_service_roots(self.mock_keep_services(
259                 service_ssl_flag=ssl_flag))
260             self.assertEqual(
261                 ('https' if ssl_flag else 'http'), services[0].scheme)
262
263     def test_correct_ports_with_ipv6_addresses(self):
264         service = self.get_service_roots(self.mock_keep_services(
265             service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
266         self.assertEqual('100::1', service.hostname)
267         self.assertEqual(10, service.port)
268
269     def test_recognize_proxy_services_in_controller_response(self):
270         keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
271             service_type='proxy', service_host='localhost', service_port=9, count=1),
272                                          block_cache=self.make_block_cache(self.disk_cache))
273         try:
274             # this will fail, but it ensures we get the service
275             # discovery response
276             keep_client.put('baz2')
277         except Exception:
278             pass
279         self.assertTrue(keep_client.using_proxy)
280
281     def test_insecure_disables_tls_verify(self):
282         api_client = self.mock_keep_services(count=1)
283         force_timeout = socket.timeout("timed out")
284
285         api_client.insecure = True
286         with tutil.mock_keep_responses(b'foo', 200) as mock:
287             keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
288             keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
289             self.assertEqual(
290                 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
291                 0)
292             self.assertEqual(
293                 mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
294                 0)
295
296         api_client.insecure = False
297         with tutil.mock_keep_responses(b'foo', 200) as mock:
298             keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
299             keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
300             # getopt()==None here means we didn't change the
301             # default. If we were using real pycurl instead of a mock,
302             # it would return the default value 1.
303             self.assertEqual(
304                 mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
305                 None)
306             self.assertEqual(
307                 mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
308                 None)
309
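    # Illustrative sketch, not part of the original suite: the libcurl-level
    # effect that test_insecure_disables_tls_verify checks for. With an
    # insecure API client, certificate verification is expected to be turned
    # off by zeroing both options below. Hypothetical helper, never called.
    def _example_disable_tls_verify(self, curl):
        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
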
310     def test_refresh_signature(self):
311         blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
312         blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
313         local_loc = blk_digest+'+A'+blk_sig
314         remote_loc = blk_digest+'+R'+blk_sig
315         api_client = self.mock_keep_services(count=1)
316         headers = {'X-Keep-Locator':local_loc}
317         with tutil.mock_keep_responses('', 200, **headers):
318             # Check that the translated locator gets returned
319             keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
320             self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
321             # Check that refresh_signature() uses the correct method and headers
322             keep_client._get_or_head = mock.MagicMock()
323             keep_client.refresh_signature(remote_loc)
324             args, kwargs = keep_client._get_or_head.call_args_list[0]
325             self.assertIn(remote_loc, args)
326             self.assertEqual("HEAD", kwargs['method'])
327             self.assertIn('X-Keep-Signature', kwargs['headers'])
328
329     # test_*_timeout verify that KeepClient instructs pycurl to use
330     # the appropriate connection and read timeouts. They don't care
331     # whether pycurl actually exhibits the expected timeout behavior
332     # -- those tests are in the KeepClientTimeout test class.
333
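    # Illustrative sketch, not part of the original suite: the mapping the
    # timeout tests below assert on, from a (connect, read, bandwidth)
    # timeout tuple to libcurl options. Hypothetical helper, never called.
    def _example_apply_timeouts(self, curl, timeouts):
        connect_s, low_speed_time_s, low_speed_limit_bps = timeouts
        curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(connect_s * 1000))
        curl.setopt(pycurl.LOW_SPEED_TIME, int(low_speed_time_s))
        curl.setopt(pycurl.LOW_SPEED_LIMIT, int(low_speed_limit_bps))
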
334     def test_get_timeout(self):
335         api_client = self.mock_keep_services(count=1)
336         force_timeout = socket.timeout("timed out")
337         with tutil.mock_keep_responses(force_timeout, 0) as mock:
338             keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
339             with self.assertRaises(arvados.errors.KeepReadError):
340                 keep_client.get('ffffffffffffffffffffffffffffffff')
341             self.assertEqual(
342                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
343                 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
344             self.assertEqual(
345                 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
346                 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
347             self.assertEqual(
348                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
349                 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
350
351     def test_put_timeout(self):
352         api_client = self.mock_keep_services(count=1)
353         force_timeout = socket.timeout("timed out")
354         with tutil.mock_keep_responses(force_timeout, 0) as mock:
355             keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
356             with self.assertRaises(arvados.errors.KeepWriteError):
357                 keep_client.put(b'foo')
358             self.assertEqual(
359                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
360                 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
361             self.assertEqual(
362                 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
363                 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
364             self.assertEqual(
365                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
366                 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
367
368     def test_head_timeout(self):
369         api_client = self.mock_keep_services(count=1)
370         force_timeout = socket.timeout("timed out")
371         with tutil.mock_keep_responses(force_timeout, 0) as mock:
372             keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
373             with self.assertRaises(arvados.errors.KeepReadError):
374                 keep_client.head('ffffffffffffffffffffffffffffffff')
375             self.assertEqual(
376                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
377                 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
378             self.assertEqual(
379                 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
380                 None)
381             self.assertEqual(
382                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
383                 None)
384
385     def test_proxy_get_timeout(self):
386         api_client = self.mock_keep_services(service_type='proxy', count=1)
387         force_timeout = socket.timeout("timed out")
388         with tutil.mock_keep_responses(force_timeout, 0) as mock:
389             keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
390             with self.assertRaises(arvados.errors.KeepReadError):
391                 keep_client.get('ffffffffffffffffffffffffffffffff')
392             self.assertEqual(
393                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
394                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
395             self.assertEqual(
396                 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
397                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
398             self.assertEqual(
399                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
400                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
401
402     def test_proxy_head_timeout(self):
403         api_client = self.mock_keep_services(service_type='proxy', count=1)
404         force_timeout = socket.timeout("timed out")
405         with tutil.mock_keep_responses(force_timeout, 0) as mock:
406             keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
407             with self.assertRaises(arvados.errors.KeepReadError):
408                 keep_client.head('ffffffffffffffffffffffffffffffff')
409             self.assertEqual(
410                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
411                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
412             self.assertEqual(
413                 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
414                 None)
415             self.assertEqual(
416                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
417                 None)
418
419     def test_proxy_put_timeout(self):
420         self.disk_cache_dir = None
421         api_client = self.mock_keep_services(service_type='proxy', count=1)
422         force_timeout = socket.timeout("timed out")
423         with tutil.mock_keep_responses(force_timeout, 0) as mock:
424             keep_client = arvados.KeepClient(api_client=api_client)
425             with self.assertRaises(arvados.errors.KeepWriteError):
426                 keep_client.put('foo')
427             self.assertEqual(
428                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
429                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
430             self.assertEqual(
431                 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
432                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
433             self.assertEqual(
434                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
435                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
436
437     def check_no_services_error(self, verb, exc_class):
438         api_client = mock.MagicMock(name='api_client')
439         api_client.keep_services().accessible().execute.side_effect = (
440             arvados.errors.ApiError)
441         keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
442         with self.assertRaises(exc_class) as err_check:
443             getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
444         self.assertEqual(0, len(err_check.exception.request_errors()))
445
446     def test_get_error_with_no_services(self):
447         self.check_no_services_error('get', arvados.errors.KeepReadError)
448
449     def test_head_error_with_no_services(self):
450         self.check_no_services_error('head', arvados.errors.KeepReadError)
451
452     def test_put_error_with_no_services(self):
453         self.check_no_services_error('put', arvados.errors.KeepWriteError)
454
455     def check_errors_from_last_retry(self, verb, exc_class):
456         api_client = self.mock_keep_services(count=2)
457         req_mock = tutil.mock_keep_responses(
458             "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
459         with req_mock, tutil.skip_sleep, \
460                 self.assertRaises(exc_class) as err_check:
461             keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
462             getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
463                                        num_retries=3)
464         self.assertEqual([502, 502], [
465                 getattr(error, 'status_code', None)
466                 for error in err_check.exception.request_errors().values()])
467         self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')
468
469     def test_get_error_reflects_last_retry(self):
470         self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
471
472     def test_head_error_reflects_last_retry(self):
473         self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)
474
475     def test_put_error_reflects_last_retry(self):
476         self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
477
478     def test_put_error_does_not_include_successful_puts(self):
479         data = 'partial failure test'
480         data_loc = tutil.str_keep_locator(data)
481         api_client = self.mock_keep_services(count=3)
482         with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
483                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
484             keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
485             keep_client.put(data)
486         self.assertEqual(2, len(exc_check.exception.request_errors()))
487
488     def test_proxy_put_with_no_writable_services(self):
489         data = 'test with no writable services'
490         data_loc = tutil.str_keep_locator(data)
491         api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
492         with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
493                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
494             keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
495             keep_client.put(data)
496         self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
497         self.assertEqual(0, len(exc_check.exception.request_errors()))
498
499     def test_oddball_service_get(self):
500         body = b'oddball service get'
501         api_client = self.mock_keep_services(service_type='fancynewblobstore')
502         with tutil.mock_keep_responses(body, 200):
503             keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
504             actual = keep_client.get(tutil.str_keep_locator(body))
505         self.assertEqual(body, actual)
506
507     def test_oddball_service_put(self):
508         body = b'oddball service put'
509         pdh = tutil.str_keep_locator(body)
510         api_client = self.mock_keep_services(service_type='fancynewblobstore')
511         with tutil.mock_keep_responses(pdh, 200):
512             keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
513             actual = keep_client.put(body, copies=1)
514         self.assertEqual(pdh, actual)
515
516     def test_oddball_service_writer_count(self):
517         body = b'oddball service writer count'
518         pdh = tutil.str_keep_locator(body)
519         api_client = self.mock_keep_services(service_type='fancynewblobstore',
520                                              count=4)
521         headers = {'x-keep-replicas-stored': 3}
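        # The first (200) response reports 3 replicas stored, which already
        # satisfies copies=2, so the client is expected to stop after a
        # single request; the three 418s should never be consumed.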
522         with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
523                                        **headers) as req_mock:
524             keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
525             actual = keep_client.put(body, copies=2)
526         self.assertEqual(pdh, actual)
527         self.assertEqual(1, req_mock.call_count)
528
529 @tutil.skip_sleep
530 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
531 class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
532     disk_cache = False
533
534     def setUp(self):
535         self.api_client = self.mock_keep_services(count=2)
536         self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
537         self.data = b'xyzzy'
538         self.locator = '1271ed5ef305aadabc605b1609e24c52'
539
540     def tearDown(self):
541         DiskCacheBase.tearDown(self)
542
543     @mock.patch('arvados.KeepClient.KeepService.get')
544     def test_get_request_cache(self, get_mock):
545         with tutil.mock_keep_responses(self.data, 200, 200):
546             self.keep_client.get(self.locator)
547             self.keep_client.get(self.locator)
548         # The first get() caches the block, so no more than one backend request should be made
549         get_mock.assert_called_once()
550
551     @mock.patch('arvados.KeepClient.KeepService.get')
552     def test_head_request_cache(self, get_mock):
553         with tutil.mock_keep_responses(self.data, 200, 200):
554             self.keep_client.head(self.locator)
555             self.keep_client.head(self.locator)
556         # Don't cache HEAD requests so that they're not confused with GET reqs
557         self.assertEqual(2, get_mock.call_count)
558
559     @mock.patch('arvados.KeepClient.KeepService.get')
560     def test_head_and_then_get_return_different_responses(self, get_mock):
561         head_resp = None
562         get_resp = None
563         get_mock.side_effect = [b'first response', b'second response']
564         with tutil.mock_keep_responses(self.data, 200, 200):
565             head_resp = self.keep_client.head(self.locator)
566             get_resp = self.keep_client.get(self.locator)
567         self.assertEqual(b'first response', head_resp)
568         # First response was not cached because it was from a HEAD request.
569         self.assertNotEqual(head_resp, get_resp)
570
571 @tutil.skip_sleep
572 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
573 class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
574     disk_cache = False
575
576     def setUp(self):
577         self.api_client = self.mock_keep_services(count=2)
578         self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
579         self.data = b'xyzzy'
580         self.locator = '1271ed5ef305aadabc605b1609e24c52'
581
582     def tearDown(self):
583         DiskCacheBase.tearDown(self)
584
585     def test_multiple_default_storage_classes_req_header(self):
586         api_mock = self.api_client_mock()
587         api_mock.config.return_value = {
588             'StorageClasses': {
589                 'foo': { 'Default': True },
590                 'bar': { 'Default': True },
591                 'baz': { 'Default': False }
592             }
593         }
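        # Only 'foo' and 'bar' are flagged as default storage classes, so the
        # put request below is expected to ask for exactly those two in its
        # X-Keep-Storage-Classes header.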
594         api_client = self.mock_keep_services(api_mock=api_mock, count=2)
595         keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
596         resp_hdr = {
597             'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
598             'x-keep-replicas-stored': 1
599         }
600         with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
601             keep_client.put(self.data, copies=1)
602             req_hdr = mock.responses[0]
603             self.assertIn(
604                 'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
605
606     def test_storage_classes_req_header(self):
607         self.assertEqual(
608             self.api_client.config()['StorageClasses'],
609             {'default': {'Default': True}})
610         cases = [
611             # requested, expected
612             [['foo'], 'X-Keep-Storage-Classes: foo'],
613             [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
614             [[], 'X-Keep-Storage-Classes: default'],
615             [None, 'X-Keep-Storage-Classes: default'],
616         ]
617         for req_classes, expected_header in cases:
618             headers = {'x-keep-replicas-stored': 1}
619             if req_classes is None or len(req_classes) == 0:
620                 confirmed_hdr = 'default=1'
621             elif len(req_classes) > 0:
622                 confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
623             headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
624             with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
625                 self.keep_client.put(self.data, copies=1, classes=req_classes)
626                 req_hdr = mock.responses[0]
627                 self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
628
629     def test_partial_storage_classes_put(self):
630         headers = {
631             'x-keep-replicas-stored': 1,
632             'x-keep-storage-classes-confirmed': 'foo=1'}
633         with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
634             with self.assertRaises(arvados.errors.KeepWriteError):
635                 self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
636             # 1st request, both classes pending
637             req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
638             self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
639             # 2nd try, 'foo' class already satisfied
640             req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
641             self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
642
643     def test_successful_storage_classes_put_requests(self):
644         cases = [
645             # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
646             [ 1, ['foo'], 1, 'foo=1', 1],
647             [ 1, ['foo'], 2, 'foo=2', 1],
648             [ 2, ['foo'], 2, 'foo=2', 1],
649             [ 2, ['foo'], 1, 'foo=1', 2],
650             [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
651             [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
652             [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
653             [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
654             [ 1, ['foo', 'bar'], 1, None, 1],
655             [ 1, ['foo'], 1, None, 1],
656             [ 2, ['foo'], 2, None, 1],
657             [ 2, ['foo'], 1, None, 2],
658         ]
659         for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
660             headers = {'x-keep-replicas-stored': c_copies}
661             if c_classes is not None:
662                 headers.update({'x-keep-storage-classes-confirmed': c_classes})
663             with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
664                 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
665                 self.assertEqual(self.locator,
666                     self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
667                     case_desc)
668                 self.assertEqual(e_reqs, mock.call_count, case_desc)
669
670     def test_failed_storage_classes_put_requests(self):
671         cases = [
672             # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
673             [ 1, ['foo'], 1, 'bar=1', 200],
674             [ 1, ['foo'], 1, None, 503],
675             [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
676             [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
677             [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
678         ]
679         for w_copies, w_classes, c_copies, c_classes, return_code in cases:
680             headers = {'x-keep-replicas-stored': c_copies}
681             if c_classes is not None:
682                 headers.update({'x-keep-storage-classes-confirmed': c_classes})
683             with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
684                 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
685                 with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
686                     self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
687
688 @tutil.skip_sleep
689 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
690 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
691     disk_cache = False
692
693     def setUp(self):
694         self.api_client = self.mock_keep_services(count=2)
695         self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
696         self.data = b'xyzzy'
697         self.locator = '1271ed5ef305aadabc605b1609e24c52'
698         self.test_id = arvados.util.new_request_id()
699         self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
700         # If we don't set request_id to None explicitly here, it will
701         # return <MagicMock name='api_client_mock.request_id'
702         # id='123456789'>:
703         self.api_client.request_id = None
704
705     def tearDown(self):
706         DiskCacheBase.tearDown(self)
707
708     def test_default_to_api_client_request_id(self):
709         self.api_client.request_id = self.test_id
710         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
711             self.keep_client.put(self.data)
712         self.assertEqual(2, len(mock.responses))
713         for resp in mock.responses:
714             self.assertProvidedRequestId(resp)
715
716         with tutil.mock_keep_responses(self.data, 200) as mock:
717             self.keep_client.get(self.locator)
718         self.assertProvidedRequestId(mock.responses[0])
719
720         with tutil.mock_keep_responses(b'', 200) as mock:
721             self.keep_client.head(self.locator)
722         self.assertProvidedRequestId(mock.responses[0])
723
724     def test_explicit_request_id(self):
725         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
726             self.keep_client.put(self.data, request_id=self.test_id)
727         self.assertEqual(2, len(mock.responses))
728         for resp in mock.responses:
729             self.assertProvidedRequestId(resp)
730
731         with tutil.mock_keep_responses(self.data, 200) as mock:
732             self.keep_client.get(self.locator, request_id=self.test_id)
733         self.assertProvidedRequestId(mock.responses[0])
734
735         with tutil.mock_keep_responses(b'', 200) as mock:
736             self.keep_client.head(self.locator, request_id=self.test_id)
737         self.assertProvidedRequestId(mock.responses[0])
738
739     def test_automatic_request_id(self):
740         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
741             self.keep_client.put(self.data)
742         self.assertEqual(2, len(mock.responses))
743         for resp in mock.responses:
744             self.assertAutomaticRequestId(resp)
745
746         with tutil.mock_keep_responses(self.data, 200) as mock:
747             self.keep_client.get(self.locator)
748         self.assertAutomaticRequestId(mock.responses[0])
749
750         with tutil.mock_keep_responses(b'', 200) as mock:
751             self.keep_client.head(self.locator)
752         self.assertAutomaticRequestId(mock.responses[0])
753
754     def test_request_id_in_exception(self):
755         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
756             with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
757                 self.keep_client.head(self.locator, request_id=self.test_id)
758
759         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
760             with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
761                 self.keep_client.get(self.locator)
762
763         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
764             with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
765                 self.keep_client.put(self.data, request_id=self.test_id)
766
767         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
768             with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
769                 self.keep_client.put(self.data)
770
771     def assertAutomaticRequestId(self, resp):
772         hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
773                if x.startswith('X-Request-Id: ')][0]
774         self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
775         self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
776
777     def assertProvidedRequestId(self, resp):
778         self.assertIn('X-Request-Id: '+self.test_id,
779                       resp.getopt(pycurl.HTTPHEADER))
780
781
782 @tutil.skip_sleep
783 #@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
784 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
785     disk_cache = False
786
787     def setUp(self):
788         # expected_order[i] is the probe order for
789         # hash=md5(sprintf("%064x",i)) where there are 16 services
790         # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
791         # the first probe for the block consisting of 64 "0"
792         # characters is the service whose uuid is
793         # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
794         self.services = 16
795         self.expected_order = [
796             list('3eab2d5fc9681074'),
797             list('097dba52e648f1c3'),
798             list('c5b4e023f8a7d691'),
799             list('9d81c02e76a3bf54'),
800             ]
801         self.blocks = [
802             "{:064x}".format(x).encode()
803             for x in range(len(self.expected_order))]
804         self.hashes = [
805             hashlib.md5(self.blocks[x]).hexdigest()
806             for x in range(len(self.expected_order))]
807         self.api_client = self.mock_keep_services(count=self.services)
808         self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
809
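    # Illustrative sketch, not part of the original suite: how the fixtures
    # above are derived. Block i is the 64-character hex rendering of i and
    # self.hashes[i] is its md5 digest; expected_order[i] lists the trailing
    # hex digit of each mocked service uuid in the order the client is
    # expected to probe them. Hypothetical helper, never called.
    def _example_block_and_hash(self, i):
        block = "{:064x}".format(i).encode()
        return block, hashlib.md5(block).hexdigest()
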
810     def tearDown(self):
811         DiskCacheBase.tearDown(self)
812
813     def test_weighted_service_roots_against_reference_set(self):
814         # Confirm weighted_service_roots() returns the correct order
815         for i, hash in enumerate(self.hashes):
816             roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
817             got_order = [
818                 re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
819                 for root in roots]
820             self.assertEqual(self.expected_order[i], got_order)
821
822     def test_get_probe_order_against_reference_set(self):
823         self._test_probe_order_against_reference_set(
824             lambda i: self.keep_client.get(self.hashes[i], num_retries=1))
825
826     def test_head_probe_order_against_reference_set(self):
827         self._test_probe_order_against_reference_set(
828             lambda i: self.keep_client.head(self.hashes[i], num_retries=1))
829
830     def test_put_probe_order_against_reference_set(self):
831         # copies=1 prevents the test from being sensitive to races
832         # between writer threads.
833         self._test_probe_order_against_reference_set(
834             lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))
835
836     def _test_probe_order_against_reference_set(self, op):
837         for i in range(len(self.blocks)):
838             with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
839                  self.assertRaises(arvados.errors.KeepRequestError):
840                 op(i)
841             got_order = [
842                 re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
843                 for resp in mock.responses]
844             self.assertEqual(self.expected_order[i]*2, got_order)
845
846     def test_put_probe_order_multiple_copies(self):
847         for copies in range(2, 4):
848             for i in range(len(self.blocks)):
849                 with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
850                      self.assertRaises(arvados.errors.KeepWriteError):
851                     self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
852                 got_order = [
853                     re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
854                     for resp in mock.responses]
855                 # With T threads racing to make requests, the position
856                 # of a given server in the sequence of HTTP requests
857                 # (got_order) cannot be more than T-1 positions
858                 # earlier than that server's position in the reference
859                 # probe sequence (expected_order).
860                 #
861                 # Loop invariant: we have accounted for +pos+ expected
862                 # probes, either by seeing them in +got_order+ or by
863                 # putting them in +pending+ in the hope of seeing them
864                 # later. As long as +len(pending)<T+, we haven't
865                 # started a request too early.
866                 pending = []
867                 for pos, expected in enumerate(self.expected_order[i]*3):
868                     got = got_order[pos-len(pending)]
869                     while got in pending:
870                         del pending[pending.index(got)]
871                         got = got_order[pos-len(pending)]
872                     if got != expected:
873                         pending.append(expected)
874                         self.assertLess(
875                             len(pending), copies,
876                             "pending={}, with copies={}, got {}, expected {}".format(
877                                 pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
878
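    # Worked example of the invariant above (illustrative only): with
    # copies=2, expected order [a, b, c] and actual order [b, a, c], the
    # check sees b != a and parks a in pending (len 1 < 2, acceptable),
    # then matches b, then drains a from pending before matching c -- a
    # started one position early, which two racing writers can produce.
    # With actual order [c, a, b], both a and b end up pending at once
    # (len 2), so the assertLess(len(pending), copies) check fails.
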
879     def test_probe_waste_adding_one_server(self):
880         hashes = [
881             hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
882         initial_services = 12
883         self.api_client = self.mock_keep_services(count=initial_services)
884         self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
885         probes_before = [
886             self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
887         for added_services in range(1, 12):
888             api_client = self.mock_keep_services(count=initial_services+added_services)
889             keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
890             total_penalty = 0
891             for hash_index in range(len(hashes)):
892                 probe_after = keep_client.weighted_service_roots(
893                     arvados.KeepLocator(hashes[hash_index]))
894                 penalty = probe_after.index(probes_before[hash_index][0])
895                 self.assertLessEqual(penalty, added_services)
896                 total_penalty += penalty
897             # Average penalty per block should not exceed
898             # N(added)/N(orig) by more than 20%, and should get closer
899             # to the ideal as we add data points.
900             expect_penalty = (
901                 added_services *
902                 len(hashes) / initial_services)
903             max_penalty = (
904                 expect_penalty *
905                 (120 - added_services)/100)
906             min_penalty = (
907                 expect_penalty * 8/10)
908             self.assertTrue(
909                 min_penalty <= total_penalty <= max_penalty,
910                 "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
911                     initial_services,
912                     added_services,
913                     len(hashes),
914                     total_penalty,
915                     min_penalty,
916                     max_penalty))
917
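    # Worked example of the bounds above (illustrative only): with
    # initial_services=12, added_services=1 and 100 hashes,
    # expect_penalty = 1 * 100 / 12 ~= 8.33, so the accepted total penalty
    # ranges from min_penalty ~= 6.67 (80%) up to max_penalty ~= 9.92
    # (8.33 * 119/100).
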
918     def check_64_zeros_error_order(self, verb, exc_class):
919         data = b'0' * 64
920         if verb == 'get':
921             data = tutil.str_keep_locator(data)
922         # Arbitrary port number:
923         aport = random.randint(1024,65535)
924         api_client = self.mock_keep_services(service_port=aport, count=self.services)
925         keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
926         with mock.patch('pycurl.Curl') as curl_mock, \
927              self.assertRaises(exc_class) as err_check:
928             curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
929             getattr(keep_client, verb)(data)
930         urls = [urllib.parse.urlparse(url)
931                 for url in err_check.exception.request_errors()]
932         self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
933                          [(url.hostname, url.port) for url in urls])
934
935     def test_get_error_shows_probe_order(self):
936         self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)
937
938     def test_put_error_shows_probe_order(self):
939         self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
940
941 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
942 class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
943     disk_cache = False
944
945     # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
946     # 1s worth of data and then trigger bandwidth errors before running
947     # out of data.
948     DATA = b'x'*2**11
949     BANDWIDTH_LOW_LIM = 1024
950     TIMEOUT_TIME = 1.0
951
952     def tearDown(self):
953         DiskCacheBase.tearDown(self)
954
955     class assertTakesBetween(unittest.TestCase):
956         def __init__(self, tmin, tmax):
957             self.tmin = tmin
958             self.tmax = tmax
959
960         def __enter__(self):
961             self.t0 = time.time()
962
963         def __exit__(self, *args, **kwargs):
964             # Round times to milliseconds, like CURL. Otherwise, we
965             # fail when CURL reaches a 1s timeout at 0.9998s.
966             delta = round(time.time() - self.t0, 3)
967             self.assertGreaterEqual(delta, self.tmin)
968             self.assertLessEqual(delta, self.tmax)
969
970     class assertTakesGreater(unittest.TestCase):
971         def __init__(self, tmin):
972             self.tmin = tmin
973
974         def __enter__(self):
975             self.t0 = time.time()
976
977         def __exit__(self, *args, **kwargs):
978             delta = round(time.time() - self.t0, 3)
979             self.assertGreaterEqual(delta, self.tmin)
980
981     def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
982         return arvados.KeepClient(
983             api_client=self.api_client,
984             timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))
985
986     def test_timeout_slow_connect(self):
987         # Can't simulate TCP delays with our own socket. Leave our
988         # stub server running uselessly, and try to connect to an
989         # unroutable IP address instead.
990         self.api_client = self.mock_keep_services(
991             count=1,
992             service_host='240.0.0.0',
993         )
994         with self.assertTakesBetween(0.1, 0.5):
995             with self.assertRaises(arvados.errors.KeepWriteError):
996                 self.keepClient().put(self.DATA, copies=1, num_retries=0)
997
998     def test_low_bandwidth_no_delays_success(self):
999         self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
1000         kc = self.keepClient()
1001         loc = kc.put(self.DATA, copies=1, num_retries=0)
1002         self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
1003
1004     def test_too_low_bandwidth_no_delays_failure(self):
1005         # Check that lessening bandwidth corresponds to failing
1006         kc = self.keepClient()
1007         loc = kc.put(self.DATA, copies=1, num_retries=0)
1008         self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
1009         with self.assertTakesGreater(self.TIMEOUT_TIME):
1010             with self.assertRaises(arvados.errors.KeepReadError):
1011                 kc.get(loc, num_retries=0)
1012         with self.assertTakesGreater(self.TIMEOUT_TIME):
1013             with self.assertRaises(arvados.errors.KeepWriteError):
1014                 kc.put(self.DATA, copies=1, num_retries=0)
1015
1016     def test_low_bandwidth_with_server_response_delay_failure(self):
1017         kc = self.keepClient()
1018         loc = kc.put(self.DATA, copies=1, num_retries=0)
1019         self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
1020         # Note the actual delay must be 1s longer than the low speed
1021         # limit interval in order for curl to detect it reliably.
1022         self.server.setdelays(response=self.TIMEOUT_TIME+1)
1023         with self.assertTakesGreater(self.TIMEOUT_TIME):
1024             with self.assertRaises(arvados.errors.KeepReadError):
1025                 kc.get(loc, num_retries=0)
1026         with self.assertTakesGreater(self.TIMEOUT_TIME):
1027             with self.assertRaises(arvados.errors.KeepWriteError):
1028                 kc.put(self.DATA, copies=1, num_retries=0)
1029         with self.assertTakesGreater(self.TIMEOUT_TIME):
1030             kc.head(loc, num_retries=0)
1031
1032     def test_low_bandwidth_with_server_mid_delay_failure(self):
1033         kc = self.keepClient()
1034         loc = kc.put(self.DATA, copies=1, num_retries=0)
1035         self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
1036         # Note the actual delay must be 1s longer than the low speed
1037         # limit interval in order for curl to detect it reliably.
1038         self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
1039         with self.assertTakesGreater(self.TIMEOUT_TIME):
1040             with self.assertRaises(arvados.errors.KeepReadError) as e:
1041                 kc.get(loc, num_retries=0)
1042         with self.assertTakesGreater(self.TIMEOUT_TIME):
1043             with self.assertRaises(arvados.errors.KeepWriteError):
1044                 kc.put(self.DATA, copies=1, num_retries=0)
1045
1046     def test_timeout_slow_request(self):
1047         loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
1048         self.server.setdelays(request=.2)
1049         self._test_connect_timeout_under_200ms(loc)
1050         self.server.setdelays(request=2)
1051         self._test_response_timeout_under_2s(loc)
1052
1053     def test_timeout_slow_response(self):
1054         loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
1055         self.server.setdelays(response=.2)
1056         self._test_connect_timeout_under_200ms(loc)
1057         self.server.setdelays(response=2)
1058         self._test_response_timeout_under_2s(loc)
1059
1060     def test_timeout_slow_response_body(self):
1061         loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
1062         self.server.setdelays(response_body=.2)
1063         self._test_connect_timeout_under_200ms(loc)
1064         self.server.setdelays(response_body=2)
1065         self._test_response_timeout_under_2s(loc)
1066
1067     def _test_connect_timeout_under_200ms(self, loc):
1068         # Allow 100ms to connect, then 1s for response. Everything
1069         # should work, and everything should take at least 200ms to
1070         # return.
1071         kc = self.keepClient(timeouts=(.1, 1))
1072         with self.assertTakesBetween(.2, .3):
1073             kc.put(self.DATA, copies=1, num_retries=0)
1074         with self.assertTakesBetween(.2, .3):
1075             self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
1076
1077     def _test_response_timeout_under_2s(self, loc):
1078         # Allow 10s to connect, then 1s for response. Nothing should
1079         # work, and everything should take at least 1s to return.
1080         kc = self.keepClient(timeouts=(10, 1))
1081         with self.assertTakesBetween(1, 9):
1082             with self.assertRaises(arvados.errors.KeepReadError):
1083                 kc.get(loc, num_retries=0)
1084         with self.assertTakesBetween(1, 9):
1085             with self.assertRaises(arvados.errors.KeepWriteError):
1086                 kc.put(self.DATA, copies=1, num_retries=0)
1087
1088 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
1089 class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
1090     disk_cache = False
1091
1092     def tearDown(self):
1093         DiskCacheBase.tearDown(self)
1094
1095     def mock_disks_and_gateways(self, disks=3, gateways=1):
1096         self.gateways = [{
1097                 'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
1098                 'owner_uuid': 'zzzzz-tpzed-000000000000000',
1099                 'service_host': 'gatewayhost{}'.format(i),
1100                 'service_port': 12345,
1101                 'service_ssl_flag': True,
1102                 'service_type': 'gateway:test',
1103         } for i in range(gateways)]
1104         self.gateway_roots = [
1105             "https://{service_host}:{service_port}/".format(**gw)
1106             for gw in self.gateways]
1107         self.api_client = self.mock_keep_services(
1108             count=disks, additional_services=self.gateways)
1109         self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
1110
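    # The locators in the tests below carry "+K@..." hints: a
    # "+K@<gateway uuid>" hint points at one of the gateway services mocked
    # above, while a bare "+K@xyzzy" hint names a remote cluster, which the
    # client resolves to https://keep.xyzzy.arvadosapi.com/.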
1111     @mock.patch('pycurl.Curl')
1112     def test_get_with_gateway_hint_first(self, MockCurl):
1113         MockCurl.return_value = tutil.FakeCurl.make(
1114             code=200, body=b'foo', headers={'Content-Length': 3})
1115         self.mock_disks_and_gateways()
1116         locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
1117         self.assertEqual(b'foo', self.keepClient.get(locator))
1118         self.assertEqual(self.gateway_roots[0]+locator,
1119                          MockCurl.return_value.getopt(pycurl.URL).decode())
1120         self.assertEqual(True, self.keepClient.head(locator))
1121
1122     @mock.patch('pycurl.Curl')
1123     def test_get_with_gateway_hints_in_order(self, MockCurl):
1124         gateways = 4
1125         disks = 3
1126         mocks = [
1127             tutil.FakeCurl.make(code=404, body='')
1128             for _ in range(gateways+disks)
1129         ]
1130         MockCurl.side_effect = tutil.queue_with(mocks)
1131         self.mock_disks_and_gateways(gateways=gateways, disks=disks)
1132         locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
1133                            ['K@'+gw['uuid'] for gw in self.gateways])
1134         with self.assertRaises(arvados.errors.NotFoundError):
1135             self.keepClient.get(locator)
1136         # Gateways are tried first, in the order given.
1137         for i, root in enumerate(self.gateway_roots):
1138             self.assertEqual(root+locator,
1139                              mocks[i].getopt(pycurl.URL).decode())
1140         # Disk services are tried next.
1141         for i in range(gateways, gateways+disks):
1142             self.assertRegex(
1143                 mocks[i].getopt(pycurl.URL).decode(),
1144                 r'keep0x')
1145
1146     @mock.patch('pycurl.Curl')
1147     def test_head_with_gateway_hints_in_order(self, MockCurl):
1148         gateways = 4
1149         disks = 3
1150         mocks = [
1151             tutil.FakeCurl.make(code=404, body=b'')
1152             for _ in range(gateways+disks)
1153         ]
1154         MockCurl.side_effect = tutil.queue_with(mocks)
1155         self.mock_disks_and_gateways(gateways=gateways, disks=disks)
1156         locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
1157                            ['K@'+gw['uuid'] for gw in self.gateways])
1158         with self.assertRaises(arvados.errors.NotFoundError):
1159             self.keepClient.head(locator)
1160         # Gateways are tried first, in the order given.
1161         for i, root in enumerate(self.gateway_roots):
1162             self.assertEqual(root+locator,
1163                              mocks[i].getopt(pycurl.URL).decode())
1164         # Disk services are tried next.
1165         for i in range(gateways, gateways+disks):
1166             self.assertRegex(
1167                 mocks[i].getopt(pycurl.URL).decode(),
1168                 r'keep0x')
1169
1170     @mock.patch('pycurl.Curl')
1171     def test_get_with_remote_proxy_hint(self, MockCurl):
1172         MockCurl.return_value = tutil.FakeCurl.make(
1173             code=200, body=b'foo', headers={'Content-Length': 3})
1174         self.mock_disks_and_gateways()
1175         locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
1176         self.assertEqual(b'foo', self.keepClient.get(locator))
1177         self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
1178                          MockCurl.return_value.getopt(pycurl.URL).decode())
1179
1180     @mock.patch('pycurl.Curl')
1181     def test_head_with_remote_proxy_hint(self, MockCurl):
1182         MockCurl.return_value = tutil.FakeCurl.make(
1183             code=200, body=b'foo', headers={'Content-Length': 3})
1184         self.mock_disks_and_gateways()
1185         locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
1186         self.assertEqual(True, self.keepClient.head(locator))
1187         self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
1188                          MockCurl.return_value.getopt(pycurl.URL).decode())
1189
1190 class KeepClientRetryTestMixin(object):
1191     disk_cache = False
1192
1193     # Testing with a local Keep store won't exercise the retry behavior.
1194     # Instead, our strategy is:
1195     # * Create a client with one proxy specified (pointed at a black
1196     #   hole), so there's no need to instantiate an API client, and
1197     #   all HTTP requests come from one place.
1198     # * Mock out the pycurl requests to provide simulated responses.
1199     # This lets us test the retry logic extensively without relying on any
1200     # supporting servers, and prevents side effects in case something hiccups.
1201     # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
1202     # run_method().
1203     #
1204     # Test classes must also set TEST_PATCHER to a callable that mocks
1205     # out the appropriate request methods in the client.
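    #
    # For illustration, a concrete test case takes roughly this shape
    # (hypothetical class name; the real subclasses below, such as
    # KeepClientRetryGetTestCase, follow the same pattern):
    #
    #   class ExampleRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    #       DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    #       DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    #       TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
    #
    #       def run_method(self, *args, **kwargs):
    #           return self.new_client().get(self.TEST_LOCATOR, *args, **kwargs)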
1206
1207     PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
1208     TEST_DATA = b'testdata'
1209     TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
1210
1211     def setUp(self):
1212         self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}
1213
1214     def new_client(self, **caller_kwargs):
1215         kwargs = self.client_kwargs.copy()
1216         kwargs.update(caller_kwargs)
1217         kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
1218         return arvados.KeepClient(**kwargs)
1219
1220     def run_method(self, *args, **kwargs):
1221         raise NotImplementedError("test subclasses must define run_method")
1222
1223     def check_success(self, expected=None, *args, **kwargs):
1224         if expected is None:
1225             expected = self.DEFAULT_EXPECT
1226         self.assertEqual(expected, self.run_method(*args, **kwargs))
1227
1228     def check_exception(self, error_class=None, *args, **kwargs):
1229         if error_class is None:
1230             error_class = self.DEFAULT_EXCEPTION
1231         with self.assertRaises(error_class) as err:
1232             self.run_method(*args, **kwargs)
1233         return err
1234
1235     def test_immediate_success(self):
1236         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
1237             self.check_success()
1238
1239     def test_retry_then_success(self):
1240         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
1241             self.check_success(num_retries=3)
1242
1243     def test_exception_then_success(self):
1244         with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
1245             self.check_success(num_retries=3)
1246
1247     def test_no_default_retry(self):
1248         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
1249             self.check_exception()
1250
1251     def test_no_retry_after_permanent_error(self):
1252         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
1253             self.check_exception(num_retries=3)
1254
1255     def test_error_after_retries_exhausted(self):
1256         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
1257             err = self.check_exception(num_retries=1)
1258         self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')
1259
1260     def test_num_retries_instance_fallback(self):
1261         self.client_kwargs['num_retries'] = 3
1262         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
1263             self.check_success()
1264
1265
1266 @tutil.skip_sleep
1267 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
1268 class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
1269     DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
1270     DEFAULT_EXCEPTION = arvados.errors.KeepReadError
1271     HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
1272     TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
1273
1274     def tearDown(self):
1275         DiskCacheBase.tearDown(self)
1276
1277     def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
1278                    *args, **kwargs):
1279         return self.new_client().get(locator, *args, **kwargs)
1280
1281     def test_specific_exception_when_not_found(self):
1282         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
1283             self.check_exception(arvados.errors.NotFoundError, num_retries=3)
1284
1285     def test_general_exception_with_mixed_errors(self):
1286         # get should raise NotFoundError only when no server returns the
1287         # block and a high threshold of servers report that it's not found.
1288         # This test rigs up a 50/50 disagreement between two servers (one
1289         # 404, one 500) and checks that the result is not a NotFoundError.
1290         client = self.new_client()
1291         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
1292             with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
1293                 client.get(self.HINTED_LOCATOR)
1294             self.assertNotIsInstance(
1295                 exc_check.exception, arvados.errors.NotFoundError,
1296                 "mixed errors raised NotFoundError")
1297
1298     def test_hint_server_can_succeed_without_retries(self):
1299         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
1300             self.check_success(locator=self.HINTED_LOCATOR)
1301
1302     def test_try_next_server_after_timeout(self):
1303         with tutil.mock_keep_responses(
1304                 (socket.timeout("timed out"), 200),
1305                 (self.DEFAULT_EXPECT, 200)):
1306             self.check_success(locator=self.HINTED_LOCATOR)
1307
1308     def test_retry_data_with_wrong_checksum(self):
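        # The first response body does not match the locator's MD5 checksum,
        # so the client should discard it, try the next server, and succeed.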
1309         with tutil.mock_keep_responses(
1310                 ('baddata', 200),
1311                 (self.DEFAULT_EXPECT, 200)):
1312             self.check_success(locator=self.HINTED_LOCATOR)
1313
1314 @tutil.skip_sleep
1315 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
1316 class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
1317     DEFAULT_EXPECT = True
1318     DEFAULT_EXCEPTION = arvados.errors.KeepReadError
1319     HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
1320     TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
1321
1322     def tearDown(self):
1323         DiskCacheBase.tearDown(self)
1324
1325     def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
1326                    *args, **kwargs):
1327         return self.new_client().head(locator, *args, **kwargs)
1328
1329     def test_specific_exception_when_not_found(self):
1330         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
1331             self.check_exception(arvados.errors.NotFoundError, num_retries=3)
1332
1333     def test_general_exception_with_mixed_errors(self):
1334         # head should raise NotFoundError only when no server has the
1335         # block and a high threshold of servers report that it's not found.
1336         # This test rigs up a 50/50 disagreement between two servers (one
1337         # 404, one 500) and checks that the result is not a NotFoundError.
1338         client = self.new_client()
1339         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
1340             with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
1341                 client.head(self.HINTED_LOCATOR)
1342             self.assertNotIsInstance(
1343                 exc_check.exception, arvados.errors.NotFoundError,
1344                 "mixed errors raised NotFoundError")
1345
1346     def test_hint_server_can_succeed_without_retries(self):
1347         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
1348             self.check_success(locator=self.HINTED_LOCATOR)
1349
1350     def test_try_next_server_after_timeout(self):
1351         with tutil.mock_keep_responses(
1352                 (socket.timeout("timed out"), 200),
1353                 (self.DEFAULT_EXPECT, 200)):
1354             self.check_success(locator=self.HINTED_LOCATOR)
1355
1356 @tutil.skip_sleep
1357 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
1358 class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
1359     DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
1360     DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
1361     TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
1362
1363     def tearDown(self):
1364         DiskCacheBase.tearDown(self)
1365
1366     def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
1367                    copies=1, *args, **kwargs):
1368         return self.new_client().put(data, copies, *args, **kwargs)
1369
1370     def test_do_not_send_multiple_copies_to_same_server(self):
1371         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
1372             self.check_exception(copies=2, num_retries=3)
1373
1374
1375 class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
1376
1377     class FakeKeepService(object):
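        # A stand-in for a Keep service: put() sleeps for `delay` seconds,
        # then either raises `will_raise` or returns `will_succeed`, and
        # last_result() reports the headers the writer pool uses to count
        # confirmed replicas.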
1378         def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
1379             self.delay = delay
1380             self.will_succeed = will_succeed
1381             self.will_raise = will_raise
1382             self._result = {}
1383             self._result['headers'] = {}
1384             self._result['headers']['x-keep-replicas-stored'] = str(replicas)
1385             self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
1386             self._result['body'] = 'foobar'
1387
1388         def put(self, data_hash, data, timeout, headers):
1389             time.sleep(self.delay)
1390             if self.will_raise is not None:
1391                 raise self.will_raise
1392             return self.will_succeed
1393
1394         def last_result(self):
1395             if self.will_succeed:
1396                 return self._result
1397             else:
1398                 return {"status_code": 500, "body": "didn't succeed"}
1399
1400         def finished(self):
1401             return False
1402
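    # Each test queues more write tasks than strictly needed and checks
    # that the pool confirms exactly the requested number of copies,
    # i.e. that it does not over-replicate.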
1403     def setUp(self):
1404         self.copies = 3
1405         self.pool = arvados.KeepClient.KeepWriterThreadPool(
1406             data='foo',
1407             data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
1408             max_service_replicas=self.copies,
1409             copies=self.copies
1410         )
1411
1412     def test_only_write_enough_on_success(self):
1413         for i in range(10):
1414             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1415             self.pool.add_task(ks, None)
1416         self.pool.join()
1417         self.assertEqual(self.pool.done(), (self.copies, []))
1418
1419     def test_only_write_enough_on_partial_success(self):
1420         for i in range(5):
1421             ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
1422             self.pool.add_task(ks, None)
1423             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1424             self.pool.add_task(ks, None)
1425         self.pool.join()
1426         self.assertEqual(self.pool.done(), (self.copies, []))
1427
1428     def test_only_write_enough_when_some_crash(self):
1429         for i in range(5):
1430             ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
1431             self.pool.add_task(ks, None)
1432             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1433             self.pool.add_task(ks, None)
1434         self.pool.join()
1435         self.assertEqual(self.pool.done(), (self.copies, []))
1436
1437     def test_fail_when_too_many_crash(self):
1438         for i in range(self.copies+1):
1439             ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
1440             self.pool.add_task(ks, None)
1441         for i in range(self.copies-1):
1442             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1443             self.pool.add_task(ks, None)
1444         self.pool.join()
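        # Only copies-1 services can succeed here, so the pool falls one
        # replica short of the requested copy count.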
1445         self.assertEqual(self.pool.done(), (self.copies-1, []))
1446
1447
1448 @tutil.skip_sleep
1449 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
1450 class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
1451     disk_cache = False
1452
1453     # Test put()s that need two distinct servers to succeed, possibly
1454     # requiring multiple passes through the retry loop.
1455
1456     def setUp(self):
1457         self.api_client = self.mock_keep_services(count=2)
1458         self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
1459
1460     def tearDown(self):
1461         DiskCacheBase.tearDown(self)
1462
1463     def test_success_after_exception(self):
1464         with tutil.mock_keep_responses(
1465                 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1466                 Exception('mock err'), 200, 200) as req_mock:
1467             self.keep_client.put('foo', num_retries=1, copies=2)
1468         self.assertEqual(3, req_mock.call_count)
1469
1470     def test_success_after_retryable_error(self):
1471         with tutil.mock_keep_responses(
1472                 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1473                 500, 200, 200) as req_mock:
1474             self.keep_client.put('foo', num_retries=1, copies=2)
1475         self.assertEqual(3, req_mock.call_count)
1476
1477     def test_fail_after_final_error(self):
1478         # First retry loop gets a 200 (can't achieve replication by
1479         # storing again on that server) and a 400 (can't retry that
1480         # server at all), so we shouldn't try a third request.
1481         with tutil.mock_keep_responses(
1482                 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1483                 200, 400, 200) as req_mock:
1484             with self.assertRaises(arvados.errors.KeepWriteError):
1485                 self.keep_client.put('foo', num_retries=1, copies=2)
1486         self.assertEqual(2, req_mock.call_count)
1487
1488 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
1489 class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
1490     disk_cache = False
1491
1492     def tearDown(self):
1493         DiskCacheBase.tearDown(self)
1494
1495     def test_api_fail(self):
1496         class ApiMock(object):
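            # A minimal API client stub: it provides just the attributes
            # KeepClient's constructor reads; any other attribute access
            # (e.g. during keep service discovery) raises KeepReadError.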
1497             def __getattr__(self, r):
1498                 if r == "api_token":
1499                     return "abc"
1500                 elif r == "insecure":
1501                     return False
1502                 elif r == "config":
1503                     return lambda: {}
1504                 else:
1505                     raise arvados.errors.KeepReadError()
1506         keep_client = arvados.KeepClient(api_client=ApiMock(),
1507                                          proxy='', local_store='',
1508                                          block_cache=self.make_block_cache(self.disk_cache))
1509
1510         # The bug this is testing for is that if an API (not
1511         # keepstore) exception is thrown as part of a get(), the next
1512         # attempt to get that same block will result in a deadlock.
1513         # This is why there are two get()s in a row.  Unfortunately,
1514         # the failure mode for this test is that the test suite
1515         # deadlocks; there isn't a good way to avoid that without
1516         # adding a special case that has no use except for this test.
1517
1518         with self.assertRaises(arvados.errors.KeepReadError):
1519             keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1520         with self.assertRaises(arvados.errors.KeepReadError):
1521             keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")