Merge branch '15707-dispatch-onhold-metrics'
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import socket
19 import sys
20 import time
21 import unittest
22 import urllib.parse
23
24 import arvados
25 import arvados.retry
26 import arvados.util
27 from . import arvados_testutil as tutil
28 from . import keepstub
29 from . import run_test_server
30
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Round-trip reads/writes through a KeepClient backed by the test
    API and Keep servers that TestCaseWithServers starts for this class."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        # One client shared by every test in this class.
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        # keep_client is shared by the whole class, so the "starts at 0"
        # counter checks rely on this test running before the others
        # (unittest runs test methods in name order).
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertEqual(self.keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # An 8-byte pattern repeated 2**23 times: 67108864 bytes (64 MiB).
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' * (2 ** 23)
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
130
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Check that a Keep server with blob signing enabled refuses reads
    that lack a valid signature or a suitable token."""
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        client = arvados.KeepClient()

        # A signed locator round-trips for its owner.
        signed_foo = client.put('foo')
        self.assertRegex(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        self.assertEqual(client.get(signed_foo),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        signed_bar = client.put('bar')
        bare_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)
        with self.assertRaises(arvados.errors.NotFoundError):
            client.get(bare_bar)

        # GET from a different user => NotFound
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(signed_bar)

        # Unauthenticated GET, signed then unsigned locator => NotFound
        client.api_token = ''
        for loc in (signed_bar, bare_bar):
            with self.assertRaises(arvados.errors.NotFoundError):
                client.get(loc)
173
174
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """KeepClient behaviour when Keep is reached through the test keepproxy."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepProxyTestCase, self).setUp()
        # Some tests below overwrite ARVADOS_KEEP_SERVICES in the shared
        # arvados config. Remember what was there (if anything) so that
        # tearDown() can put it back and later tests are not affected.
        settings = arvados.config.settings()
        self._had_keep_services = 'ARVADOS_KEEP_SERVICES' in settings
        self._saved_keep_services = settings.get('ARVADOS_KEEP_SERVICES')

    def tearDown(self):
        settings = arvados.config.settings()
        settings.pop('ARVADOS_EXTERNAL_CLIENT', None)
        if self._had_keep_services:
            settings['ARVADOS_KEEP_SERVICES'] = self._saved_keep_services
        else:
            settings.pop('ARVADOS_KEEP_SERVICES', None)
        super(KeepProxyTestCase, self).tearDown()

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegex(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
236
237
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient behaviour against mocked Keep services: service roots,
    TLS and timeout options handed to pycurl, and error reporting."""

    def get_service_roots(self, api_client):
        """Return parsed URLs of the service roots that a KeepClient built on
        api_client would use for an all-zeros locator, sorted by URL."""
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client)
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    @staticmethod
    def _expected_timeout_opts(timeout, low_speed=True):
        """Return the expected (CONNECTTIMEOUT_MS, LOW_SPEED_TIME,
        LOW_SPEED_LIMIT) option values for a KeepClient timeout tuple.

        With low_speed false (used for HEAD requests) the low-speed options
        are expected to be left unset, which the mock reports as None.
        """
        connect_ms = int(timeout[0]*1000)
        if not low_speed:
            return (connect_ms, None, None)
        return (connect_ms, int(timeout[1]), int(timeout[2]))

    def _check_timeout_opts(self, api_client, op, exc_class, expected):
        """Make the single mocked request time out, run op(keep_client),
        and check that it raises exc_class after setting the pycurl
        timeout options listed in expected."""
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                op(keep_client)
            resp = req_mock.responses[0]
            opts = (pycurl.CONNECTTIMEOUT_MS,
                    pycurl.LOW_SPEED_TIME,
                    pycurl.LOW_SPEED_LIMIT)
            for opt, want in zip(opts, expected):
                self.assertEqual(resp.getopt(opt), want)

    def test_get_timeout(self):
        self._check_timeout_opts(
            self.mock_keep_services(count=1),
            lambda kc: kc.get('ffffffffffffffffffffffffffffffff'),
            arvados.errors.KeepReadError,
            self._expected_timeout_opts(arvados.KeepClient.DEFAULT_TIMEOUT))

    def test_put_timeout(self):
        self._check_timeout_opts(
            self.mock_keep_services(count=1),
            lambda kc: kc.put(b'foo'),
            arvados.errors.KeepWriteError,
            self._expected_timeout_opts(arvados.KeepClient.DEFAULT_TIMEOUT))

    def test_head_timeout(self):
        self._check_timeout_opts(
            self.mock_keep_services(count=1),
            lambda kc: kc.head('ffffffffffffffffffffffffffffffff'),
            arvados.errors.KeepReadError,
            self._expected_timeout_opts(arvados.KeepClient.DEFAULT_TIMEOUT,
                                        low_speed=False))

    def test_proxy_get_timeout(self):
        self._check_timeout_opts(
            self.mock_keep_services(service_type='proxy', count=1),
            lambda kc: kc.get('ffffffffffffffffffffffffffffffff'),
            arvados.errors.KeepReadError,
            self._expected_timeout_opts(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT))

    def test_proxy_head_timeout(self):
        self._check_timeout_opts(
            self.mock_keep_services(service_type='proxy', count=1),
            lambda kc: kc.head('ffffffffffffffffffffffffffffffff'),
            arvados.errors.KeepReadError,
            self._expected_timeout_opts(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
                                        low_speed=False))

    def test_proxy_put_timeout(self):
        self._check_timeout_opts(
            self.mock_keep_services(service_type='proxy', count=1),
            lambda kc: kc.put('foo'),
            arvados.errors.KeepWriteError,
            self._expected_timeout_opts(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT))

    def check_no_services_error(self, verb, exc_class):
        """Calling keep_client.<verb> when the service list cannot be
        fetched must raise exc_class carrying no per-request errors."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """After all retries fail, the raised exc_class must report only the
        errors from the final attempt (the two 502 responses)."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        # One response reporting 3 stored replicas satisfies copies=2, so
        # only a single request should be made.
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
497
498
@tutil.skip_sleep
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check which KeepClient requests are answered from its block cache.

    KeepService.get is patched in each test, so call_count is the number
    of requests that actually reached a (mocked) service.
    """

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.get(self.locator)
            self.keep_client.get(self.locator)
        # Request already cached, don't require more than one request.
        # Checked via call_count (like the HEAD test below) because
        # assert_called_once() is missing from older mock releases.
        self.assertEqual(1, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.head(self.locator)
            self.keep_client.head(self.locator)
        # Don't cache HEAD requests so that they're not confused with GET reqs
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        get_mock.side_effect = ['first response', 'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual('first response', head_resp)
        # First response was not cached because it was from a HEAD request.
        self.assertNotEqual(head_resp, get_resp)
534
535
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check the X-Request-Id header KeepClient sends on PUT, GET and HEAD."""

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def _check_put_get_head(self, check_request_id, **kwargs):
        """Issue a PUT (expected to reach both mocked services), then a GET
        and a HEAD, passing kwargs to each call; run check_request_id on
        every mocked request that was made."""
        with tutil.mock_keep_responses(self.locator, 200, 200) as req_mock:
            self.keep_client.put(self.data, **kwargs)
        self.assertEqual(2, len(req_mock.responses))
        for resp in req_mock.responses:
            check_request_id(resp)

        with tutil.mock_keep_responses(self.data, 200) as req_mock:
            self.keep_client.get(self.locator, **kwargs)
        check_request_id(req_mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as req_mock:
            self.keep_client.head(self.locator, **kwargs)
        check_request_id(req_mock.responses[0])

    def test_default_to_api_client_request_id(self):
        self.api_client.request_id = self.test_id
        self._check_put_get_head(self.assertProvidedRequestId)

    def test_explicit_request_id(self):
        self._check_put_get_head(self.assertProvidedRequestId,
                                 request_id=self.test_id)

    def test_automatic_request_id(self):
        self._check_put_get_head(self.assertAutomaticRequestId)

    def assertAutomaticRequestId(self, resp):
        """The request carries a freshly generated id, not self.test_id."""
        request_id_headers = [x for x in resp.getopt(pycurl.HTTPHEADER)
                              if x.startswith('X-Request-Id: ')]
        # Fail with a clear message instead of an IndexError when the
        # header is missing altogether.
        self.assertTrue(request_id_headers,
                        'request has no X-Request-Id header')
        hdr = request_id_headers[0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        """The request carries exactly self.test_id as its X-Request-Id."""
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
605
606
607 @tutil.skip_sleep
608 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
609
610     def setUp(self):
611         # expected_order[i] is the probe order for
612         # hash=md5(sprintf("%064x",i)) where there are 16 services
613         # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
614         # the first probe for the block consisting of 64 "0"
615         # characters is the service whose uuid is
616         # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
617         self.services = 16
618         self.expected_order = [
619             list('3eab2d5fc9681074'),
620             list('097dba52e648f1c3'),
621             list('c5b4e023f8a7d691'),
622             list('9d81c02e76a3bf54'),
623             ]
624         self.blocks = [
625             "{:064x}".format(x).encode()
626             for x in range(len(self.expected_order))]
627         self.hashes = [
628             hashlib.md5(self.blocks[x]).hexdigest()
629             for x in range(len(self.expected_order))]
630         self.api_client = self.mock_keep_services(count=self.services)
631         self.keep_client = arvados.KeepClient(api_client=self.api_client)
632
633     def test_weighted_service_roots_against_reference_set(self):
634         # Confirm weighted_service_roots() returns the correct order
635         for i, hash in enumerate(self.hashes):
636             roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
637             got_order = [
638                 re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
639                 for root in roots]
640             self.assertEqual(self.expected_order[i], got_order)
641
642     def test_get_probe_order_against_reference_set(self):
643         self._test_probe_order_against_reference_set(
644             lambda i: self.keep_client.get(self.hashes[i], num_retries=1))
645
646     def test_head_probe_order_against_reference_set(self):
647         self._test_probe_order_against_reference_set(
648             lambda i: self.keep_client.head(self.hashes[i], num_retries=1))
649
650     def test_put_probe_order_against_reference_set(self):
651         # copies=1 prevents the test from being sensitive to races
652         # between writer threads.
653         self._test_probe_order_against_reference_set(
654             lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))
655
656     def _test_probe_order_against_reference_set(self, op):
657         for i in range(len(self.blocks)):
658             with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
659                  self.assertRaises(arvados.errors.KeepRequestError):
660                 op(i)
661             got_order = [
662                 re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
663                 for resp in mock.responses]
664             self.assertEqual(self.expected_order[i]*2, got_order)
665
666     def test_put_probe_order_multiple_copies(self):
667         for copies in range(2, 4):
668             for i in range(len(self.blocks)):
669                 with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
670                      self.assertRaises(arvados.errors.KeepWriteError):
671                     self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
672                 got_order = [
673                     re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
674                     for resp in mock.responses]
675                 # With T threads racing to make requests, the position
676                 # of a given server in the sequence of HTTP requests
677                 # (got_order) cannot be more than T-1 positions
678                 # earlier than that server's position in the reference
679                 # probe sequence (expected_order).
680                 #
681                 # Loop invariant: we have accounted for +pos+ expected
682                 # probes, either by seeing them in +got_order+ or by
683                 # putting them in +pending+ in the hope of seeing them
684                 # later. As long as +len(pending)<T+, we haven't
685                 # started a request too early.
686                 pending = []
687                 for pos, expected in enumerate(self.expected_order[i]*3):
688                     got = got_order[pos-len(pending)]
689                     while got in pending:
690                         del pending[pending.index(got)]
691                         got = got_order[pos-len(pending)]
692                     if got != expected:
693                         pending.append(expected)
694                         self.assertLess(
695                             len(pending), copies,
696                             "pending={}, with copies={}, got {}, expected {}".format(
697                                 pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
698
699     def test_probe_waste_adding_one_server(self):
700         hashes = [
701             hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
702         initial_services = 12
703         self.api_client = self.mock_keep_services(count=initial_services)
704         self.keep_client = arvados.KeepClient(api_client=self.api_client)
705         probes_before = [
706             self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
707         for added_services in range(1, 12):
708             api_client = self.mock_keep_services(count=initial_services+added_services)
709             keep_client = arvados.KeepClient(api_client=api_client)
710             total_penalty = 0
711             for hash_index in range(len(hashes)):
712                 probe_after = keep_client.weighted_service_roots(
713                     arvados.KeepLocator(hashes[hash_index]))
714                 penalty = probe_after.index(probes_before[hash_index][0])
715                 self.assertLessEqual(penalty, added_services)
716                 total_penalty += penalty
717             # Average penalty per block should not exceed
718             # N(added)/N(orig) by more than 20%, and should get closer
719             # to the ideal as we add data points.
720             expect_penalty = (
721                 added_services *
722                 len(hashes) / initial_services)
723             max_penalty = (
724                 expect_penalty *
725                 (120 - added_services)/100)
726             min_penalty = (
727                 expect_penalty * 8/10)
728             self.assertTrue(
729                 min_penalty <= total_penalty <= max_penalty,
730                 "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
731                     initial_services,
732                     added_services,
733                     len(hashes),
734                     total_penalty,
735                     min_penalty,
736                     max_penalty))
737
738     def check_64_zeros_error_order(self, verb, exc_class):
739         data = b'0' * 64
740         if verb == 'get':
741             data = tutil.str_keep_locator(data)
742         # Arbitrary port number:
743         aport = random.randint(1024,65535)
744         api_client = self.mock_keep_services(service_port=aport, count=self.services)
745         keep_client = arvados.KeepClient(api_client=api_client)
746         with mock.patch('pycurl.Curl') as curl_mock, \
747              self.assertRaises(exc_class) as err_check:
748             curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
749             getattr(keep_client, verb)(data)
750         urls = [urllib.parse.urlparse(url)
751                 for url in err_check.exception.request_errors()]
752         self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
753                          [(url.hostname, url.port) for url in urls])
754
755     def test_get_error_shows_probe_order(self):
756         self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)
757
758     def test_put_error_shows_probe_order(self):
759         self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
760
761
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
    """Connect/response timeout and low-bandwidth tests against a stub
    keep server."""

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        """Context manager asserting that its body takes between tmin and
        tmax seconds (inclusive, after rounding to milliseconds).

        It subclasses TestCase only to reuse its comparison asserts;
        TestCase.__init__ is not called. If the body raises, the timing
        check is skipped so that the body's own exception is what gets
        reported.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, exc_type, exc_value, exc_tb):
            if exc_type is not None:
                # The body already failed; a timing assertion here would
                # only obscure the real error. Do not suppress it.
                return False
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager asserting that its body takes at least tmin
        seconds (after rounding to milliseconds). Like
        assertTakesBetween, it skips the check if the body raises."""
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, exc_type, exc_value, exc_tb):
            if exc_type is not None:
                return False
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient using self.api_client with the given
        timeout tuple: (connect seconds, response seconds[, low-bandwidth
        limit]), as used by the tests below."""
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(response=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
898
899
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Probe ordering for locators carrying +K@ hints (gateway services
    and remote-cluster proxies)."""

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Point self.keepClient at ``disks`` mocked keep services plus
        ``gateways`` services of type 'gateway:test'.

        Also sets self.gateways (the gateway service records),
        self.gateway_roots (their https root URLs, same order) and
        self.api_client.
        """
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    def _check_gateway_hints_in_order(self, MockCurl, verb):
        """Run ``verb`` ('get' or 'head') on a locator with several
        gateway hints while every service answers 404.

        Checks that NotFoundError is raised, that the hinted gateways are
        requested first in hint order, and that the keep services are
        requested afterwards.
        """
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        # Bodies are bytes, as in the other FakeCurl responses in this class.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'get')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'head')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
995
996
class KeepClientRetryTestMixin(object):
    """Retry-behavior tests shared by the get/head/put test cases.

    Testing with a local Keep store would not exercise the retry
    behavior, so instead:

    * The client is created with one proxy (PROXY_ADDR, pointed at a
      black hole), so there is no need to instantiate an API client and
      all HTTP requests go to one place.
    * Responses are simulated by TEST_PATCHER instead of a real server.

    This exercises the retry logic without relying on any supporting
    servers, and prevents side effects in case something hiccups.

    Concrete test classes must define:

    * DEFAULT_EXPECT: the value a successful run_method() returns.
    * DEFAULT_EXCEPTION: the error class run_method() raises on failure.
    * TEST_PATCHER: a callable used as
      ``with TEST_PATCHER(body, *codes_or_exceptions):`` to queue
      simulated responses (tutil.mock_keep_responses in this file).
    * run_method(*args, **kwargs): perform the operation under test
      using a client from new_client().
    """

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Return a KeepClient built from self.client_kwargs, with
        caller_kwargs taking precedence; self.client_kwargs is not
        modified."""
        kwargs = self.client_kwargs.copy()
        kwargs.update(caller_kwargs)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) returns ``expected``
        (DEFAULT_EXPECT when None)."""
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) raises ``error_class``
        (DEFAULT_EXCEPTION when None); return the assertRaises context."""
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
        return err

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1068
1069
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Retry tests for KeepClient.get(); shared cases come from the mixin.
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 must surface as NotFoundError, not a generic read error.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.get(self.HINTED_LOCATOR)
            raised = exc_check.exception
            self.assertNotIsInstance(
                raised, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timed_out = (socket.timeout("timed out"), 200)
        good = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, good):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        corrupt = ('baddata', 200)
        good = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(corrupt, good):
            self.check_success(locator=self.HINTED_LOCATOR)
1113
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Retry tests for KeepClient.head(); shared cases come from the mixin.
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 must surface as NotFoundError, not a generic read error.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.head(self.HINTED_LOCATOR)
            raised = exc_check.exception
            self.assertNotIsInstance(
                raised, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timed_out = (socket.timeout("timed out"), 200)
        good = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, good):
            self.check_success(locator=self.HINTED_LOCATOR)
1151
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Retry tests for KeepClient.put(); shared cases come from the mixin.
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # The mixin's client has a single proxy, so asking for two copies
        # must fail rather than store both on that one server.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1165
1166
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    # KeepWriterThreadPool should stop once self.copies copies are stored,
    # even when more services were queued.

    class FakeKeepService(object):
        # Stand-in service: put() sleeps ``delay`` seconds, then raises
        # ``will_raise`` if set, otherwise returns ``will_succeed``.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {'x-keep-replicas-stored': str(replicas)},
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            # Only a succeeding service reports a result; others give None.
            return self._result if self.will_succeed else None

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def _add_service(self, delay, **behavior):
        # Queue one FakeKeepService (configured by ``behavior``) on the pool.
        self.pool.add_task(self.FakeKeepService(delay=delay, **behavior), None)

    def test_only_write_enough_on_success(self):
        for i in range(10):
            self._add_service(i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_on_partial_success(self):
        for i in range(5):
            self._add_service(i/10.0, will_succeed=False)
            self._add_service(i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_when_some_crash(self):
        for i in range(5):
            self._add_service(i/10.0, will_raise=Exception())
            self._add_service(i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_fail_when_too_many_crash(self):
        for i in range(self.copies+1):
            self._add_service(i/10.0, will_raise=Exception())
        for i in range(self.copies-1):
            self._add_service(i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies-1)
1235
1236
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    # put()s here need two distinct servers to succeed (two mocked
    # services, copies=2), possibly requiring multiple passes through
    # the retry loop.

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def _put_foo(self):
        # Store 'foo' with two copies, allowing one retry.
        self.keep_client.put('foo', num_retries=1, copies=2)

    def test_success_after_exception(self):
        foo_locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
        with tutil.mock_keep_responses(
                foo_locator, Exception('mock err'), 200, 200) as req_mock:
            self._put_foo()
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        foo_locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
        with tutil.mock_keep_responses(
                foo_locator, 500, 200, 200) as req_mock:
            self._put_foo()
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        foo_locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
        with tutil.mock_keep_responses(
                foo_locator, 200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self._put_foo()
        self.assertEqual(2, req_mock.call_count)
1270
class KeepClientAPIErrorTest(unittest.TestCase):
    def test_api_fail(self):
        known_attrs = {"api_token": "abc", "insecure": False}

        class ApiMock(object):
            # Only the attributes in known_attrs resolve; any other
            # attribute access raises KeepReadError.
            def __getattr__(self, r):
                if r in known_attrs:
                    return known_attrs[r]
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='')

        # The bug this is testing for is that if an API (not
        # keepstore) exception is thrown as part of a get(), the next
        # attempt to get that same block will result in a deadlock.
        # This is why there are two get()s in a row.  Unfortunately,
        # the failure mode for this test is that the test suite
        # deadlocks, there isn't a good way to avoid that without
        # adding a special case that has no use except for this test.
        for _attempt in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")