11308: Merge branch 'master' into 11308-python3
[arvados.git] / sdk / python / tests / test_keep_client.py
1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import str
6 from builtins import range
7 from builtins import object
8 import hashlib
9 import mock
10 import os
11 import pycurl
12 import random
13 import re
14 import socket
15 import sys
16 import time
17 import unittest
18 import urllib.parse
19
20 import arvados
21 import arvados.retry
22 from . import arvados_testutil as tutil
23 from . import keepstub
24 from . import run_test_server
25
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Put/get/head round-trip tests using a directly configured KeepClient.

    A single KeepClient (built with proxy='' and local_store='') is
    created once in setUpClass() and shared by every test method.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        # NOTE(review): the counter assertions below start from 0 even
        # though the client is shared with the other tests in this
        # class; this presumably relies on this test running before
        # any other test that calls put()/get() -- confirm.
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertEqual(self.keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        # Bytes that are not valid UTF-8 must round-trip unchanged.
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # 8-byte pattern repeated 2**23 times == 67108864 bytes, the
        # size asserted in the locator below.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' * (2 ** 23)
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        # Storing the empty string yields the md5-of-nothing locator
        # with a size hint of 0.
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # The non-ASCII rejection is only asserted on Python 2.
        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
125
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Checks signed-locator handling with 'enforce_permissions' turned on."""
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': True}

    def test_KeepBasicRWTest(self):
        not_found = arvados.errors.NotFoundError

        run_test_server.authorize_with('active')
        client = arvados.KeepClient()

        # PUT returns a signed locator, and the signed locator can be
        # read back by the same user.
        signed_foo = client.put('foo')
        self.assertRegex(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        foo_content = client.get(signed_foo)
        self.assertEqual(foo_content,
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        signed_bar = client.put('bar')
        unsigned_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)
        with self.assertRaises(not_found):
            client.get(unsigned_bar)

        # GET from a different user => NotFound
        run_test_server.authorize_with('spectator')
        with self.assertRaises(not_found):
            arvados.Keep.get(signed_bar)

        # With the token blanked out, neither the signed nor the
        # unsigned locator may be read => NotFound for both.
        client.api_token = ''
        for locator in (signed_bar, unsigned_bar):
            with self.assertRaises(not_found):
                client.get(locator)
169
170
171 # KeepOptionalPermission: starts Keep with --permission-key-file
172 # but not --enforce-permissions (i.e. generate signatures on PUT
173 # requests, but do not require them for GET requests)
174 #
175 # All of these requests should succeed when permissions are optional:
176 # * authenticated request, signed locator
177 # * authenticated request, unsigned locator
178 # * unauthenticated request, signed locator
179 # * unauthenticated request, unsigned locator
class KeepOptionalPermission(run_test_server.TestCaseWithServers):
    """GET behaviour when a signing key is set but 'enforce_permissions' is False.

    Every test first stores 'foo' (checking that the returned locator
    is signed) and then reads it back with or without a signature and
    with or without an API token.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': False}

    # Bare md5 of b'foo', i.e. the locator with no size hint or signature.
    UNSIGNED_FOO_LOCATOR = "acbd18db4cc2f85cedef654fccc4a4d8"

    @classmethod
    def setUpClass(cls):
        super(KeepOptionalPermission, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepOptionalPermission, self).setUp()
        # A fresh client per test, because some tests blank its api_token.
        self.keep_client = arvados.KeepClient(api_client=self.api_client,
                                              proxy='', local_store='')

    def _put_foo_and_check(self):
        """Store 'foo', assert the returned locator is signed, and return it."""
        signed_locator = self.keep_client.put('foo')
        self.assertRegex(
            signed_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_locator)
        return signed_locator

    def _assert_get_returns_foo(self, locator):
        """Assert that reading `locator` through the client yields b'foo'."""
        self.assertEqual(self.keep_client.get(locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepAuthenticatedSignedTest(self):
        signed_locator = self._put_foo_and_check()
        self._assert_get_returns_foo(signed_locator)

    def test_KeepAuthenticatedUnsignedTest(self):
        # The put is still needed so the block exists; its signed
        # locator is deliberately not used for the read.
        self._put_foo_and_check()
        self._assert_get_returns_foo(self.UNSIGNED_FOO_LOCATOR)

    def test_KeepUnauthenticatedSignedTest(self):
        # Check that signed GET requests work even when permissions
        # enforcement is off.
        signed_locator = self._put_foo_and_check()
        self.keep_client.api_token = ''
        self._assert_get_returns_foo(signed_locator)

    def test_KeepUnauthenticatedUnsignedTest(self):
        # Since --enforce-permissions is not in effect, GET requests
        # need not be authenticated.
        self._put_foo_and_check()
        self.keep_client.api_token = ''
        self._assert_get_returns_foo(self.UNSIGNED_FOO_LOCATOR)
233
234
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """Tests for KeepClient when requests go through a Keep proxy.

    Tests that change ARVADOS_KEEP_SERVICES do so through
    _override_keep_services(), which restores the previous value when
    the test finishes so the change cannot leak into other tests.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        arvados.config.settings().pop('ARVADOS_EXTERNAL_CLIENT', None)
        super(KeepProxyTestCase, self).tearDown()

    def _override_keep_services(self, value):
        """Set ARVADOS_KEEP_SERVICES to `value` for the current test only.

        The prior state (a value, or the key being absent) is captured
        and re-applied by a cleanup registered with addCleanup().
        """
        key = 'ARVADOS_KEEP_SERVICES'
        absent = object()
        previous = arvados.config.settings().get(key, absent)

        def restore():
            settings = arvados.config.settings()
            if previous is absent:
                settings.pop(key, None)
            else:
                settings[key] = previous

        self.addCleanup(restore)
        arvados.config.settings()[key] = value

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        # (tearDown() removes this setting again.)
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegex(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        self._override_keep_services(
            'http://10.0.0.1 https://foo.example.org:1234/')
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        # A value without a URI scheme must be rejected when the
        # client is constructed.
        self._override_keep_services('bad.uri.org')
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
296
297
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient tests that run against mocked API and Keep responses."""

    def get_service_roots(self, api_client):
        """Return the parsed, sorted service root URLs for an all-zero hash."""
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def _check_timeout_opts(self, verb, arg, exc_class, expected_timeout,
                            **service_kwargs):
        """Call `verb` against one timing-out service and check curl options.

        verb -- name of the KeepClient method to call ('get', 'head', 'put')
        arg -- the single positional argument passed to that method
        exc_class -- exception the call is expected to raise
        expected_timeout -- 3-item sequence compared, in order, against
            CONNECTTIMEOUT_MS (after *1000), LOW_SPEED_TIME and
            LOW_SPEED_LIMIT on the first request
        service_kwargs -- extra keyword arguments for mock_keep_services()
        """
        api_client = self.mock_keep_services(count=1, **service_kwargs)
        force_timeout = socket.timeout("timed out")
        # Bound as req_mock (not "mock") to avoid shadowing the mock module.
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                getattr(keep_client, verb)(arg)
            first_request = req_mock.responses[0]
            self.assertEqual(
                first_request.getopt(pycurl.CONNECTTIMEOUT_MS),
                int(expected_timeout[0]*1000))
            self.assertEqual(
                first_request.getopt(pycurl.LOW_SPEED_TIME),
                int(expected_timeout[1]))
            self.assertEqual(
                first_request.getopt(pycurl.LOW_SPEED_LIMIT),
                int(expected_timeout[2]))

    def test_get_timeout(self):
        self._check_timeout_opts(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        self._check_timeout_opts(
            'put', b'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        self._check_timeout_opts(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_proxy_get_timeout(self):
        self._check_timeout_opts(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_head_timeout(self):
        self._check_timeout_opts(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_put_timeout(self):
        self._check_timeout_opts(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def check_no_services_error(self, verb, exc_class):
        """Assert `verb` raises `exc_class` with no per-request errors
        when the keep_services lookup itself fails with ApiError."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Assert the raised exception reports the statuses of the final
        round of requests (403, 403) rather than the earlier 500s."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 403, 403)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([403, 403], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        # One 200 and two 500 responses: only the two failures should
        # be listed in the exception's request_errors().
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        # The only service is read-only, so put() must fail without
        # recording any per-request errors.
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        # A service_type this test invents must still be usable for reads.
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        # ... and for writes.
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        # The first response reports 3 replicas stored, which already
        # satisfies copies=2, so exactly one request should be made.
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
514
515
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Checks the order in which KeepClient probes services for a block."""

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(block).hexdigest()
            for block in self.blocks]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for expected, block_hash in zip(self.expected_order, self.hashes):
            roots = self.keep_client.weighted_service_roots(
                arvados.KeepLocator(block_hash))
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(expected, got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Run op(i) for each reference block with every request failing.

        op is called with the block index and must raise
        KeepRequestError. The URLs requested are then compared with the
        reference order repeated twice (every caller passes
        num_retries=1).
        """
        for i, expected in enumerate(self.expected_order):
            all_500 = [500] * (self.services*2)
            # Bound as req_mock (not "mock") to avoid shadowing the mock module.
            with tutil.mock_keep_responses('', *all_500) as req_mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in req_mock.responses]
            self.assertEqual(expected*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i, block in enumerate(self.blocks):
                expected_sequence = self.expected_order[i]*3
                all_500 = [500] * (self.services*3)
                with tutil.mock_keep_responses('', *all_500) as req_mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(block, num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in req_mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(expected_sequence):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        # A probe we were waiting for has shown up:
                        # drop it and re-read at the shifted position.
                        pending.remove(got)
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(expected_sequence)))

    def test_probe_waste_adding_one_server(self):
        # Measures how far the previously-first service moves down the
        # probe list for each of 100 blocks as services are added.
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            for block_hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client)
            total_penalty = 0
            for block_hash, probe_before in zip(hashes, probes_before):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(block_hash))
                # penalty == number of services now probed ahead of
                # the one that used to be probed first.
                penalty = probe_after.index(probe_before[0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Assert that the failed-request URLs reported by the exception
        follow the reference probe order for the block of 64 '0' bytes."""
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client)
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
669
670
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
    # Timeout and minimum-bandwidth behavior of KeepClient, exercised
    # against the stub server (self.server) whose bandwidth and delays
    # the tests adjust with setbandwidth() / setdelays().
    #
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        # Context manager asserting that its body takes at least tmin
        # and at most tmax seconds of wall-clock time.
        #
        # It subclasses TestCase only to reuse the assert* helpers;
        # TestCase.__init__ is not called.
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, exc_type=None, *args, **kwargs):
            if exc_type is not None:
                # The body already failed. Let that exception propagate
                # instead of replacing it with a timing failure, which
                # would hide the real cause (entirely so on Python 2).
                return False
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(assertTakesBetween):
        # Context manager asserting that its body takes at least tmin
        # seconds. Same as assertTakesBetween with no upper bound.
        def __init__(self, tmin):
            self.tmin = tmin
            self.tmax = float('inf')

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient for self.api_client using the given timeouts.

        ``timeouts`` is passed through as KeepClient's ``timeout``
        argument. Judging by how the tests below use it, the elements
        are (connect timeout, response timeout[, low-bandwidth limit])
        -- NOTE(review): confirm against KeepClient's documentation.
        """
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # Twice the low-bandwidth limit is fast enough: both the write
        # and the read must succeed.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        # Store the block before throttling so there is something to read.
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        # Bandwidth exactly at the limit plus a response delay of
        # TIMEOUT_TIME: get, put and head must all fail, each after at
        # least TIMEOUT_TIME.
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(response=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        # Bandwidth exactly at the limit plus a TIMEOUT_TIME delay in
        # the middle of the server's reads and writes: get and put must
        # both fail, each after at least TIMEOUT_TIME.
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        # A 0.2s request delay is tolerated; a 2s delay is not.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        # A 0.2s response delay is tolerated; a 2s delay is not.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        # A 0.2s response-body delay is tolerated; a 2s delay is not.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
808
809
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    # How KeepClient uses "+K@..." locator hints: hinted gateway
    # services and remote proxies are contacted before regular disk
    # services.

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Set up a mocked API client with disk and gateway services.

        Stores the gateway service records in ``self.gateways``, their
        root URLs in ``self.gateway_roots``, the mocked API client in
        ``self.api_client`` and a KeepClient using it in
        ``self.keepClient``.

        :param disks: number of regular services to mock (passed as
            ``count`` to ``mock_keep_services``).
        :param gateways: number of 'gateway:test' services to add.
        """
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    def _check_gateway_hints_in_order(self, verb, MockCurl):
        """Check that ``verb`` ('get' or 'head') probes gateways, then disks.

        Every mocked service answers 404, so the request must raise
        NotFoundError after trying each hinted gateway in the order the
        hints appear in the locator, followed by the disk services.

        ``MockCurl`` is the patched ``pycurl.Curl`` handed to the
        calling test by ``mock.patch``.
        """
        gateways = 4
        disks = 3
        # Response bodies are bytes, like the rest of this test case.
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        # The body is bytes, matching what get() is asserted to return.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        # The request must have gone to the hinted gateway.
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order('get', MockCurl)

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order('head', MockCurl)

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        # A "+K@xyzzy" hint that matches no known service UUID is
        # expected to be fetched from https://keep.xyzzy.arvadosapi.com/.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        # Same as the get test above, but for head(), which returns True.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
905
906
class KeepClientRetryTestMixin(object):
    # Shared retry tests for KeepClient operations.
    #
    # A local Keep store would never exercise the retry logic, so
    # instead:
    # * The client is created with a single proxy (PROXY_ADDR, which
    #   points at a black hole). No API client is needed and every
    #   HTTP request comes from one place.
    # * The client's HTTP layer is patched with TEST_PATCHER so each
    #   request receives a scripted response.
    # This tests the retry logic extensively without any supporting
    # servers, and without side effects if something hiccups.
    #
    # Classes using this mixin must define:
    # * DEFAULT_EXPECT: the value run_method() returns on success.
    # * DEFAULT_EXCEPTION: the exception class raised on failure.
    # * run_method(): performs the operation under test.
    # * TEST_PATCHER: a callable taking (body, *responses) that
    #   returns a context manager mocking out the appropriate methods
    #   in the client.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = dict(proxy=self.PROXY_ADDR, local_store='')

    def new_client(self, **caller_kwargs):
        """Return a KeepClient built from client_kwargs plus overrides."""
        merged = dict(self.client_kwargs)
        merged.update(caller_kwargs)
        return arvados.KeepClient(**merged)

    def run_method(self, *args, **kwargs):
        """Perform the operation under test; subclasses must override."""
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method() returns ``expected`` (default DEFAULT_EXPECT)."""
        wanted = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(wanted, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method() raises ``error_class`` (default DEFAULT_EXCEPTION)."""
        wanted = self.DEFAULT_EXCEPTION if error_class is None else error_class
        with self.assertRaises(wanted):
            self.run_method(*args, **kwargs)

    def _scripted(self, *responses):
        # Patch the client so that successive requests get the given
        # responses (status codes or exceptions), with DEFAULT_EXPECT
        # as the response body.
        return self.TEST_PATCHER(self.DEFAULT_EXPECT, *responses)

    def test_immediate_success(self):
        with self._scripted(200):
            self.check_success()

    def test_retry_then_success(self):
        with self._scripted(500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self._scripted(Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        # Without num_retries, a 500 is not retried.
        with self._scripted(500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        # A 403 is final even though retries are allowed.
        with self._scripted(403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # One retry is not enough to get past two 500s.
        with self._scripted(500, 500, 200):
            self.check_exception(num_retries=1)

    def test_num_retries_instance_fallback(self):
        # num_retries given to the constructor applies when the call
        # itself does not specify one.
        self.client_kwargs['num_retries'] = 3
        with self._scripted(500, 200):
            self.check_success()
975
976
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Retry behavior of KeepClient.get(), using the shared tests from
    # KeepClientRetryTestMixin plus get-specific cases.
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    # Locator carrying a "+K@" hint. The tests below use it when they
    # need a second server to answer.
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        """Fetch ``locator`` with a fresh client and return the data."""
        return self.new_client().get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # The first response is a 404 and the second a 200; no
        # num_retries is needed for the second one to be used.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # The first response is a 200 whose body does not match the
        # locator's hash; the client must go on to the next response.
        # The bad body is bytes, like TEST_DATA, so this test does not
        # depend on how the fake handles text bodies on Python 3.
        with tutil.mock_keep_responses(
                (b'baddata', 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1020
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Retry behavior of KeepClient.head(), using the shared tests from
    # KeepClientRetryTestMixin plus head-specific cases.
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        """Issue head() for ``locator`` on a fresh client; return its result."""
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        patcher = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200)
        with patcher:
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        keep = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500), \
             self.assertRaises(arvados.errors.KeepReadError) as raised:
            keep.head(self.HINTED_LOCATOR)
        self.assertNotIsInstance(
            raised.exception, arvados.errors.NotFoundError,
            "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # 404 first, then 200: succeeds without passing num_retries.
        patcher = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500)
        with patcher:
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # The first response raises socket.timeout; the second succeeds.
        responses = [
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)
1058
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Retry behavior of KeepClient.put(), using the shared tests from
    # KeepClientRetryTestMixin plus one put-specific case.
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        """Store ``data`` with a fresh client and return put()'s result."""
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Only one service (the proxy) is available, so asking for two
        # copies must fail even though the single write returns 200.
        patcher = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200)
        with patcher:
            self.check_exception(copies=2, num_retries=3)
1072
1073
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    # Feeds fake Keep services to a KeepWriterThreadPool and checks
    # the value pool.done() reports once all tasks have been joined.

    class FakeKeepService(object):
        # Minimal stand-in for a Keep service: put() sleeps for
        # `delay` seconds, then either raises `will_raise` or returns
        # `will_succeed`.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {'x-keep-replicas-stored': str(replicas)},
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            # Only a successful service has a result to report.
            return self._result if self.will_succeed else None

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def _add_service(self, **service_kwargs):
        # Queue one fake service on the pool (no service root).
        self.pool.add_task(self.FakeKeepService(**service_kwargs), None)

    def test_only_write_enough_on_success(self):
        # Ten services that all succeed: done() must still be `copies`.
        for i in range(10):
            self._add_service(delay=i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_on_partial_success(self):
        # Alternate failing and succeeding services.
        for i in range(5):
            self._add_service(delay=i/10.0, will_succeed=False)
            self._add_service(delay=i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_when_some_crash(self):
        # Alternate services that raise with services that succeed.
        for i in range(5):
            self._add_service(delay=i/10.0, will_raise=Exception())
            self._add_service(delay=i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_fail_when_too_many_crash(self):
        # Only copies-1 services succeed, so done() must fall one short.
        for i in range(self.copies+1):
            self._add_service(delay=i/10.0, will_raise=Exception())
        for i in range(self.copies-1):
            self._add_service(delay=i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies-1)
1142     
1143
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.

    # Locator used as the body of every mocked response.
    FOO_LOCATOR = 'acbd18db4cc2f85cedef654fccc4a4d8+3'

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def _put_two_copies(self):
        # Store 'foo' asking for two copies, with one retry allowed.
        return self.keep_client.put('foo', num_retries=1, copies=2)

    def test_success_after_exception(self):
        responses = (Exception('mock err'), 200, 200)
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR, *responses) as req_mock:
            self._put_two_copies()
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        responses = (500, 200, 200)
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR, *responses) as req_mock:
            self._put_two_copies()
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        responses = (200, 400, 200)
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR, *responses) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self._put_two_copies()
        self.assertEqual(2, req_mock.call_count)