13558: Merge branch 'master' into wtsi-hgi-13558-debug-log-tag-req-id
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import socket
19 import sys
20 import time
21 import unittest
22 import urllib.parse
23
24 import arvados
25 import arvados.retry
26 import arvados.util
27 from . import arvados_testutil as tutil
28 from . import keepstub
29 from . import run_test_server
30
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Read/write round trips against a real Keep server (no permissions).

    ``setUpClass`` creates one KeepClient (``cls.keep_client``) that most
    tests share.  Tests that assert on transfer counters build their own
    client so the counters start at zero regardless of test order.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        # Use a private client: the shared cls.keep_client accumulates
        # counter values from whichever tests happened to run before this
        # one, which would break the absolute counter assertions below.
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        self.assertEqual(0, keep_client.upload_counter.get())
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, keep_client.upload_counter.get())

        self.assertEqual(0, keep_client.download_counter.get())
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        # Doubling 23 times grows the 8-byte seed to 8 * 2**23 = 67108864
        # bytes, the size hint expected in the locator below.
        for _ in range(23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
128                          b'test_head',
129                          'wrong content from Keep.get for "test_head"')
130
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Keep server with permission enforcement turned on.

    PUT returns a signed locator; GET only succeeds for an authorized
    user presenting a signed locator.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': True}

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        keep_client = arvados.KeepClient()

        signed_foo = keep_client.put('foo')
        self.assertRegex(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        self.assertEqual(keep_client.get(signed_foo),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # The uploading user is still refused when the signature is missing.
        signed_bar = keep_client.put('bar')
        unsigned_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)
        with self.assertRaises(arvados.errors.NotFoundError):
            keep_client.get(unsigned_bar)

        # A signed locator is refused when fetched as a different user.
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(signed_bar)

        # With no token at all, neither signed nor unsigned locators work.
        keep_client.api_token = ''
        for locator in (signed_bar, unsigned_bar):
            with self.assertRaises(arvados.errors.NotFoundError):
                keep_client.get(locator)
172                           keep_client.get,
173                           unsigned_bar_locator)
174
175
176 # KeepOptionalPermission: starts Keep with --permission-key-file
177 # but not --enforce-permissions (i.e. generate signatures on PUT
178 # requests, but do not require them for GET requests)
179 #
180 # All of these requests should succeed when permissions are optional:
181 # * authenticated request, signed locator
182 # * authenticated request, unsigned locator
183 # * unauthenticated request, signed locator
184 # * unauthenticated request, unsigned locator
class KeepOptionalPermission(run_test_server.TestCaseWithServers):
    """Keep server that signs locators on PUT but does not enforce them on GET."""
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': False}

    # md5("foo") with no size hint and no permission signature.
    UNSIGNED_FOO_LOCATOR = "acbd18db4cc2f85cedef654fccc4a4d8"

    @classmethod
    def setUpClass(cls):
        super(KeepOptionalPermission, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepOptionalPermission, self).setUp()
        self.keep_client = arvados.KeepClient(api_client=self.api_client,
                                              proxy='', local_store='')

    def _put_foo_and_check(self):
        """Store b'foo', assert the returned locator is signed, and return it."""
        signed_locator = self.keep_client.put('foo')
        self.assertRegex(
            signed_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_locator)
        return signed_locator

    def test_KeepAuthenticatedSignedTest(self):
        signed_locator = self._put_foo_and_check()
        self.assertEqual(self.keep_client.get(signed_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepAuthenticatedUnsignedTest(self):
        # Only the upload side effect matters; the signed locator is unused.
        self._put_foo_and_check()
        self.assertEqual(self.keep_client.get(self.UNSIGNED_FOO_LOCATOR),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepUnauthenticatedSignedTest(self):
        # Check that signed GET requests work even when permissions
        # enforcement is off.
        signed_locator = self._put_foo_and_check()
        self.keep_client.api_token = ''
        self.assertEqual(self.keep_client.get(signed_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepUnauthenticatedUnsignedTest(self):
        # Since --enforce-permissions is not in effect, GET requests
        # need not be authenticated.
        self._put_foo_and_check()
        self.keep_client.api_token = ''
        self.assertEqual(self.keep_client.get(self.UNSIGNED_FOO_LOCATOR),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
236                          b'foo',
237                          'wrong content from Keep.get(md5("foo"))')
238
239
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """KeepClient reaching Keep through a keepproxy server."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepProxyTestCase, self).setUp()
        # Some tests overwrite ARVADOS_KEEP_SERVICES, while
        # test_KeepProxyTest1 relies on the value set by setUpClass().
        # Remember the value in effect now so tearDown() can restore it
        # and the tests stay independent of execution order.
        self._saved_keep_services = arvados.config.settings().get(
            'ARVADOS_KEEP_SERVICES')

    def tearDown(self):
        settings = arvados.config.settings()
        settings.pop('ARVADOS_EXTERNAL_CLIENT', None)
        if self._saved_keep_services is None:
            settings.pop('ARVADOS_KEEP_SERVICES', None)
        else:
            settings['ARVADOS_KEEP_SERVICES'] = self._saved_keep_services
        super(KeepProxyTestCase, self).tearDown()

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegex(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
299             keep_client = arvados.KeepClient(api_client=self.api_client,
300                                              local_store='')
301
302
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient behavior against mocked Keep services and mocked pycurl."""

    def get_service_roots(self, api_client):
        """Return the parsed service root URLs KeepClient would probe, sorted."""
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def check_timeout(self, verb, arg, exc_class, expected_timeout,
                      **service_kwargs):
        """Force a socket timeout on ``keep_client.<verb>(arg)`` and check curl options.

        *exc_class* is the error the call must raise.  *expected_timeout*
        is indexed like KeepClient.DEFAULT_TIMEOUT: element 0 is compared
        (times 1000) with CONNECTTIMEOUT_MS, element 1 with LOW_SPEED_TIME
        and element 2 with LOW_SPEED_LIMIT.  *service_kwargs* are passed
        through to mock_keep_services().
        """
        api_client = self.mock_keep_services(count=1, **service_kwargs)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                getattr(keep_client, verb)(arg)
            resp = req_mock.responses[0]
            self.assertEqual(
                resp.getopt(pycurl.CONNECTTIMEOUT_MS),
                int(expected_timeout[0]*1000))
            self.assertEqual(
                resp.getopt(pycurl.LOW_SPEED_TIME),
                int(expected_timeout[1]))
            self.assertEqual(
                resp.getopt(pycurl.LOW_SPEED_LIMIT),
                int(expected_timeout[2]))

    def test_get_timeout(self):
        self.check_timeout('get', 'ffffffffffffffffffffffffffffffff',
                           arvados.errors.KeepReadError,
                           arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        self.check_timeout('put', b'foo',
                           arvados.errors.KeepWriteError,
                           arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        self.check_timeout('head', 'ffffffffffffffffffffffffffffffff',
                           arvados.errors.KeepReadError,
                           arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_proxy_get_timeout(self):
        self.check_timeout('get', 'ffffffffffffffffffffffffffffffff',
                           arvados.errors.KeepReadError,
                           arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
                           service_type='proxy')

    def test_proxy_head_timeout(self):
        self.check_timeout('head', 'ffffffffffffffffffffffffffffffff',
                           arvados.errors.KeepReadError,
                           arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
                           service_type='proxy')

    def test_proxy_put_timeout(self):
        self.check_timeout('put', 'foo',
                           arvados.errors.KeepWriteError,
                           arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
                           service_type='proxy')

    def check_no_services_error(self, verb, exc_class):
        """Fail service discovery and check *verb* raises *exc_class* with no request errors."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Check that the raised error reports only the final attempt's statuses."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 403, 403)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([403, 403], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
540         self.assertEqual(pdh, actual)
541         self.assertEqual(1, req_mock.call_count)
542
543
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check the X-Request-Id header KeepClient sends on put, get and head."""

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def _put_get_head(self, check, **kwargs):
        """Run put, get and head against mocked services, calling *check* on every request.

        *kwargs* (e.g. ``request_id``) are forwarded to each KeepClient call.
        """
        with tutil.mock_keep_responses(self.locator, 200, 200) as req_mock:
            self.keep_client.put(self.data, **kwargs)
        self.assertEqual(2, len(req_mock.responses))
        for resp in req_mock.responses:
            check(resp)

        with tutil.mock_keep_responses(self.data, 200) as req_mock:
            self.keep_client.get(self.locator, **kwargs)
        check(req_mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as req_mock:
            self.keep_client.head(self.locator, **kwargs)
        check(req_mock.responses[0])

    def test_default_to_api_client_request_id(self):
        self.api_client.request_id = self.test_id
        self._put_get_head(self.assertProvidedRequestId)

    def test_explicit_request_id(self):
        self._put_get_head(self.assertProvidedRequestId,
                           request_id=self.test_id)

    def test_automatic_request_id(self):
        self._put_get_head(self.assertAutomaticRequestId)

    def assertAutomaticRequestId(self, resp):
        """Assert *resp* carries a freshly generated request id, not self.test_id."""
        hdrs = [x for x in resp.getopt(pycurl.HTTPHEADER)
                if x.startswith('X-Request-Id: ')]
        # Report a test failure, rather than an IndexError, when the
        # header is absent.
        self.assertTrue(hdrs, 'no X-Request-Id header in request')
        hdr = hdrs[0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        """Assert *resp* carries the caller-supplied self.test_id."""
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
611         self.assertIn('X-Request-Id: '+self.test_id,
612                       resp.getopt(pycurl.HTTPHEADER))
613
614
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check the order in which KeepClient probes Keep services.

    Observed probe orders are compared against fixed reference orders
    computed for 16 mocked services.
    """

    # Captures the hex digits following "keep0x" in a service root or a
    # request URL. The optional "[" accepts a host written in brackets.
    _SERVICE_ID_RE = re.compile(r'//\[?keep0x([0-9a-f]+)')

    @classmethod
    def _service_id(cls, url):
        """Return the hex service id embedded in *url*."""
        return cls._SERVICE_ID_RE.search(url).group(1)

    @classmethod
    def _probed_service_ids(cls, responses):
        """Return the service ids of mocked curl *responses*, in request order."""
        return [cls._service_id(resp.getopt(pycurl.URL).decode())
                for resp in responses]

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(block).hexdigest()
            for block in self.blocks]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for expected, block_hash in zip(self.expected_order, self.hashes):
            roots = self.keep_client.weighted_service_roots(
                arvados.KeepLocator(block_hash))
            got_order = [self._service_id(root) for root in roots]
            self.assertEqual(expected, got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Run op(i) for each reference block with every request failing
        (HTTP 500), and check that the two passes allowed by num_retries=1
        probe services in the reference order."""
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as req_mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = self._probed_service_ids(req_mock.responses)
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as req_mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = self._probed_service_ids(req_mock.responses)
                expected_order = self.expected_order[i]*3
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(expected_order):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        pending.remove(got)
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(expected_order)))

    def test_probe_waste_adding_one_server(self):
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        base_client = arvados.KeepClient(
            api_client=self.mock_keep_services(count=initial_services))
        probes_before = [
            base_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            for block_hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client)
            total_penalty = 0
            for block_hash, probe_before in zip(hashes, probes_before):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(block_hash))
                # Position of the old first choice in the new probe order.
                penalty = probe_after.index(probe_before[0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Fail a get/put of the 64-"0" block on every service, and check
        that the exception's request_errors() lists the services in the
        reference probe order for that block."""
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client)
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        # b'0' * 64 is self.blocks[0], so its reference order is expected_order[0].
        self.assertEqual([('keep0x' + c, aport) for c in self.expected_order[0]],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
768
769
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
    """Connection, response and bandwidth timeouts against a stub Keep server."""

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        """Context manager asserting its body takes tmin..tmax seconds.

        Subclasses TestCase only to use its assert* methods.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, exc_type, exc_value, traceback):
            if exc_type is not None:
                # The body already failed; let that error propagate
                # rather than burying it under a timing assertion.
                return False
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager asserting its body takes at least tmin seconds."""
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, exc_type, exc_value, traceback):
            if exc_type is not None:
                # Don't mask the body's own failure with a timing check.
                return False
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient using self.api_client and the given timeouts."""
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(response=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
907
908
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check how KeepClient routes requests for locators carrying gateway
    (+K@<gateway uuid>) and remote proxy (+K@<cluster>) hints."""

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Build self.keepClient against *disks* mocked disk services plus
        *gateways* mocked gateway services.

        Also records self.gateways (the gateway service records) and
        self.gateway_roots (the root URL expected for each gateway).
        """
        self.gateways = [{
            'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
            'owner_uuid': 'zzzzz-tpzed-000000000000000',
            'service_host': 'gatewayhost{}'.format(i),
            'service_port': 12345,
            'service_ssl_flag': True,
            'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    def _check_gateway_hints_in_order(self, MockCurl, method_name):
        """Call keepClient.<method_name> with a locator that hints every
        gateway while every request gets a 404, then check that all
        gateways were tried first (in hint order), followed by the disk
        services."""
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, method_name)(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'get')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'head')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1004
1005
class KeepClientRetryTestMixin(object):
    """Shared retry tests for KeepClient operations.

    Testing with a local Keep store won't exercise the retry behavior.
    Instead, our strategy is:

    * Create a client with one proxy specified (pointed at a black
      hole), so there's no need to instantiate an API client, and
      all HTTP requests come from one place.
    * Use TEST_PATCHER to feed the client simulated HTTP responses.

    This lets us test the retry logic extensively without relying on any
    supporting servers, and prevents side effects in case something
    hiccups.

    Classes using this mixin (together with unittest.TestCase) must define:

    * DEFAULT_EXPECT: the value a successful run_method() returns.
    * DEFAULT_EXCEPTION: the error class run_method() raises on failure.
    * TEST_PATCHER: a callable used as ``TEST_PATCHER(body, *responses)``,
      where each response is an HTTP status code or an exception to
      raise. It must return a context manager that installs the mocks.
    * run_method(*args, **kwargs): perform the operation under test,
      using a client from new_client().
    """

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Return a KeepClient built from self.client_kwargs, with any
        caller_kwargs overriding those defaults."""
        kwargs = self.client_kwargs.copy()
        kwargs.update(caller_kwargs)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) returns *expected*
        (DEFAULT_EXPECT when not given)."""
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) raises *error_class*
        (DEFAULT_EXCEPTION when not given)."""
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        self.assertRaises(error_class, self.run_method, *args, **kwargs)

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            self.check_exception(num_retries=1)

    def test_num_retries_instance_fallback(self):
        # num_retries set on the client applies when the call passes none.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1074
1075
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.get(), plus get-specific cases."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200)
        with responses:
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get() should raise NotFoundError only when no server returns the
        # block and a high threshold of servers report it missing. Here
        # two servers disagree 50/50 (404 vs 500), so the resulting error
        # must not be a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500), \
             self.assertRaises(arvados.errors.KeepReadError) as exc_check:
            client.get(self.HINTED_LOCATOR)
        self.assertNotIsInstance(
            exc_check.exception, arvados.errors.NotFoundError,
            "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500)
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timeout_then_data = [
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*timeout_then_data):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        bad_then_good = [
            ('baddata', 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*bad_then_good):
            self.check_success(locator=self.HINTED_LOCATOR)
1119
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.head(), plus head-specific cases."""

    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200)
        with responses:
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head() should raise NotFoundError only when no server returns the
        # block and a high threshold of servers report it missing. Here
        # two servers disagree 50/50 (404 vs 500), so the resulting error
        # must not be a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500), \
             self.assertRaises(arvados.errors.KeepReadError) as exc_check:
            client.head(self.HINTED_LOCATOR)
        self.assertNotIsInstance(
            exc_check.exception, arvados.errors.NotFoundError,
            "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500)
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timeout_then_ok = [
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*timeout_then_ok):
            self.check_success(locator=self.HINTED_LOCATOR)
1157
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.put()."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # The mixin's client has a single proxy to write to, so put() must
        # not count that one server twice toward copies=2.
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200)
        with responses:
            self.check_exception(copies=2, num_retries=3)
1171
1172
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """Check how many successful writes KeepWriterThreadPool reports.

    It should report exactly the requested copies when more services
    could succeed, and fewer when too many services fail.
    """

    class FakeKeepService(object):
        """Stand-in service handed to KeepWriterThreadPool.add_task().

        put() sleeps for ``delay`` seconds, then raises ``will_raise`` if
        one was given, otherwise returns ``will_succeed``.
        """

        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {'x-keep-replicas-stored': str(replicas)},
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            # Only a successful service has a result to report.
            return self._result if self.will_succeed else None

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def _add_service(self, **service_kwargs):
        """Queue a FakeKeepService built from service_kwargs on the pool."""
        self.pool.add_task(self.FakeKeepService(**service_kwargs), None)

    def test_only_write_enough_on_success(self):
        for n in range(10):
            self._add_service(delay=n/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_on_partial_success(self):
        for n in range(5):
            delay = n/10.0
            self._add_service(delay=delay, will_succeed=False)
            self._add_service(delay=delay, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_when_some_crash(self):
        for n in range(5):
            delay = n/10.0
            self._add_service(delay=delay, will_raise=Exception())
            self._add_service(delay=delay, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_fail_when_too_many_crash(self):
        for n in range(self.copies+1):
            self._add_service(delay=n/10.0, will_raise=Exception())
        for n in range(self.copies-1):
            self._add_service(delay=n/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies-1)
1241
1242
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    """put() calls that need two distinct servers to succeed.

    Reaching both servers may take more than one pass through the retry
    loop.
    """

    FOO_LOCATOR = 'acbd18db4cc2f85cedef654fccc4a4d8+3'

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def _put_foo(self):
        """Store 'foo' asking for two copies, allowing one retry."""
        self.keep_client.put('foo', num_retries=1, copies=2)

    def test_success_after_exception(self):
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR, Exception('mock err'), 200, 200) as req_mock:
            self._put_foo()
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR, 500, 200, 200) as req_mock:
            self._put_foo()
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # The first pass gets a 200 (storing on that server again can't
        # add replication) and a 400 (that server can't be retried at
        # all), so no third request should be made.
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR, 200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self._put_foo()
        self.assertEqual(2, req_mock.call_count)
1276
class KeepClientAPIErrorTest(unittest.TestCase):
    def test_api_fail(self):
        class ApiMock(object):
            # Answers only api_token and insecure; any other attribute
            # lookup raises KeepReadError, simulating a failing API client.
            _ATTRS = {"api_token": "abc", "insecure": False}

            def __getattr__(self, name):
                if name in self._ATTRS:
                    return self._ATTRS[name]
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(
            api_client=ApiMock(), proxy='', local_store='')

        # Regression test: if an API (not keepstore) exception is raised
        # during a get(), the next attempt to get the same block used to
        # deadlock. That is why get() is called twice in a row.
        # Unfortunately, if the bug returns, this test deadlocks the whole
        # suite; there is no good way to avoid that without adding a
        # special case that is useless outside this test.
        for _attempt in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")