13647: Use cluster config instead of custom keepstore config.
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import socket
19 import sys
20 import time
21 import unittest
22 import urllib.parse
23
24 import arvados
25 import arvados.retry
26 import arvados.util
27 from . import arvados_testutil as tutil
28 from . import keepstub
29 from . import run_test_server
30
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Round-trip put/get/head tests using a KeepClient shared by the class.

    The client is created once in setUpClass() with proxy='' and
    local_store='', after authorizing as the "admin" test user.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        # NOTE(review): the zero-counter checks below look at the
        # class-level client, so they presumably rely on no earlier
        # test having used it -- confirm before reordering tests.
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertEqual(self.keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        # Non-text bytes must survive a put/get round trip unchanged.
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # 8 bytes repeated 2**23 times == 67108864 bytes, the size
        # expected in the locator below.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' * (2 ** 23)
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        # Storing the empty string yields the md5-of-nothing locator.
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        # head() on a stored block returns True; get() still returns
        # the full content afterwards.
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
130
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Put/get checks against Keep test servers started with blob signing."""
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        client = arvados.KeepClient()

        # A put returns a signed locator, and that locator is readable.
        signed_foo = client.put('foo')
        self.assertRegex(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        foo_content = client.get(signed_foo)
        self.assertEqual(foo_content,
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        signed_bar = client.put('bar')
        unsigned_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)
        with self.assertRaises(arvados.errors.NotFoundError):
            client.get(unsigned_bar)

        # GET from a different user => NotFound
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(signed_bar)

        # With the token blanked out, neither the signed nor the
        # unsigned locator may be readable => NotFound for both.
        client.api_token = ''
        for locator in (signed_bar, unsigned_bar):
            with self.assertRaises(arvados.errors.NotFoundError):
                client.get(locator)
173
174
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """KeepClient tests that run with a keepproxy test server configured."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        arvados.config.settings().pop('ARVADOS_EXTERNAL_CLIENT', None)
        super(KeepProxyTestCase, self).tearDown()

    def _override_keep_services(self, value):
        # Set ARVADOS_KEEP_SERVICES for the current test only. The
        # previous state (including "not set") is put back by a
        # cleanup, so tests that depend on the value present at test
        # start are unaffected by the order tests run in.
        missing = object()
        previous = arvados.config.settings().get(
            'ARVADOS_KEEP_SERVICES', missing)

        def restore():
            settings = arvados.config.settings()
            if previous is missing:
                settings.pop('ARVADOS_KEEP_SERVICES', None)
            else:
                settings['ARVADOS_KEEP_SERVICES'] = previous

        self.addCleanup(restore)
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = value

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegex(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        self._override_keep_services(
            'http://10.0.0.1 https://foo.example.org:1234/')
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        # A value that is not a URI must be rejected when the client
        # is constructed.
        self._override_keep_services('bad.uri.org')
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
236
237
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient tests that use a mocked API client and mocked responses."""

    def get_service_roots(self, api_client):
        # Return the sorted service roots for an all-zero locator,
        # each parsed with urlparse().
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def check_curl_timeouts(self, curl, timeout, low_speed=True):
        # Verify the timeout options recorded on one mocked curl
        # handle. `timeout` is indexed as (connection timeout in
        # seconds, LOW_SPEED_TIME, LOW_SPEED_LIMIT). With
        # low_speed=False, the two low-speed options must have been
        # left unset.
        self.assertEqual(
            curl.getopt(pycurl.CONNECTTIMEOUT_MS),
            int(timeout[0]*1000))
        if low_speed:
            self.assertEqual(
                curl.getopt(pycurl.LOW_SPEED_TIME),
                int(timeout[1]))
            self.assertEqual(
                curl.getopt(pycurl.LOW_SPEED_LIMIT),
                int(timeout[2]))
        else:
            self.assertIsNone(curl.getopt(pycurl.LOW_SPEED_TIME))
            self.assertIsNone(curl.getopt(pycurl.LOW_SPEED_LIMIT))

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as curl_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                curl_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as curl_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                curl_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client)
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as curl_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.check_curl_timeouts(
                curl_mock.responses[0], arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as curl_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self.check_curl_timeouts(
                curl_mock.responses[0], arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as curl_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            # HEAD requests are expected to leave the low-speed
            # options unset.
            self.check_curl_timeouts(
                curl_mock.responses[0], arvados.KeepClient.DEFAULT_TIMEOUT,
                low_speed=False)

    def test_proxy_get_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as curl_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.check_curl_timeouts(
                curl_mock.responses[0],
                arvados.KeepClient.DEFAULT_PROXY_TIMEOUT)

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as curl_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            # HEAD requests are expected to leave the low-speed
            # options unset.
            self.check_curl_timeouts(
                curl_mock.responses[0],
                arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
                low_speed=False)

    def test_proxy_put_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as curl_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self.check_curl_timeouts(
                curl_mock.responses[0],
                arvados.KeepClient.DEFAULT_PROXY_TIMEOUT)

    def check_no_services_error(self, verb, exc_class):
        # When listing accessible keep services fails with ApiError,
        # calling `verb` must raise `exc_class` carrying no
        # per-request errors.
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        # Two services answer 500 and then 403; the raised exception
        # must report only the 403 responses.
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 403, 403)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([403, 403], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        # The first response already reports 3 replicas stored, so a
        # single request must be enough for copies=2.
        self.assertEqual(1, req_mock.call_count)
496
497
@tutil.skip_sleep
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Checks which KeepClient requests are answered from its block cache.

    KeepService.get is patched in every test so the number of calls
    that actually reach a service can be counted.
    """

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.get(self.locator)
            self.keep_client.get(self.locator)
        # Request already cached, don't require more than one request.
        # Compare call_count explicitly (as the HEAD test does)
        # rather than relying on Mock.assert_called_once(), which not
        # every mock version provides.
        self.assertEqual(1, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.head(self.locator)
            self.keep_client.head(self.locator)
        # Don't cache HEAD requests so that they're not confused with GET reqs
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        get_mock.side_effect = ['first response', 'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual('first response', head_resp)
        # First response was not cached because it was from a HEAD request.
        self.assertNotEqual(head_resp, get_resp)
533
534
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Checks the X-Request-Id header sent with put, get and head requests."""

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def _check_put_get_head(self, check_response, **request_kwargs):
        # Issue a put (two responses), a get and a head, passing
        # request_kwargs to each call, and run check_response on
        # every response that was used.
        with tutil.mock_keep_responses(self.locator, 200, 200) as put_mock:
            self.keep_client.put(self.data, **request_kwargs)
        self.assertEqual(2, len(put_mock.responses))
        for put_response in put_mock.responses:
            check_response(put_response)

        with tutil.mock_keep_responses(self.data, 200) as get_mock:
            self.keep_client.get(self.locator, **request_kwargs)
        check_response(get_mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as head_mock:
            self.keep_client.head(self.locator, **request_kwargs)
        check_response(head_mock.responses[0])

    def test_default_to_api_client_request_id(self):
        # No request_id argument: the API client's id is used.
        self.api_client.request_id = self.test_id
        self._check_put_get_head(self.assertProvidedRequestId)

    def test_explicit_request_id(self):
        # A request_id argument is sent as given.
        self._check_put_get_head(self.assertProvidedRequestId,
                                 request_id=self.test_id)

    def test_automatic_request_id(self):
        # Neither the call nor the API client supplies an id, so a
        # fresh one must be generated.
        self._check_put_get_head(self.assertAutomaticRequestId)

    def assertAutomaticRequestId(self, resp):
        # The header must be well formed but differ from self.test_id.
        request_id_headers = [
            line for line in resp.getopt(pycurl.HTTPHEADER)
            if line.startswith('X-Request-Id: ')]
        hdr = request_id_headers[0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        # The header carrying self.test_id must be among those sent.
        sent_headers = resp.getopt(pycurl.HTTPHEADER)
        self.assertIn('X-Request-Id: '+self.test_id, sent_headers)
604
605
606 @tutil.skip_sleep
607 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
608
609     def setUp(self):
610         # expected_order[i] is the probe order for
611         # hash=md5(sprintf("%064x",i)) where there are 16 services
612         # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
613         # the first probe for the block consisting of 64 "0"
614         # characters is the service whose uuid is
615         # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
616         self.services = 16
617         self.expected_order = [
618             list('3eab2d5fc9681074'),
619             list('097dba52e648f1c3'),
620             list('c5b4e023f8a7d691'),
621             list('9d81c02e76a3bf54'),
622             ]
623         self.blocks = [
624             "{:064x}".format(x).encode()
625             for x in range(len(self.expected_order))]
626         self.hashes = [
627             hashlib.md5(self.blocks[x]).hexdigest()
628             for x in range(len(self.expected_order))]
629         self.api_client = self.mock_keep_services(count=self.services)
630         self.keep_client = arvados.KeepClient(api_client=self.api_client)
631
632     def test_weighted_service_roots_against_reference_set(self):
633         # Confirm weighted_service_roots() returns the correct order
634         for i, hash in enumerate(self.hashes):
635             roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
636             got_order = [
637                 re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
638                 for root in roots]
639             self.assertEqual(self.expected_order[i], got_order)
640
641     def test_get_probe_order_against_reference_set(self):
642         self._test_probe_order_against_reference_set(
643             lambda i: self.keep_client.get(self.hashes[i], num_retries=1))
644
645     def test_head_probe_order_against_reference_set(self):
646         self._test_probe_order_against_reference_set(
647             lambda i: self.keep_client.head(self.hashes[i], num_retries=1))
648
649     def test_put_probe_order_against_reference_set(self):
650         # copies=1 prevents the test from being sensitive to races
651         # between writer threads.
652         self._test_probe_order_against_reference_set(
653             lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))
654
655     def _test_probe_order_against_reference_set(self, op):
656         for i in range(len(self.blocks)):
657             with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
658                  self.assertRaises(arvados.errors.KeepRequestError):
659                 op(i)
660             got_order = [
661                 re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
662                 for resp in mock.responses]
663             self.assertEqual(self.expected_order[i]*2, got_order)
664
665     def test_put_probe_order_multiple_copies(self):
666         for copies in range(2, 4):
667             for i in range(len(self.blocks)):
668                 with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
669                      self.assertRaises(arvados.errors.KeepWriteError):
670                     self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
671                 got_order = [
672                     re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
673                     for resp in mock.responses]
674                 # With T threads racing to make requests, the position
675                 # of a given server in the sequence of HTTP requests
676                 # (got_order) cannot be more than T-1 positions
677                 # earlier than that server's position in the reference
678                 # probe sequence (expected_order).
679                 #
680                 # Loop invariant: we have accounted for +pos+ expected
681                 # probes, either by seeing them in +got_order+ or by
682                 # putting them in +pending+ in the hope of seeing them
683                 # later. As long as +len(pending)<T+, we haven't
684                 # started a request too early.
685                 pending = []
686                 for pos, expected in enumerate(self.expected_order[i]*3):
687                     got = got_order[pos-len(pending)]
688                     while got in pending:
689                         del pending[pending.index(got)]
690                         got = got_order[pos-len(pending)]
691                     if got != expected:
692                         pending.append(expected)
693                         self.assertLess(
694                             len(pending), copies,
695                             "pending={}, with copies={}, got {}, expected {}".format(
696                                 pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
697
698     def test_probe_waste_adding_one_server(self):
699         hashes = [
700             hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
701         initial_services = 12
702         self.api_client = self.mock_keep_services(count=initial_services)
703         self.keep_client = arvados.KeepClient(api_client=self.api_client)
704         probes_before = [
705             self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
706         for added_services in range(1, 12):
707             api_client = self.mock_keep_services(count=initial_services+added_services)
708             keep_client = arvados.KeepClient(api_client=api_client)
709             total_penalty = 0
710             for hash_index in range(len(hashes)):
711                 probe_after = keep_client.weighted_service_roots(
712                     arvados.KeepLocator(hashes[hash_index]))
713                 penalty = probe_after.index(probes_before[hash_index][0])
714                 self.assertLessEqual(penalty, added_services)
715                 total_penalty += penalty
716             # Average penalty per block should not exceed
717             # N(added)/N(orig) by more than 20%, and should get closer
718             # to the ideal as we add data points.
719             expect_penalty = (
720                 added_services *
721                 len(hashes) / initial_services)
722             max_penalty = (
723                 expect_penalty *
724                 (120 - added_services)/100)
725             min_penalty = (
726                 expect_penalty * 8/10)
727             self.assertTrue(
728                 min_penalty <= total_penalty <= max_penalty,
729                 "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
730                     initial_services,
731                     added_services,
732                     len(hashes),
733                     total_penalty,
734                     min_penalty,
735                     max_penalty))
736
737     def check_64_zeros_error_order(self, verb, exc_class):
738         data = b'0' * 64
739         if verb == 'get':
740             data = tutil.str_keep_locator(data)
741         # Arbitrary port number:
742         aport = random.randint(1024,65535)
743         api_client = self.mock_keep_services(service_port=aport, count=self.services)
744         keep_client = arvados.KeepClient(api_client=api_client)
745         with mock.patch('pycurl.Curl') as curl_mock, \
746              self.assertRaises(exc_class) as err_check:
747             curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
748             getattr(keep_client, verb)(data)
749         urls = [urllib.parse.urlparse(url)
750                 for url in err_check.exception.request_errors()]
751         self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
752                          [(url.hostname, url.port) for url in urls])
753
754     def test_get_error_shows_probe_order(self):
755         self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)
756
757     def test_put_error_shows_probe_order(self):
758         self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
759
760
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
    # DATA has to be larger than BANDWIDTH_LOW_LIM: that way a full
    # second's worth of data can be transferred, and bandwidth errors
    # triggered, before the data runs out.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        # Context manager: fails unless the body's elapsed wall-clock
        # time lies between tmin and tmax seconds (both inclusive).
        def __init__(self, tmin, tmax):
            self.tmin, self.tmax = tmin, tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            elapsed = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(elapsed, self.tmin)
            self.assertLessEqual(elapsed, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        # Context manager: fails unless the body's elapsed wall-clock
        # time (rounded to milliseconds) is at least tmin seconds.
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            elapsed = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(elapsed, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        # Build a KeepClient on this test's API client, passing
        # +timeouts+ through as the client's timeout setting.
        client = arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts)
        return client

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5), \
             self.assertRaises(arvados.errors.KeepWriteError):
            self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # Twice the low-bandwidth limit is fast enough to round-trip.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        client = self.keepClient()
        stored = client.put(self.DATA, copies=1, num_retries=0)
        fetched = client.get(stored, num_retries=0)
        self.assertEqual(self.DATA, fetched)

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        client = self.keepClient()
        stored = client.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepReadError):
            client.get(stored, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepWriteError):
            client.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        client = self.keepClient()
        stored = client.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(response=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepReadError):
            client.get(stored, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepWriteError):
            client.put(self.DATA, copies=1, num_retries=0)
        # head() is only timed here; no exception is expected.
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            client.head(stored, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        client = self.keepClient()
        stored = client.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(
            mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepReadError):
            client.get(stored, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepWriteError):
            client.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        stored = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        for delay, check in (
                (.2, self._test_connect_timeout_under_200ms),
                (2, self._test_response_timeout_under_2s)):
            self.server.setdelays(request=delay)
            check(stored)

    def test_timeout_slow_response(self):
        stored = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        for delay, check in (
                (.2, self._test_connect_timeout_under_200ms),
                (2, self._test_response_timeout_under_2s)):
            self.server.setdelays(response=delay)
            check(stored)

    def test_timeout_slow_response_body(self):
        stored = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        for delay, check in (
                (.2, self._test_connect_timeout_under_200ms),
                (2, self._test_response_timeout_under_2s)):
            self.server.setdelays(response_body=delay)
            check(stored)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        client = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            client.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            fetched = client.get(loc, num_retries=0)
            self.assertEqual(self.DATA, fetched)

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        client = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9), \
             self.assertRaises(arvados.errors.KeepReadError):
            client.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9), \
             self.assertRaises(arvados.errors.KeepWriteError):
            client.put(self.DATA, copies=1, num_retries=0)
897
898
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    def mock_disks_and_gateways(self, disks=3, gateways=1):
        # Populate self.gateways / self.gateway_roots with +gateways+
        # fake gateway services, then build an API mock advertising
        # them next to +disks+ regular services, and a KeepClient on
        # top of that mock.
        self.gateways = []
        for i in range(gateways):
            self.gateways.append({
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
            })
        self.gateway_roots = []
        for gw in self.gateways:
            self.gateway_roots.append(
                "https://{service_host}:{service_port}/".format(**gw))
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        fake_curl = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        MockCurl.return_value = fake_curl
        self.mock_disks_and_gateways()
        gateway_uuid = self.gateways[0]['uuid']
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + gateway_uuid
        self.assertEqual(b'foo', self.keepClient.get(locator))
        # The request must have gone to the hinted gateway.
        requested_url = MockCurl.return_value.getopt(pycurl.URL).decode()
        self.assertEqual(self.gateway_roots[0]+locator, requested_url)
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        gateways = 4
        disks = 3
        mocks = []
        for _ in range(gateways+disks):
            mocks.append(tutil.FakeCurl.make(code=404, body=''))
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        hints = ['K@'+gw['uuid'] for gw in self.gateways]
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] + hints)
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for root, curl in zip(self.gateway_roots, mocks):
            self.assertEqual(root+locator,
                             curl.getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for curl in mocks[gateways:gateways+disks]:
            self.assertRegex(
                curl.getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        gateways = 4
        disks = 3
        mocks = []
        for _ in range(gateways+disks):
            mocks.append(tutil.FakeCurl.make(code=404, body=b''))
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        hints = ['K@'+gw['uuid'] for gw in self.gateways]
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] + hints)
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for root, curl in zip(self.gateway_roots, mocks):
            self.assertEqual(root+locator,
                             curl.getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for curl in mocks[gateways:gateways+disks]:
            self.assertRegex(
                curl.getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        # The +K@xyzzy hint must have been turned into a remote
        # proxy URL.
        requested_url = MockCurl.return_value.getopt(pycurl.URL).decode()
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         requested_url)

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        requested_url = MockCurl.return_value.getopt(pycurl.URL).decode()
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         requested_url)
994
995
class KeepClientRetryTestMixin(object):
    # A local Keep store would never exercise the retry behavior, so
    # these tests work differently:
    # * The client is created with a single proxy (pointed at a black
    #   hole). No API client is needed, and every HTTP request comes
    #   from one place.
    # * The client's request machinery is patched (via TEST_PATCHER)
    #   to hand back simulated responses.
    # That allows thorough testing of the retry logic without any
    # supporting servers, and avoids side effects if something hiccups.
    #
    # A class using this mixin defines DEFAULT_EXPECT,
    # DEFAULT_EXCEPTION and run_method(), and sets TEST_PATCHER to a
    # method that mocks out the appropriate methods in the client.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        # Caller-supplied keyword arguments override the defaults
        # stored in self.client_kwargs.
        merged = dict(self.client_kwargs)
        merged.update(caller_kwargs)
        return arvados.KeepClient(**merged)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        # run_method() must return +expected+ (DEFAULT_EXPECT when
        # not given).
        wanted = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(wanted, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        # run_method() must raise +error_class+ (DEFAULT_EXCEPTION
        # when not given).
        wanted = self.DEFAULT_EXCEPTION if error_class is None else error_class
        with self.assertRaises(wanted):
            self.run_method(*args, **kwargs)

    def _mock_responses(self, *responses):
        # Patch the client so it answers with DEFAULT_EXPECT as the
        # body and +responses+ as the sequence of outcomes.
        return self.TEST_PATCHER(self.DEFAULT_EXPECT, *responses)

    def test_immediate_success(self):
        with self._mock_responses(200):
            self.check_success()

    def test_retry_then_success(self):
        with self._mock_responses(500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self._mock_responses(Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        with self._mock_responses(500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        with self._mock_responses(403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self._mock_responses(500, 500, 200):
            self.check_exception(num_retries=1)

    def test_num_retries_instance_fallback(self):
        # num_retries given to the client constructor applies when
        # the call itself does not pass one.
        self.client_kwargs['num_retries'] = 3
        with self._mock_responses(500, 200):
            self.check_success()
1064
1065
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Retry tests for KeepClient.get(): success means getting
    # TEST_DATA back, failure means KeepReadError.
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200)
        with responses:
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        keep_client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as caught:
                keep_client.get(self.HINTED_LOCATOR)
            raised = caught.exception
            self.assertNotIsInstance(
                raised, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        responses = tutil.mock_keep_responses(
            self.DEFAULT_EXPECT, 404, 200, 500)
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timed_out = (socket.timeout("timed out"), 200)
        good = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, good):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        corrupt = ('baddata', 200)
        good = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(corrupt, good):
            self.check_success(locator=self.HINTED_LOCATOR)
1109
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Retry tests for KeepClient.head(): success means getting True
    # back, failure means KeepReadError.
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200)
        with responses:
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        keep_client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as caught:
                keep_client.head(self.HINTED_LOCATOR)
            raised = caught.exception
            self.assertNotIsInstance(
                raised, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        responses = tutil.mock_keep_responses(
            self.DEFAULT_EXPECT, 404, 200, 500)
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timed_out = (socket.timeout("timed out"), 200)
        good = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, good):
            self.check_success(locator=self.HINTED_LOCATOR)
1147
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Retry tests for KeepClient.put(): success means getting
    # TEST_LOCATOR back, failure means KeepWriteError.
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Only a single 200 response is mocked, and put(copies=2) is
        # expected to fail rather than succeed.
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200)
        with responses:
            self.check_exception(copies=2, num_retries=3)
1161
1162
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):

    class FakeKeepService(object):
        # Stand-in for a Keep service handed to the writer pool.
        # put() sleeps for +delay+ seconds, then raises +will_raise+
        # if one was given, otherwise returns +will_succeed+.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {'x-keep-replicas-stored': str(replicas)},
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            # Only a successful service has a result to report.
            return self._result if self.will_succeed else None

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        pool_args = dict(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )
        self.pool = arvados.KeepClient.KeepWriterThreadPool(**pool_args)

    def test_only_write_enough_on_success(self):
        for i in range(10):
            service = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(service, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_on_partial_success(self):
        # Alternate failing and succeeding services.
        for i in range(5):
            for outcome in (False, True):
                service = self.FakeKeepService(
                    delay=i/10.0, will_succeed=outcome)
                self.pool.add_task(service, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_when_some_crash(self):
        # Alternate crashing and succeeding services.
        for i in range(5):
            crasher = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(crasher, None)
            worker = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(worker, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_fail_when_too_many_crash(self):
        # Only copies-1 services succeed, so that is how many the
        # pool should report as done.
        for i in range(self.copies+1):
            crasher = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(crasher, None)
        for i in range(self.copies-1):
            worker = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(worker, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies-1)
1231
1232
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    # These put()s can only succeed by reaching two distinct servers,
    # which may take more than one pass through the retry loop.

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def test_success_after_exception(self):
        outcomes = (Exception('mock err'), 200, 200)
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                *outcomes) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        outcomes = (500, 200, 200)
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                *outcomes) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        outcomes = (200, 400, 200)
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                *outcomes) as req_mock, \
             self.assertRaises(arvados.errors.KeepWriteError):
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1265         self.assertEqual(2, req_mock.call_count)
1266
class KeepClientAPIErrorTest(unittest.TestCase):
    def test_api_fail(self):
        class ApiMock(object):
            # Fake API client: answers the two attributes below and
            # raises KeepReadError for every other attribute lookup.
            def __getattr__(self, r):
                canned = {"api_token": "abc", "insecure": False}
                if r not in canned:
                    raise arvados.errors.KeepReadError()
                return canned[r]

        keep_client = arvados.KeepClient(
            api_client=ApiMock(), proxy='', local_store='')

        # This guards against a bug where an exception from the API
        # (not from keepstore) raised during a get() made the next
        # get() of the same block deadlock -- hence two get()s in a
        # row. Unfortunately, when this test fails the whole test
        # suite deadlocks; there isn't a good way to avoid that
        # without adding a special case that has no use except for
        # this test.
        locator = "acbd18db4cc2f85cedef654fccc4a4d8+3"
        for _attempt in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get(locator)