Merge branch '8784-dir-listings'
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import socket
19 import sys
20 import time
21 import unittest
22 import urllib.parse
23
24 import arvados
25 import arvados.retry
26 from . import arvados_testutil as tutil
27 from . import keepstub
28 from . import run_test_server
29
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Round-trip put/get/head tests for KeepClient against test servers.

    One KeepClient, created in setUpClass with empty ``proxy`` and
    ``local_store`` arguments, is shared by every test in the class.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        """Authorize as "admin" and build the shared API and Keep clients."""
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        """put() then get() a short string, checking the byte counters."""
        # NOTE(review): the zero-counter assertions assume no other test
        # has used the shared class-level client yet -- confirm that
        # the test ordering guarantees this.
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertEqual(self.keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        """Round-trip a short blob containing non-ASCII and NUL bytes."""
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        """Round-trip a 67108864-byte binary blob."""
        # 8 bytes repeated 2**23 times == 67108864 bytes, the size
        # asserted in the locator below.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' * (2 ** 23)
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        """Round-trip a blob written with copies=1 (currently skipped)."""
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        """put() of an empty string yields the empty-block locator."""
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        """put() accepts ASCII text and rejects objects without encode()."""
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        """head() returns True for a block that was just written."""
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
129
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Keep reads and writes with a signing key and enforced permissions."""
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': True}

    def test_KeepBasicRWTest(self):
        """Signed locators are readable by their writer; all else is NotFound."""
        not_found = arvados.errors.NotFoundError

        run_test_server.authorize_with('active')
        client = arvados.KeepClient()

        # PUT returns a signed locator, and the writer can read it back.
        signed_foo = client.put('foo')
        self.assertRegex(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        self.assertEqual(
            client.get(signed_foo),
            b'foo',
            'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        signed_bar = client.put('bar')
        unsigned_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)
        with self.assertRaises(not_found):
            client.get(unsigned_bar)

        # GET from a different user => NotFound
        run_test_server.authorize_with('spectator')
        with self.assertRaises(not_found):
            arvados.Keep.get(signed_bar)

        # Unauthenticated GET for a signed locator => NotFound
        # Unauthenticated GET for an unsigned locator => NotFound
        client.api_token = ''
        for locator in (signed_bar, unsigned_bar):
            with self.assertRaises(not_found):
                client.get(locator)
173
174
175 # KeepOptionalPermission: starts Keep with --permission-key-file
176 # but not --enforce-permissions (i.e. generate signatures on PUT
177 # requests, but do not require them for GET requests)
178 #
179 # All of these requests should succeed when permissions are optional:
180 # * authenticated request, signed locator
181 # * authenticated request, unsigned locator
182 # * unauthenticated request, signed locator
183 # * unauthenticated request, unsigned locator
class KeepOptionalPermission(run_test_server.TestCaseWithServers):
    """Keep reads with a signing key configured but permissions not enforced.

    Each test writes "foo" and then reads it back through one of the four
    combinations of (authenticated, unauthenticated) x (signed, unsigned).
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': False}

    # Unsigned locator (bare md5 hash) of the block "foo".
    _UNSIGNED_FOO = "acbd18db4cc2f85cedef654fccc4a4d8"

    @classmethod
    def setUpClass(cls):
        """Authorize as "admin" and create the API client shared by all tests."""
        super(KeepOptionalPermission, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')

    def setUp(self):
        """Give every test its own KeepClient so api_token edits stay local."""
        super(KeepOptionalPermission, self).setUp()
        self.keep_client = arvados.KeepClient(
            api_client=self.api_client, proxy='', local_store='')

    def _put_foo_and_check(self):
        """Write "foo", assert the returned locator is signed, and return it."""
        locator = self.keep_client.put('foo')
        self.assertRegex(
            locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + locator)
        return locator

    def _assert_reads_foo(self, locator):
        """Assert that get(locator) returns the bytes b'foo'."""
        self.assertEqual(
            self.keep_client.get(locator),
            b'foo',
            'wrong content from Keep.get(md5("foo"))')

    def test_KeepAuthenticatedSignedTest(self):
        self._assert_reads_foo(self._put_foo_and_check())

    def test_KeepAuthenticatedUnsignedTest(self):
        self._put_foo_and_check()
        self._assert_reads_foo(self._UNSIGNED_FOO)

    def test_KeepUnauthenticatedSignedTest(self):
        # Check that signed GET requests work even when permissions
        # enforcement is off.
        locator = self._put_foo_and_check()
        self.keep_client.api_token = ''
        self._assert_reads_foo(locator)

    def test_KeepUnauthenticatedUnsignedTest(self):
        # Since --enforce-permissions is not in effect, GET requests
        # need not be authenticated.
        self._put_foo_and_check()
        self.keep_client.api_token = ''
        self._assert_reads_foo(self._UNSIGNED_FOO)
237
238
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """KeepClient tests run with a Keep proxy server configured."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        """Authorize as "active" and create the API client shared by all tests."""
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        """Drop the ARVADOS_EXTERNAL_CLIENT override a test may have set."""
        arvados.config.settings().pop('ARVADOS_EXTERNAL_CLIENT', None)
        super(KeepProxyTestCase, self).tearDown()

    def _override_keep_services(self, value):
        """Set ARVADOS_KEEP_SERVICES for this test and restore it afterwards.

        The setting lives in the shared arvados.config.settings() mapping,
        so leaving it modified would change the behavior of whichever test
        runs next.
        """
        missing = object()
        previous = arvados.config.settings().get(
            'ARVADOS_KEEP_SERVICES', missing)

        def restore():
            settings = arvados.config.settings()
            if previous is missing:
                settings.pop('ARVADOS_KEEP_SERVICES', None)
            else:
                settings['ARVADOS_KEEP_SERVICES'] = previous

        self.addCleanup(restore)
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = value

    def test_KeepProxyTest1(self):
        """put/get round trip with the proxy taken from the environment."""
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        """put/get round trip when flagged as an external client."""
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegex(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        """A space-separated ARVADOS_KEEP_SERVICES yields several roots."""
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        self._override_keep_services(
            'http://10.0.0.1 https://foo.example.org:1234/')
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        """A malformed ARVADOS_KEEP_SERVICES value raises ArgumentError."""
        self._override_keep_services('bad.uri.org')
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
300
301
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient service selection, timeout, and error-reporting tests.

    All tests use mocked API clients and mocked Keep responses.
    """

    def get_service_roots(self, api_client):
        """Return the parsed, sorted service root URLs for an all-zero hash."""
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def check_timeout_settings(self, verb, arg, exc_class, expected,
                               **service_kwargs):
        """Check the pycurl timeout options set for one failed request.

        verb -- name of the KeepClient method to call ('get', 'put', 'head')
        arg -- the single positional argument passed to that method
        exc_class -- exception the call must raise after the forced timeout
        expected -- timeout sequence whose items 0, 1 and 2 are compared
            with CONNECTTIMEOUT_MS (item 0 times 1000), LOW_SPEED_TIME and
            LOW_SPEED_LIMIT respectively
        service_kwargs -- extra keyword arguments for mock_keep_services()
        """
        api_client = self.mock_keep_services(count=1, **service_kwargs)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                getattr(keep_client, verb)(arg)
            curl = req_mock.responses[0]
            self.assertEqual(
                curl.getopt(pycurl.CONNECTTIMEOUT_MS),
                int(expected[0]*1000))
            self.assertEqual(
                curl.getopt(pycurl.LOW_SPEED_TIME),
                int(expected[1]))
            self.assertEqual(
                curl.getopt(pycurl.LOW_SPEED_LIMIT),
                int(expected[2]))

    def test_get_timeout(self):
        self.check_timeout_settings(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        self.check_timeout_settings(
            'put', b'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        self.check_timeout_settings(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_proxy_get_timeout(self):
        self.check_timeout_settings(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_head_timeout(self):
        self.check_timeout_settings(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_put_timeout(self):
        self.check_timeout_settings(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def check_no_services_error(self, verb, exc_class):
        """Check that a failed service lookup raises exc_class with no request errors."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Check that the raised error reports only the final attempt's statuses.

        Two services answer 500, 500 and then 403, 403; the exception's
        request_errors() must carry just the two 403 results.
        """
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 403, 403)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([403, 403], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        # The first response reports 3 replicas stored, which already
        # covers copies=2, so exactly one request is expected.
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
518
519
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Checks KeepClient's service probe order against a reference set."""

    def setUp(self):
        """Build reference blocks, their hashes, and a 16-service client."""
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list(order) for order in (
                '3eab2d5fc9681074',
                '097dba52e648f1c3',
                'c5b4e023f8a7d691',
                '9d81c02e76a3bf54',
            )]
        self.blocks = [
            "{:064x}".format(n).encode()
            for n in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(block).hexdigest() for block in self.blocks]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        service_id = re.compile(r'//\[?keep0x([0-9a-f]+)')
        for i, block_hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(
                arvados.KeepLocator(block_hash))
            got_order = []
            for root in roots:
                got_order.append(service_id.search(root).group(1))
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        def do_get(i):
            return self.keep_client.get(self.hashes[i], num_retries=1)
        self._test_probe_order_against_reference_set(do_get)

    def test_head_probe_order_against_reference_set(self):
        def do_head(i):
            return self.keep_client.head(self.hashes[i], num_retries=1)
        self._test_probe_order_against_reference_set(do_head)

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        def do_put(i):
            return self.keep_client.put(
                self.blocks[i], num_retries=1, copies=1)
        self._test_probe_order_against_reference_set(do_put)

    def _test_probe_order_against_reference_set(self, op):
        """Call op(i) per block and check the order in which URLs were hit.

        Every mocked response is a 500, so op(i) is expected to raise
        KeepRequestError after two full passes over the services.
        """
        service_id = re.compile(r'//\[?keep0x([0-9a-f]+)')
        failures = [500] * (self.services*2)
        for i, _ in enumerate(self.blocks):
            with tutil.mock_keep_responses('', *failures) as req_mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = []
            for resp in req_mock.responses:
                url = resp.getopt(pycurl.URL).decode()
                got_order.append(service_id.search(url).group(1))
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        service_id = re.compile(r'//\[?keep0x([0-9a-f]+)')
        failures = [500] * (self.services*3)
        for copies in range(2, 4):
            for i, block in enumerate(self.blocks):
                with tutil.mock_keep_responses('', *failures) as req_mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(block, num_retries=2, copies=copies)
                got_order = [
                    service_id.search(resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in req_mock.responses]
                reference = self.expected_order[i]*3
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(reference):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        pending.remove(got)
                        got = got_order[pos-len(pending)]
                    if got == expected:
                        continue
                    pending.append(expected)
                    self.assertLess(
                        len(pending), copies,
                        "pending={}, with copies={}, got {}, expected {}".format(
                            pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        block_hashes = [
            hashlib.md5("{:064x}".format(n).encode()).hexdigest()
            for n in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(h))
            for h in block_hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(
                count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client)
            total_penalty = 0
            for block_hash, before in zip(block_hashes, probes_before):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(block_hash))
                # How far the previously-first service moved down the
                # probe list once the extra services were added.
                penalty = probe_after.index(before[0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(block_hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(block_hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Check that request_errors() lists services in probe order.

        Every request for the block of 64 "0" characters fails with a
        500, so the raised exception should name all services in the
        reference order for that block.
        """
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client)
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        parsed = [
            urllib.parse.urlparse(url)
            for url in err_check.exception.request_errors()]
        want = [('keep0x' + c, aport) for c in '3eab2d5fc9681074']
        self.assertEqual(want, [(u.hostname, u.port) for u in parsed])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
673
674
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
    """Check KeepClient timeout behavior against a stub Keep server.

    The tests throttle or delay the stub server (self.server) with
    setbandwidth() and setdelays(), and most of them assert how long
    the client takes to succeed or to give up.
    """

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        """Context manager: the body must take between tmin and tmax seconds."""

        def __init__(self, tmin, tmax):
            self.tmin, self.tmax = tmin, tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            elapsed = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(elapsed, self.tmin)
            self.assertLessEqual(elapsed, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager: the body must take at least tmin seconds."""

        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Rounded to milliseconds, same as assertTakesBetween.
            elapsed = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(elapsed, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient on self.api_client using the given timeouts."""
        client = arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts)
        return client

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5), \
             self.assertRaises(arvados.errors.KeepWriteError):
            self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # Twice the minimum bandwidth is enough for a full round trip.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        locator = kc.put(self.DATA, copies=1, num_retries=0)
        fetched = kc.get(locator, num_retries=0)
        self.assertEqual(self.DATA, fetched)

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        locator = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepReadError):
            kc.get(locator, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepWriteError):
            kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        # Bandwidth exactly at the limit plus a response delay of
        # TIMEOUT_TIME: get, put and head must all fail, each after at
        # least TIMEOUT_TIME.
        kc = self.keepClient()
        locator = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(response=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepReadError):
            kc.get(locator, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepWriteError):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepReadError):
            kc.head(locator, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        # Bandwidth exactly at the limit plus mid-read and mid-write
        # delays of TIMEOUT_TIME: get and put must fail, each after at
        # least TIMEOUT_TIME.
        kc = self.keepClient()
        locator = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepReadError):
            kc.get(locator, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepWriteError):
            kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        # Delay the request phase: 0.2s is tolerated, 2s is not.
        locator = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(locator)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(locator)

    def test_timeout_slow_response(self):
        # Delay the response phase: 0.2s is tolerated, 2s is not.
        locator = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(locator)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(locator)

    def test_timeout_slow_response_body(self):
        # Delay the response body: 0.2s is tolerated, 2s is not.
        locator = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(locator)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(locator)

    def _test_connect_timeout_under_200ms(self, loc):
        """Expect put and get to succeed, each taking 0.2s to 0.3s."""
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            fetched = kc.get(loc, num_retries=0)
            self.assertEqual(self.DATA, fetched)

    def _test_response_timeout_under_2s(self, loc):
        """Expect get and put to fail, each taking 1s to 9s."""
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9), \
             self.assertRaises(arvados.errors.KeepReadError):
            kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9), \
             self.assertRaises(arvados.errors.KeepWriteError):
            kc.put(self.DATA, copies=1, num_retries=0)
812
813
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check how KeepClient handles +K@ hints in block locators.

    Hints naming a known gateway service should be tried first, in the
    order given; other hints are expected to map to a remote proxy URL.
    All HTTP traffic is mocked by patching pycurl.Curl, and mocked
    response bodies are always given as bytes.
    """

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Set up a mocked API client listing disk and gateway services.

        Stores the gateway service records in self.gateways, their base
        URLs in self.gateway_roots, the mocked API client in
        self.api_client and a KeepClient using it in self.keepClient.
        """
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    def _check_gateway_hints_in_order(self, MockCurl, verb):
        """Check probe order when every service answers 404.

        Calls the KeepClient method named by `verb` ('get' or 'head')
        with a locator hinting at four gateways, expects NotFoundError,
        and checks that the gateways were requested first (in hint
        order), followed by the disk services.
        """
        gateways = 4
        disks = 3
        # One fake curl handle per expected request, so that the URL
        # of each request can be inspected afterwards.
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        # A locator hinting at the only gateway is fetched from that
        # gateway's URL; head() on the same locator succeeds too.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'get')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'head')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        # A +K@xyzzy hint does not match any gateway uuid; get() is
        # expected to request the block from keep.xyzzy.arvadosapi.com.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        # Same as the get() case above, but for head().
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
909
910
class KeepClientRetryTestMixin(object):
    """Shared retry tests for KeepClient request methods.

    Testing with a local Keep store won't exercise the retry behavior.
    Instead, our strategy is:

    * Create a client with one proxy specified (pointed at a black
      hole), so there's no need to instantiate an API client, and
      all HTTP requests come from one place.
    * Patch the client's HTTP layer, through TEST_PATCHER, to provide
      simulated responses.

    This lets us test the retry logic extensively without relying on
    any supporting servers, and prevents side effects in case something
    hiccups.

    To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
    run_method().  Test classes must also define TEST_PATCHER to a
    method that mocks out appropriate methods in the client.
    """

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        # Default constructor arguments for new_client(); individual
        # tests may add to them before a client is created.
        self.client_kwargs = dict(proxy=self.PROXY_ADDR, local_store='')

    def new_client(self, **caller_kwargs):
        """Return a KeepClient built from client_kwargs plus overrides."""
        merged = dict(self.client_kwargs)
        merged.update(caller_kwargs)
        return arvados.KeepClient(**merged)

    def run_method(self, *args, **kwargs):
        """Perform the request under test; subclasses must override."""
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert that run_method() returns expected (default DEFAULT_EXPECT)."""
        wanted = self.DEFAULT_EXPECT if expected is None else expected
        actual = self.run_method(*args, **kwargs)
        self.assertEqual(wanted, actual)

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert that run_method() raises error_class (default DEFAULT_EXCEPTION)."""
        wanted = self.DEFAULT_EXCEPTION if error_class is None else error_class
        with self.assertRaises(wanted):
            self.run_method(*args, **kwargs)

    def test_immediate_success(self):
        # A single 200 response succeeds without any retries.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        # A 500 followed by a 200 succeeds when retries are allowed.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        # An exception from the request layer is retried like a 500.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        # Without num_retries, the first 500 is final.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        # A 403 is not retried even though retries are allowed.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # Two 500s use up the single retry before the 200 is reached.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            self.check_exception(num_retries=1)

    def test_num_retries_instance_fallback(self):
        # num_retries given to the constructor applies when the method
        # call does not specify one.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
979
980
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Run the shared retry tests, plus get()-specific ones, on KeepClient.get()."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        """Fetch locator with a freshly built client and return the data."""
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 must surface as NotFoundError, not a generic read error.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500), \
             self.assertRaises(arvados.errors.KeepReadError) as exc_check:
            client.get(self.HINTED_LOCATOR)
        self.assertNotIsInstance(
            exc_check.exception, arvados.errors.NotFoundError,
            "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # With a hinted locator, the 200 that follows the 404 is
        # reached without asking for retries.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # The first request times out; the next one returns the data.
        timed_out = (socket.timeout("timed out"), 200)
        good = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, good):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # The first response carries the wrong data; the next one
        # carries the right data.
        corrupt = ('baddata', 200)
        good = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(corrupt, good):
            self.check_success(locator=self.HINTED_LOCATOR)
1024
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Run the shared retry tests, plus head()-specific ones, on KeepClient.head()."""

    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        """Call head() for locator on a freshly built client and return the result."""
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 must surface as NotFoundError, not a generic read error.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500), \
             self.assertRaises(arvados.errors.KeepReadError) as exc_check:
            client.head(self.HINTED_LOCATOR)
        self.assertNotIsInstance(
            exc_check.exception, arvados.errors.NotFoundError,
            "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # With a hinted locator, the 200 that follows the 404 is
        # reached without asking for retries.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # The first request times out; the next one succeeds.
        timed_out = (socket.timeout("timed out"), 200)
        good = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, good):
            self.check_success(locator=self.HINTED_LOCATOR)
1062
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Run the shared retry tests, plus a put()-specific one, on KeepClient.put()."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        """Store data with a freshly built client and return the locator."""
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Only one service (the proxy) is available, so asking for two
        # copies must fail even though that service answers 200.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1076
1077
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """Check KeepWriterThreadPool.done() when fed more services than copies wanted."""

    class FakeKeepService(object):
        """Stand-in Keep service whose put() sleeps, then succeeds, fails or raises."""

        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            # Canned result handed out by last_result() on success.
            self._result = {
                'headers': {'x-keep-replicas-stored': str(replicas)},
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout):
            # Sleep for the configured delay, then raise the configured
            # exception if there is one, else report success/failure.
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            # Only a successful service has a result to report.
            return self._result if self.will_succeed else None

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def test_only_write_enough_on_success(self):
        # Ten services would all succeed; only `copies` writes count.
        for step in range(10):
            service = self.FakeKeepService(delay=step/10.0, will_succeed=True)
            self.pool.add_task(service, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_on_partial_success(self):
        # Failing and succeeding services alternate.
        for step in range(5):
            for outcome in (False, True):
                service = self.FakeKeepService(
                    delay=step/10.0, will_succeed=outcome)
                self.pool.add_task(service, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_when_some_crash(self):
        # Raising and succeeding services alternate.
        for step in range(5):
            crasher = self.FakeKeepService(
                delay=step/10.0, will_raise=Exception())
            self.pool.add_task(crasher, None)
            worker = self.FakeKeepService(
                delay=step/10.0, will_succeed=True)
            self.pool.add_task(worker, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_fail_when_too_many_crash(self):
        # Only copies-1 services can succeed, so done() stays short of
        # the requested number of copies.
        for step in range(self.copies+1):
            crasher = self.FakeKeepService(
                delay=step/10.0, will_raise=Exception())
            self.pool.add_task(crasher, None)
        for step in range(self.copies-1):
            worker = self.FakeKeepService(
                delay=step/10.0, will_succeed=True)
            self.pool.add_task(worker, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies-1)
1146     
1147
1148 @tutil.skip_sleep
1149 class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
1150     # Test put()s that need two distinct servers to succeed, possibly
1151     # requiring multiple passes through the retry loop.
1152
1153     def setUp(self):
1154         self.api_client = self.mock_keep_services(count=2)
1155         self.keep_client = arvados.KeepClient(api_client=self.api_client)
1156
1157     def test_success_after_exception(self):
1158         with tutil.mock_keep_responses(
1159                 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1160                 Exception('mock err'), 200, 200) as req_mock:
1161             self.keep_client.put('foo', num_retries=1, copies=2)
1162         self.assertEqual(3, req_mock.call_count)
1163
1164     def test_success_after_retryable_error(self):
1165         with tutil.mock_keep_responses(
1166                 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1167                 500, 200, 200) as req_mock:
1168             self.keep_client.put('foo', num_retries=1, copies=2)
1169         self.assertEqual(3, req_mock.call_count)
1170
1171     def test_fail_after_final_error(self):
1172         # First retry loop gets a 200 (can't achieve replication by
1173         # storing again on that server) and a 400 (can't retry that
1174         # server at all), so we shouldn't try a third request.
1175         with tutil.mock_keep_responses(
1176                 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1177                 200, 400, 200) as req_mock:
1178             with self.assertRaises(arvados.errors.KeepWriteError):
1179                 self.keep_client.put('foo', num_retries=1, copies=2)
1180         self.assertEqual(2, req_mock.call_count)