Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import shutil
19 import socket
20 import sys
21 import time
22 import unittest
23 import urllib.parse
24
25 import parameterized
26
27 import arvados
28 import arvados.retry
29 import arvados.util
30 from . import arvados_testutil as tutil
31 from . import keepstub
32 from . import run_test_server
33
34 from .arvados_testutil import make_block_cache
35
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Round-trip reads and writes against a real Keep server.

    ``parameterized_class`` generates one copy of this class per block-cache
    mode, overriding ``disk_cache`` with True and False respectively.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        """Start the servers, authorize as admin and build one shared client.

        ``proxy=''`` and ``local_store=''`` presumably disable proxy and
        local-store modes so requests go to KEEP_SERVER -- TODO confirm.
        """
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='',
                                             block_cache=make_block_cache(cls.disk_cache))

    # Regex patterns below are raw strings: '\+' is not a valid string
    # escape and triggers DeprecationWarning/SyntaxWarning otherwise.

    def test_KeepBasicRWTest(self):
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
                         b'foo'),
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        # Double the 8-byte seed 23 times: 8 * 2**23 = 67108864 bytes.
        for _ in range(23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
138
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Permission checks against a Keep server with blob signing enabled.

    Generated once per block-cache mode (``disk_cache`` True and False).
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        client = arvados.KeepClient(block_cache=make_block_cache(self.disk_cache))

        # put() must return a locator carrying a +A<hex>@<hex> hint, and
        # that locator must be readable by the same user.
        signed_foo = client.put('foo')
        self.assertRegex(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        self.assertEqual(client.get(signed_foo), b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # A GET using the bare (unsigned) locator must be refused.
        signed_bar = client.put('bar')
        unsigned_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)
        with self.assertRaises(arvados.errors.NotFoundError):
            client.get(unsigned_bar)

        # Another user cannot read the block, even with the signed locator.
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(signed_bar)

        # Without a token, neither the signed nor the unsigned locator works.
        client.api_token = ''
        for locator in (signed_bar, unsigned_bar):
            with self.assertRaises(arvados.errors.NotFoundError):
                client.get(locator)
183
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """KeepClient behavior when talking to Keep through a keepproxy server.

    Generated once per block-cache mode (``disk_cache`` True and False).
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def _make_client(self):
        """Build a KeepClient for the shared API client and this cache mode."""
        return arvados.KeepClient(api_client=self.api_client,
                                  local_store='',
                                  block_cache=make_block_cache(self.disk_cache))

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = self._make_client()
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies.
        # patch.dict restores the shared settings dict on exit so the
        # override cannot leak into other tests.
        override = {'ARVADOS_KEEP_SERVICES': 'http://10.0.0.1 https://foo.example.org:1234/'}
        with mock.patch.dict(arvados.config.settings(), override):
            keep_client = self._make_client()
            uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        # Restore the shared settings afterwards (see test above).
        override = {'ARVADOS_KEEP_SERVICES': 'bad.uri.org'}
        with mock.patch.dict(arvados.config.settings(), override):
            with self.assertRaises(arvados.errors.ArgumentError):
                self._make_client()
230                                              local_store='',
231                                              block_cache=make_block_cache(self.disk_cache))
232
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient behavior against mocked service discovery and mocked pycurl.

    Generated once per block-cache mode (``disk_cache`` True and False);
    every client is built via ``_new_client`` so each test honors that mode.
    """
    disk_cache = False

    def _new_client(self, api_client):
        """Return a KeepClient for ``api_client`` using this class's cache mode."""
        return arvados.KeepClient(api_client=api_client,
                                  block_cache=make_block_cache(self.disk_cache))

    def _check_timeouts(self, curl, timeout, low_speed=True):
        """Assert that mocked ``curl`` was configured from ``timeout``.

        ``timeout`` is a triple like KeepClient.DEFAULT_TIMEOUT: connect
        timeout in seconds (checked as CONNECTTIMEOUT_MS), LOW_SPEED_TIME and
        LOW_SPEED_LIMIT. With ``low_speed=False`` the two low-speed options
        are expected to be left unset (getopt() returns None).
        """
        self.assertEqual(curl.getopt(pycurl.CONNECTTIMEOUT_MS),
                         int(timeout[0]*1000))
        if low_speed:
            expect_time, expect_limit = int(timeout[1]), int(timeout[2])
        else:
            expect_time = expect_limit = None
        self.assertEqual(curl.getopt(pycurl.LOW_SPEED_TIME), expect_time)
        self.assertEqual(curl.getopt(pycurl.LOW_SPEED_LIMIT), expect_limit)

    def get_service_roots(self, api_client):
        keep_client = self._new_client(api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        keep_client = self._new_client(self.mock_keep_services(
            service_type='proxy', service_host='localhost', service_port=9, count=1))
        try:
            # this will fail, but it ensures we get the service
            # discovery response
            keep_client.put('baz2')
        except Exception:
            # Deliberate best-effort; Exception (not a bare except) so
            # KeyboardInterrupt/SystemExit still propagate.
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = self._new_client(api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = self._new_client(api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = self._new_client(api_client)
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = self._new_client(api_client)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self._check_timeouts(req_mock.responses[0],
                                 arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = self._new_client(api_client)
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self._check_timeouts(req_mock.responses[0],
                                 arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = self._new_client(api_client)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            # HEAD requests set only the connect timeout.
            self._check_timeouts(req_mock.responses[0],
                                 arvados.KeepClient.DEFAULT_TIMEOUT,
                                 low_speed=False)

    def test_proxy_get_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = self._new_client(api_client)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self._check_timeouts(req_mock.responses[0],
                                 arvados.KeepClient.DEFAULT_PROXY_TIMEOUT)

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = self._new_client(api_client)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self._check_timeouts(req_mock.responses[0],
                                 arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
                                 low_speed=False)

    def test_proxy_put_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            # Previously built without block_cache, so the disk_cache
            # parameterization had no effect on this test.
            keep_client = self._new_client(api_client)
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self._check_timeouts(req_mock.responses[0],
                                 arvados.KeepClient.DEFAULT_PROXY_TIMEOUT)

    def check_no_services_error(self, verb, exc_class):
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = self._new_client(api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = self._new_client(api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = self._new_client(api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = self._new_client(api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = self._new_client(api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = self._new_client(api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = self._new_client(api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
512         self.assertEqual(1, req_mock.call_count)
513
# parameterized_class must be the OUTERMOST decorator. It copies this class's
# test methods into generated subclasses and then blanks them on the base
# class. A class decorator applied outside it (tutil.skip_sleep, presumably a
# mock.patch of time.sleep) would therefore patch only the blanked base and
# never reach the subclasses that actually run.
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
@tutil.skip_sleep
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check which KeepClient requests are served from the block cache."""
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.get(self.locator)
            self.keep_client.get(self.locator)
        # Request already cached, don't require more than one request
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.head(self.locator)
            self.keep_client.head(self.locator)
        # Don't cache HEAD requests so that they're not confused with GET reqs
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        get_mock.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual(b'first response', head_resp)
        # First response was not cached because it was from a HEAD request.
        self.assertNotEqual(head_resp, get_resp)
551         self.assertNotEqual(head_resp, get_resp)
552
553 @tutil.skip_sleep
554 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
555 class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
556     disk_cache = False
557
558     def setUp(self):
559         self.api_client = self.mock_keep_services(count=2)
560         self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
561         self.data = b'xyzzy'
562         self.locator = '1271ed5ef305aadabc605b1609e24c52'
563
564     def test_multiple_default_storage_classes_req_header(self):
565         api_mock = self.api_client_mock()
566         api_mock.config.return_value = {
567             'StorageClasses': {
568                 'foo': { 'Default': True },
569                 'bar': { 'Default': True },
570                 'baz': { 'Default': False }
571             }
572         }
573         api_client = self.mock_keep_services(api_mock=api_mock, count=2)
574         keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
575         resp_hdr = {
576             'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
577             'x-keep-replicas-stored': 1
578         }
579         with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
580             keep_client.put(self.data, copies=1)
581             req_hdr = mock.responses[0]
582             self.assertIn(
583                 'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
584
585     def test_storage_classes_req_header(self):
586         self.assertEqual(
587             self.api_client.config()['StorageClasses'],
588             {'default': {'Default': True}})
589         cases = [
590             # requested, expected
591             [['foo'], 'X-Keep-Storage-Classes: foo'],
592             [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
593             [[], 'X-Keep-Storage-Classes: default'],
594             [None, 'X-Keep-Storage-Classes: default'],
595         ]
596         for req_classes, expected_header in cases:
597             headers = {'x-keep-replicas-stored': 1}
598             if req_classes is None or len(req_classes) == 0:
599                 confirmed_hdr = 'default=1'
600             elif len(req_classes) > 0:
601                 confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
602             headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
603             with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
604                 self.keep_client.put(self.data, copies=1, classes=req_classes)
605                 req_hdr = mock.responses[0]
606                 self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
607
608     def test_partial_storage_classes_put(self):
609         headers = {
610             'x-keep-replicas-stored': 1,
611             'x-keep-storage-classes-confirmed': 'foo=1'}
612         with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
613             with self.assertRaises(arvados.errors.KeepWriteError):
614                 self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
615             # 1st request, both classes pending
616             req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
617             self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
618             # 2nd try, 'foo' class already satisfied
619             req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
620             self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
621
622     def test_successful_storage_classes_put_requests(self):
623         cases = [
624             # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
625             [ 1, ['foo'], 1, 'foo=1', 1],
626             [ 1, ['foo'], 2, 'foo=2', 1],
627             [ 2, ['foo'], 2, 'foo=2', 1],
628             [ 2, ['foo'], 1, 'foo=1', 2],
629             [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
630             [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
631             [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
632             [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
633             [ 1, ['foo', 'bar'], 1, None, 1],
634             [ 1, ['foo'], 1, None, 1],
635             [ 2, ['foo'], 2, None, 1],
636             [ 2, ['foo'], 1, None, 2],
637         ]
638         for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
639             headers = {'x-keep-replicas-stored': c_copies}
640             if c_classes is not None:
641                 headers.update({'x-keep-storage-classes-confirmed': c_classes})
642             with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
643                 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
644                 self.assertEqual(self.locator,
645                     self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
646                     case_desc)
647                 self.assertEqual(e_reqs, mock.call_count, case_desc)
648
649     def test_failed_storage_classes_put_requests(self):
650         cases = [
651             # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
652             [ 1, ['foo'], 1, 'bar=1', 200],
653             [ 1, ['foo'], 1, None, 503],
654             [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
655             [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
656             [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
657         ]
658         for w_copies, w_classes, c_copies, c_classes, return_code in cases:
659             headers = {'x-keep-replicas-stored': c_copies}
660             if c_classes is not None:
661                 headers.update({'x-keep-storage-classes-confirmed': c_classes})
662             with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
663                 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
664                 with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
665                     self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
666
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check the X-Request-Id header sent on Keep put/get/head requests.

    Covers three sources of the ID: the API client's request_id, an
    explicit ``request_id`` argument, and automatic generation. Also
    checks that the ID appears in Keep error messages.
    """
    disk_cache = False

    # Request IDs are "req-" followed by 20 lowercase alphanumerics
    # (setUp checks that arvados.util.new_request_id() matches this).
    REQUEST_ID_PATTERN = r'req-[a-z0-9]{20}'

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, '^' + self.REQUEST_ID_PATTERN + '$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def _check_put_get_head(self, check, **call_kwargs):
        """Run put, get and head against mocked servers and apply *check*.

        *check* is called with every recorded request. *call_kwargs* is
        passed to each of the three KeepClient calls. The put is expected
        to issue exactly two requests.
        """
        with tutil.mock_keep_responses(self.locator, 200, 200) as keep_mock:
            self.keep_client.put(self.data, **call_kwargs)
        self.assertEqual(2, len(keep_mock.responses))
        for resp in keep_mock.responses:
            check(resp)

        with tutil.mock_keep_responses(self.data, 200) as keep_mock:
            self.keep_client.get(self.locator, **call_kwargs)
        check(keep_mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as keep_mock:
            self.keep_client.head(self.locator, **call_kwargs)
        check(keep_mock.responses[0])

    def test_default_to_api_client_request_id(self):
        self.api_client.request_id = self.test_id
        self._check_put_get_head(self.assertProvidedRequestId)

    def test_explicit_request_id(self):
        self._check_put_get_head(self.assertProvidedRequestId,
                                 request_id=self.test_id)

    def test_automatic_request_id(self):
        self._check_put_get_head(self.assertAutomaticRequestId)

    def test_request_id_in_exception(self):
        # Explicit IDs must appear verbatim in the error. Otherwise some
        # generated ID of the expected shape must appear.
        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
                self.keep_client.head(self.locator, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepReadError, self.REQUEST_ID_PATTERN):
                self.keep_client.get(self.locator)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
                self.keep_client.put(self.data, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.REQUEST_ID_PATTERN):
                self.keep_client.put(self.data)

    def assertAutomaticRequestId(self, resp):
        """Assert *resp* sent a generated request ID, not self.test_id."""
        headers = resp.getopt(pycurl.HTTPHEADER)
        hdr = next((h for h in headers if h.startswith('X-Request-Id: ')), None)
        # Fail cleanly (instead of an IndexError) when the header is absent.
        if hdr is None:
            self.fail('no X-Request-Id header in {!r}'.format(headers))
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, '^X-Request-Id: ' + self.REQUEST_ID_PATTERN + '$')

    def assertProvidedRequestId(self, resp):
        """Assert *resp* sent exactly self.test_id as its request ID."""
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
756
757
@tutil.skip_sleep
# NOTE(review): disk_cache parameterization is commented out for this
# class, so it only runs with disk_cache=False. The reason is not
# recorded here; confirm before re-enabling.
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check Keep service probe order against a reference table."""
    disk_cache = False

    # Pulls the hex service number out of a mocked service root/URL such
    # as "...//keep0x3..." (an optional "[" may precede the host name).
    _SERVICE_ID_RE = re.compile(r'//\[?keep0x([0-9a-f]+)')

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))

    def _service_id(self, url):
        """Return the hex service number from a keep0x... root or URL."""
        found = self._SERVICE_ID_RE.search(url)
        self.assertIsNotNone(found, "no keep0x service in {!r}".format(url))
        return found.group(1)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for expected, block_hash in zip(self.expected_order, self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            got_order = [self._service_id(root) for root in roots]
            self.assertEqual(expected, got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        # Every server answers 500, so op(i) fails after two full passes
        # (num_retries=1), and the requests must follow the reference
        # order twice.
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *([500] * (self.services*2))) as keep_mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                self._service_id(resp.getopt(pycurl.URL).decode())
                for resp in keep_mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *([500] * (self.services*3))) as keep_mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    self._service_id(resp.getopt(pycurl.URL).decode())
                    for resp in keep_mock.responses]
                expected_seq = self.expected_order[i]*3
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(expected_seq):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        pending.remove(got)
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(expected_seq)))

    def test_probe_waste_adding_one_server(self):
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            for block_hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
            total_penalty = 0
            for block_hash, probe_before in zip(hashes, probes_before):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(block_hash))
                # How far the old first choice moved down the probe list.
                penalty = probe_after.index(probe_before[0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Assert a failed *verb* reports errors in reference probe order.

        The block is 64 "0" characters, i.e. self.blocks[0], so the
        expected order is self.expected_order[0].
        """
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in self.expected_order[0]],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
913
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
    """Check KeepClient connect, response and low-bandwidth timeouts.

    Uses stub Keep servers whose delays and bandwidth are adjusted per
    test via self.server.setdelays()/setbandwidth().
    """
    disk_cache = False

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        """Context manager asserting its body takes tmin..tmax seconds.

        Subclasses TestCase only to borrow its assert* methods. If the
        body raises, the timing check is skipped so that the body's own
        failure (e.g. an exception that was expected but not raised) is
        what gets reported, rather than a secondary timing assertion.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            if exc_type is not None:
                return False
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)
            return False

    class assertTakesGreater(unittest.TestCase):
        """Context manager asserting its body takes at least tmin seconds.

        Like assertTakesBetween, the timing check is skipped when the
        body raises, so the original exception propagates unmasked.
        """
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            if exc_type is not None:
                return False
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            return False

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        # Presumably (connect, response, minimum bandwidth), judging by
        # how the tests below use it; see KeepClient for the exact contract.
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
1057
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check that K@ locator hints direct requests to gateways or to a
    remote cluster's Keep proxy before the local disk services."""
    disk_cache = False

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Mock *disks* disk services plus *gateways* gateway services.

        Sets self.gateways (the gateway service records),
        self.gateway_roots (their https root URLs), self.api_client and
        self.keepClient.
        """
        self.gateways = [{
            'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
            'owner_uuid': 'zzzzz-tpzed-000000000000000',
            'service_host': 'gatewayhost{}'.format(i),
            'service_port': 12345,
            'service_ssl_flag': True,
            'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=make_block_cache(self.disk_cache))

    def _check_gateway_hints_in_order(self, MockCurl, verb):
        """Look up a block hinted at every gateway while all servers 404.

        Asserts that *verb* ('get' or 'head') raises NotFoundError, that
        the hinted gateways were tried first in locator order, and that
        the disk services were tried after them.
        """
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'get')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'head')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1156
class KeepClientRetryTestMixin(object):
    """Shared retry tests for one KeepClient operation.

    A local Keep store would not exercise the retry logic, so each client
    is built with a single proxy (PROXY_ADDR, a black hole). That means no
    API client is needed and every request comes from one place. The HTTP
    responses are then simulated with TEST_PATCHER. Retries can thus be
    tested extensively without supporting servers and without side
    effects if something hiccups.

    Concrete test classes must define:

    * DEFAULT_EXPECT -- what run_method() returns on success;
    * DEFAULT_EXCEPTION -- the exception class raised on failure;
    * TEST_PATCHER -- a callable used as a context manager,
      ``TEST_PATCHER(expected_body, *responses)``, that mocks out the
      client's HTTP responses;
    * run_method() -- performs the operation under test.
    """
    disk_cache = False

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = dict(proxy=self.PROXY_ADDR, local_store='')

    def new_client(self, **caller_kwargs):
        """Build a KeepClient from client_kwargs overlaid with caller_kwargs.

        The block cache always comes from make_block_cache(self.disk_cache).
        """
        merged = dict(self.client_kwargs, **caller_kwargs)
        merged['block_cache'] = make_block_cache(self.disk_cache)
        return arvados.KeepClient(**merged)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method() returns *expected* (default DEFAULT_EXPECT)."""
        want = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(want, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method() raises *error_class* (default
        DEFAULT_EXCEPTION) and return the assertRaises context."""
        exc_type = self.DEFAULT_EXCEPTION if error_class is None else error_class
        with self.assertRaises(exc_type) as ctx:
            self.run_method(*args, **kwargs)
        return ctx

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        # Without num_retries the 500 is final; the 200 is never used.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # One retry means two attempts, both answered with 500.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            ctx = self.check_exception(num_retries=1)
        self.assertRegex(str(ctx.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries set on the client applies when the call omits it.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1231
1232
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.get()."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    # The test block with an extra K@xyzzy location hint.
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as ctx:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                ctx.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timeout_then_data = (
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        )
        with tutil.mock_keep_responses(*timeout_then_data):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        bad_then_good = (
            ('baddata', 200),
            (self.DEFAULT_EXPECT, 200),
        )
        with tutil.mock_keep_responses(*bad_then_good):
            self.check_success(locator=self.HINTED_LOCATOR)
1277
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.head(), run with and without a disk cache."""

    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    # The standard test locator with a '+K@xyzzy' hint appended.
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        """Issue head() for *locator* using a freshly built client."""
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        """A 404 is reported as NotFoundError, despite retries and a queued 200."""
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        """Mixed 404/500 responses raise KeepReadError, never NotFoundError.

        head() should only raise NotFoundError when no server returns the
        block and a high threshold of servers report it missing.  This rigs
        up a 50/50 disagreement between two servers.
        """
        client = self.new_client()
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        """A hinted head() succeeds from the queued 200 with no retries requested."""
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        """A socket timeout from the first server falls through to the next."""
        timed_out = (socket.timeout("timed out"), 200)
        good = (self.DEFAULT_EXPECT, 200)
        with self.TEST_PATCHER(timed_out, good):
            self.check_success(locator=self.HINTED_LOCATOR)
1316
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.put(), run with and without a disk cache."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        """Store *data* with *copies* replicas using a freshly built client."""
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        """Asking for 2 copies fails, even with retries, when the mock has one 200.

        The second copy must not be written to the same server again.
        """
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1331
1332
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """KeepWriterThreadPool must stop once the requested copies are written."""

    class FakeKeepService(object):
        """Stand-in service: sleeps `delay` seconds, then raises or succeeds/fails."""

        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            # Result reported after a successful put(): `replicas` copies
            # stored, all confirmed in the "default" storage class.
            self._result = {
                'headers': {
                    'x-keep-replicas-stored': str(replicas),
                    'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
                },
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout, headers):
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            if not self.will_succeed:
                return {"status_code": 500, "body": "didn't succeed"}
            return self._result

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def _queue_fake_service(self, delay, **behavior):
        """Add a FakeKeepService with the given delay/behavior to the pool."""
        self.pool.add_task(self.FakeKeepService(delay=delay, **behavior), None)

    def test_only_write_enough_on_success(self):
        for n in range(10):
            self._queue_fake_service(n/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Interleave a failing and a succeeding service at each delay.
        for n in range(5):
            self._queue_fake_service(n/10.0, will_succeed=False)
            self._queue_fake_service(n/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Interleave a raising and a succeeding service at each delay.
        for n in range(5):
            self._queue_fake_service(n/10.0, will_raise=Exception())
            self._queue_fake_service(n/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # Only copies-1 services can succeed, so the pool falls one short.
        for n in range(self.copies+1):
            self._queue_fake_service(n/10.0, will_raise=Exception())
        for n in range(self.copies-1):
            self._queue_fake_service(n/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies-1, []))
1404
1405
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    """put() calls that need two distinct servers to succeed.

    Reaching the requested replication may take more than one pass through
    the retry loop.
    """

    # Default for the parameterized `disk_cache` attribute that setUp()
    # reads, matching the other parameterized test classes in this file.
    disk_cache = False

    # Locator of the block 'foo' that every test in this class stores.
    FOO_LOCATOR = 'acbd18db4cc2f85cedef654fccc4a4d8+3'

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=make_block_cache(self.disk_cache))

    def test_success_after_exception(self):
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR,
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR,
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # The first retry loop gets a 200 (replication can't be reached by
        # storing again on that server) and a 400 (that server can't be
        # retried at all), so no third request should be made.
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR,
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1442
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase):
    """An API-side failure during get() must not break later get()s."""

    disk_cache = False

    def test_api_fail(self):
        class ApiMock(object):
            # Answers api_token, insecure and config(); any other attribute
            # lookup raises KeepReadError, simulating a failing API.
            def __getattr__(self, name):
                if name == "api_token":
                    return "abc"
                if name == "insecure":
                    return False
                if name == "config":
                    return lambda: {}
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(
            api_client=ApiMock(),
            proxy='',
            local_store='',
            block_cache=make_block_cache(self.disk_cache))

        # Regression test.  The bug: an API (not keepstore) exception raised
        # during get() made the next get() of the same block deadlock.  That
        # is why the same get() is attempted twice.  If the bug returns, the
        # suite hangs rather than failing.  There is no good way to avoid
        # that without a special case that would exist only for this test.
        locator = "acbd18db4cc2f85cedef654fccc4a4d8+3"
        for _attempt in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get(locator)