14259: Move & improve test.
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import socket
19 import sys
20 import time
21 import unittest
22 import urllib.parse
23
24 import arvados
25 import arvados.retry
26 import arvados.util
27 from . import arvados_testutil as tutil
28 from . import keepstub
29 from . import run_test_server
30
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Basic read/write tests against the Keep server started for this class.

    ``cls.keep_client`` is shared by all tests in the class, so a test
    that needs pristine client state (e.g. zeroed counters) must create
    its own KeepClient.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        # Use a dedicated client: the counter assertions below require a
        # client that has not transferred anything yet, which the shared
        # class-level client only guarantees if this test runs first.
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        self.assertEqual(0, keep_client.upload_counter.get())
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, keep_client.upload_counter.get())

        self.assertEqual(0, keep_client.download_counter.get())
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # 8-byte pattern repeated 2**23 times = 64 MiB (67108864 bytes).
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' * (2 ** 23)
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
130
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Keep reads when the server is configured to enforce permissions."""
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': True}

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        client = arvados.KeepClient()

        # PUT returns a signed locator, which can be read back.
        foo_loc = client.put('foo')
        self.assertRegex(
            foo_loc,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_loc)
        self.assertEqual(client.get(foo_loc), b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        bar_loc = client.put('bar')
        bar_loc_unsigned = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            bar_loc,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_loc)
        with self.assertRaises(arvados.errors.NotFoundError):
            client.get(bar_loc_unsigned)

        # GET from a different user => NotFound
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(bar_loc)

        # Unauthenticated GET, with a signed and then an unsigned
        # locator => NotFound in both cases
        client.api_token = ''
        for loc in (bar_loc, bar_loc_unsigned):
            with self.assertRaises(arvados.errors.NotFoundError):
                client.get(loc)
174
175
class KeepOptionalPermission(run_test_server.TestCaseWithServers):
    """Keep reads when locators are signed but signatures are not enforced.

    The Keep server gets a blob signing key but ``enforce_permissions`` is
    False, so PUT returns signed locators while GET does not require them.
    All of these requests should succeed:

    * authenticated request, signed locator
    * authenticated request, unsigned locator
    * unauthenticated request, signed locator
    * unauthenticated request, unsigned locator
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': False}

    # md5("foo") without size hint or signature.
    _UNSIGNED_FOO = "acbd18db4cc2f85cedef654fccc4a4d8"

    @classmethod
    def setUpClass(cls):
        super(KeepOptionalPermission, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepOptionalPermission, self).setUp()
        # Fresh client per test: several tests clear api_token on it.
        self.keep_client = arvados.KeepClient(api_client=self.api_client,
                                              proxy='', local_store='')

    def _put_foo_and_check(self):
        """PUT "foo", assert the returned locator is signed, and return it."""
        signed_locator = self.keep_client.put('foo')
        self.assertRegex(
            signed_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_locator)
        return signed_locator

    def test_KeepAuthenticatedSignedTest(self):
        signed_locator = self._put_foo_and_check()
        self.assertEqual(self.keep_client.get(signed_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepAuthenticatedUnsignedTest(self):
        self._put_foo_and_check()
        self.assertEqual(self.keep_client.get(self._UNSIGNED_FOO),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepUnauthenticatedSignedTest(self):
        # Check that signed GET requests work even when permissions
        # enforcement is off.
        signed_locator = self._put_foo_and_check()
        self.keep_client.api_token = ''
        self.assertEqual(self.keep_client.get(signed_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepUnauthenticatedUnsignedTest(self):
        # Since permission enforcement is not in effect, GET requests
        # need not be authenticated.
        self._put_foo_and_check()
        self.keep_client.api_token = ''
        self.assertEqual(self.keep_client.get(self._UNSIGNED_FOO),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
238
239
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """KeepClient behavior when talking to Keep through a proxy."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    # Sentinel meaning "ARVADOS_KEEP_SERVICES was not in settings".
    _UNSET = object()

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepProxyTestCase, self).setUp()
        # Some tests overwrite ARVADOS_KEEP_SERVICES; remember the value in
        # effect before the test so tearDown() can restore it and later
        # tests (e.g. test_KeepProxyTest1, which relies on the value set by
        # setUpClass) do not depend on execution order.
        self._saved_keep_services = arvados.config.settings().get(
            'ARVADOS_KEEP_SERVICES', self._UNSET)

    def tearDown(self):
        settings = arvados.config.settings()
        settings.pop('ARVADOS_EXTERNAL_CLIENT', None)
        if self._saved_keep_services is self._UNSET:
            settings.pop('ARVADOS_KEEP_SERVICES', None)
        else:
            settings['ARVADOS_KEEP_SERVICES'] = self._saved_keep_services
        super(KeepProxyTestCase, self).tearDown()

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegex(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client, local_store='')
301
302
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient behavior against mocked Keep services (no real servers)."""

    def get_service_roots(self, api_client):
        """Return the parsed service root URLs, sorted, for a fixed locator."""
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as mock_req:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                mock_req.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as mock_req:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                mock_req.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client)
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def _check_timeout_options(self, verb, arg, exc_class, expected,
                               **service_kwargs):
        """Force a timeout on one mocked service and check pycurl options.

        Builds a single mocked service (extra ``service_kwargs`` are passed
        to mock_keep_services), makes its response raise socket.timeout,
        calls ``keep_client.<verb>(arg)`` expecting *exc_class*, then checks
        that the request's CONNECTTIMEOUT_MS, LOW_SPEED_TIME and
        LOW_SPEED_LIMIT were set from expected[0] (seconds, x1000),
        expected[1] and expected[2] respectively.
        """
        api_client = self.mock_keep_services(count=1, **service_kwargs)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock_req:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                getattr(keep_client, verb)(arg)
            req = mock_req.responses[0]
            self.assertEqual(req.getopt(pycurl.CONNECTTIMEOUT_MS),
                             int(expected[0]*1000))
            self.assertEqual(req.getopt(pycurl.LOW_SPEED_TIME),
                             int(expected[1]))
            self.assertEqual(req.getopt(pycurl.LOW_SPEED_LIMIT),
                             int(expected[2]))

    def test_get_timeout(self):
        self._check_timeout_options(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        self._check_timeout_options(
            'put', b'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        self._check_timeout_options(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_proxy_get_timeout(self):
        self._check_timeout_options(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_head_timeout(self):
        self._check_timeout_options(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_put_timeout(self):
        self._check_timeout_options(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def check_no_services_error(self, verb, exc_class):
        """Calling *verb* when service discovery fails raises *exc_class*
        with no per-request errors attached."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """The raised error should report the status codes of the final
        attempt (403s), not of earlier attempts (500s)."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 403, 403)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([403, 403], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        # One response reporting 3 replicas satisfies copies=2, so no
        # further requests should be made.
        self.assertEqual(1, req_mock.call_count)
561
562
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check the X-Request-Id header KeepClient sends on put/get/head."""

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def _check_put_get_head(self, check, **call_kwargs):
        """Run put (2 copies), get and head, applying *check* to each request.

        ``call_kwargs`` are forwarded to every put/get/head call.
        """
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock_req:
            self.keep_client.put(self.data, **call_kwargs)
        self.assertEqual(2, len(mock_req.responses))
        for resp in mock_req.responses:
            check(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock_req:
            self.keep_client.get(self.locator, **call_kwargs)
        check(mock_req.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock_req:
            self.keep_client.head(self.locator, **call_kwargs)
        check(mock_req.responses[0])

    def test_default_to_api_client_request_id(self):
        self.api_client.request_id = self.test_id
        self._check_put_get_head(self.assertProvidedRequestId)

    def test_explicit_request_id(self):
        self._check_put_get_head(self.assertProvidedRequestId,
                                 request_id=self.test_id)

    def test_automatic_request_id(self):
        self._check_put_get_head(self.assertAutomaticRequestId)

    def assertAutomaticRequestId(self, resp):
        """Assert *resp* sent a generated request ID, not self.test_id."""
        hdrs = [x for x in resp.getopt(pycurl.HTTPHEADER)
                if x.startswith('X-Request-Id: ')]
        # Report a missing header as a test failure, not an IndexError.
        self.assertTrue(hdrs, 'no X-Request-Id header in request')
        hdr = hdrs[0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        """Assert *resp* sent exactly self.test_id as its request ID."""
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
632
633
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check the order in which KeepClient probes Keep services for a
    block (weighted_service_roots and get/head/put) against a fixed
    reference set."""

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [hashlib.md5(block).hexdigest() for block in self.blocks]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    @staticmethod
    def _service_id(url):
        """Return the hex id N from a "//keep0xN" or "//[keep0xN" host in url."""
        return re.search(r'//\[?keep0x([0-9a-f]+)', url).group(1)

    def _probe_order(self, req_mock):
        """Return the service ids of req_mock's responses, in list order."""
        return [self._service_id(resp.getopt(pycurl.URL).decode())
                for resp in req_mock.responses]

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, block_hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(
                arvados.KeepLocator(block_hash))
            got_order = [self._service_id(root) for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Run op(i) for each reference block with every request answered
        by HTTP 500, and check the requests followed the reference order
        repeated twice (callers pass num_retries=1)."""
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *([500] * (self.services*2))) as req_mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = self._probe_order(req_mock)
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *([500] * (self.services*3))) as req_mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = self._probe_order(req_mock)
                expected_order = self.expected_order[i]*3
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(expected_order):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        pending.remove(got)
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(expected_order)))

    def test_probe_waste_adding_one_server(self):
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            for block_hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client)
            total_penalty = 0
            for block_hash, probe_before in zip(hashes, probes_before):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(block_hash))
                # How many services are now probed ahead of the one that
                # used to be probed first for this block.
                penalty = probe_after.index(probe_before[0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Call keep_client.<verb> on the all-zeros block with every service
        failing, and check the request errors list services in the
        reference probe order, on the mocked port."""
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client)
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        # b'0' * 64 is self.blocks[0], so errors follow expected_order[0].
        self.assertEqual([('keep0x' + c, aport) for c in self.expected_order[0]],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
787
788
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
    """Timeout and minimum-bandwidth behavior of KeepClient against a stub
    Keep server whose delays and bandwidth each test adjusts."""
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        """Context manager: assert the body takes between tmin and tmax
        seconds (inclusive, after rounding to milliseconds).

        Subclasses TestCase only to reuse its assert* methods;
        TestCase.__init__ is not called.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager: assert the body takes at least tmin seconds
        (after rounding to milliseconds)."""
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient for self.api_client using `timeouts`.

        The default is (connect timeout, response timeout, bandwidth
        low limit) built from the class constants.
        """
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(response=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        self._check_timeouts_with_delay('request')

    def test_timeout_slow_response(self):
        self._check_timeouts_with_delay('response')

    def test_timeout_slow_response_body(self):
        self._check_timeouts_with_delay('response_body')

    def _check_timeouts_with_delay(self, delay_kind):
        """Store DATA, then make the stub server pause at `delay_kind`
        (a setdelays() keyword). A 0.2s pause must still succeed under a
        1s response timeout; a 2s pause must time out."""
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(**{delay_kind: .2})
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(**{delay_kind: 2})
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
926
927
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Locators carrying +K@ hints: gateway services and remote proxies."""

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Build a KeepClient whose mocked API lists `disks` disk services
        plus `gateways` gateway services.

        Sets self.gateways (the gateway service records), self.gateway_roots
        (their https root URLs), self.api_client and self.keepClient.
        """
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    def _check_gateway_hints_tried_in_order(self, MockCurl, method_name):
        """Call self.keepClient.<method_name> on a locator with one +K@ hint
        per gateway while every service answers 404. Check NotFoundError is
        raised, gateways are tried first in hint order, then disk services.
        """
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, method_name)(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_tried_in_order(MockCurl, 'get')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_tried_in_order(MockCurl, 'head')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1023
1024
class KeepClientRetryTestMixin(object):
    """Retry tests shared by the KeepClient get/head/put test cases.

    Testing against a local Keep store would not exercise the retry
    behavior. Instead:

    * each client is created with a single proxy (PROXY_ADDR, pointed at
      a black hole) and no local store, so no API client is needed and
      every HTTP request goes to one place;
    * TEST_PATCHER (set by the concrete class, e.g. to
      tutil.mock_keep_responses) patches the pycurl layer to hand back
      simulated responses.

    This tests the retry logic thoroughly without any supporting servers,
    and avoids side effects if something hiccups.

    Concrete test classes must define DEFAULT_EXPECT, DEFAULT_EXCEPTION,
    TEST_PATCHER and run_method().
    """

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Return a KeepClient built from self.client_kwargs, with any
        caller_kwargs taking precedence."""
        merged = dict(self.client_kwargs)
        merged.update(caller_kwargs)
        return arvados.KeepClient(**merged)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) returns `expected`
        (DEFAULT_EXPECT when None)."""
        wanted = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(wanted, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) raises `error_class`
        (DEFAULT_EXCEPTION when None)."""
        wanted = self.DEFAULT_EXCEPTION if error_class is None else error_class
        with self.assertRaises(wanted):
            self.run_method(*args, **kwargs)

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            self.check_exception(num_retries=1)

    def test_num_retries_instance_fallback(self):
        # num_retries given to the client constructor is used when the
        # call itself does not pass one.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1093
1094
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.get(), plus get()-specific cases."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    # TEST_LOCATOR with an added +K@xyzzy hint.
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        return self.new_client().get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # The first server returns content that does not match the
        # locator's hash; get() must move on and return the good copy.
        # Bytes body, like DEFAULT_EXPECT.
        with tutil.mock_keep_responses(
                (b'baddata', 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1138
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.head(), plus head()-specific cases."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head() should raise NotFoundError only when no server returns
        # the block and a high threshold of servers report it missing.
        # Rig a 50/50 disagreement between two servers (404 vs 500) and
        # check the resulting KeepReadError is not a NotFoundError.
        client = self.new_client()
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        first = (socket.timeout("timed out"), 200)
        second = (self.DEFAULT_EXPECT, 200)
        with self.TEST_PATCHER(first, second):
            self.check_success(locator=self.HINTED_LOCATOR)
1176
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.put()."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # The client here has a single proxy; asking for 2 copies must
        # fail even though that one server accepts the block.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1190
1191
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """KeepWriterThreadPool stops once `copies` replicas are written and
    reports fewer when too many writers fail."""

    class FakeKeepService(object):
        """Stand-in service for KeepWriterThreadPool tasks.

        put() sleeps `delay` seconds, then raises `will_raise` if one was
        given, otherwise returns `will_succeed`. last_result() returns a
        response reporting `replicas` stored copies, or None unless the
        service will succeed.
        """
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {'x-keep-replicas-stored': str(replicas)},
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            return self._result if self.will_succeed else None

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def _add_service(self, **service_kwargs):
        """Queue one FakeKeepService(**service_kwargs) on the pool."""
        self.pool.add_task(self.FakeKeepService(**service_kwargs), None)

    def test_only_write_enough_on_success(self):
        for n in range(10):
            self._add_service(delay=n/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_on_partial_success(self):
        for n in range(5):
            self._add_service(delay=n/10.0, will_succeed=False)
            self._add_service(delay=n/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_when_some_crash(self):
        for n in range(5):
            self._add_service(delay=n/10.0, will_raise=Exception())
            self._add_service(delay=n/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_fail_when_too_many_crash(self):
        for n in range(self.copies+1):
            self._add_service(delay=n/10.0, will_raise=Exception())
        for n in range(self.copies-1):
            self._add_service(delay=n/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies-1)
1260
1261
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    """put()s that need two distinct servers to succeed, possibly
    requiring multiple passes through the retry loop."""

    FOO_LOCATOR = 'acbd18db4cc2f85cedef654fccc4a4d8+3'

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def _put_foo(self):
        """Store b'foo'-equivalent 'foo' with 2 copies and one retry."""
        self.keep_client.put('foo', num_retries=1, copies=2)

    def test_success_after_exception(self):
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR,
                Exception('mock err'), 200, 200) as req_mock:
            self._put_foo()
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR,
                500, 200, 200) as req_mock:
            self._put_foo()
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # The first retry loop gets a 200 (replication can't be reached
        # by storing again on that server) and a 400 (that server can't
        # be retried at all), so no third request should be made.
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR,
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self._put_foo()
        self.assertEqual(2, req_mock.call_count)
1295
class KeepClientAPIErrorTest(unittest.TestCase):
    def test_api_fail(self):
        """An API error during get() must not deadlock the next get().

        The bug this guards against: if an API (not keepstore) exception
        is raised as part of a get(), the next attempt to get that same
        block deadlocks. That is why get() is called twice in a row.
        Unfortunately, a regression makes the test suite hang rather than
        fail; there is no good way to avoid that without adding a special
        case whose only use would be this test.
        """
        class ApiMock(object):
            # Answers only api_token and insecure; any other attribute
            # lookup raises KeepReadError.
            def __getattr__(self, r):
                if r == "api_token":
                    return "abc"
                if r == "insecure":
                    return False
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='')
        locator = "acbd18db4cc2f85cedef654fccc4a4d8+3"
        for _attempt in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get(locator)