Merge branch '15680-arv-put-retry'
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import socket
19 import sys
20 import time
21 import unittest
22 import urllib.parse
23
24 import arvados
25 import arvados.retry
26 import arvados.util
27 from . import arvados_testutil as tutil
28 from . import keepstub
29 from . import run_test_server
30
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Put/get/head round trips against a Keep server from run_test_server.

    A single KeepClient is shared by every test in the class. It is built
    with ``proxy=''`` and ``local_store=''`` using an admin-authorized API
    client.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertEqual(self.keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # 8 bytes repeated 2**23 times = 67108864 bytes (64 MiB), the size
        # asserted in the expected locator below.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' * (1 << 23)
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
130
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Keep server configured with a signing key and enforced permissions."""
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': True}

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        client = arvados.KeepClient()

        # A PUT yields a signed locator which can be read back.
        signed_foo = client.put('foo')
        self.assertRegex(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        self.assertEqual(client.get(signed_foo),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        signed_bar = client.put('bar')
        plain_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)
        with self.assertRaises(arvados.errors.NotFoundError):
            client.get(plain_bar)

        # GET from a different user => NotFound
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(signed_bar)

        # Unauthenticated GET => NotFound, for both the signed and the
        # unsigned locator.
        client.api_token = ''
        for loc in (signed_bar, plain_bar):
            with self.assertRaises(arvados.errors.NotFoundError):
                client.get(loc)
174
175
class KeepOptionalPermission(run_test_server.TestCaseWithServers):
    """Keep with a signing key but without permission enforcement.

    Starts Keep with --permission-key-file but not --enforce-permissions,
    i.e. signatures are generated on PUT requests but are not required for
    GET requests.

    All of these requests should succeed when permissions are optional:

    * authenticated request, signed locator
    * authenticated request, unsigned locator
    * unauthenticated request, signed locator
    * unauthenticated request, unsigned locator
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': False}

    # md5("foo") with no size hint and no +A signature.
    UNSIGNED_FOO_LOCATOR = "acbd18db4cc2f85cedef654fccc4a4d8"

    @classmethod
    def setUpClass(cls):
        super(KeepOptionalPermission, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepOptionalPermission, self).setUp()
        # A fresh client per test, since some tests clear its api_token.
        self.keep_client = arvados.KeepClient(api_client=self.api_client,
                                              proxy='', local_store='')

    def _put_foo_and_check(self):
        """Store "foo", assert the returned locator is signed, return it."""
        signed_locator = self.keep_client.put('foo')
        self.assertRegex(
            signed_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_locator)
        return signed_locator

    def test_KeepAuthenticatedSignedTest(self):
        signed_locator = self._put_foo_and_check()
        self.assertEqual(self.keep_client.get(signed_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepAuthenticatedUnsignedTest(self):
        # Store "foo" first; only the unsigned locator is used to read it.
        self._put_foo_and_check()
        self.assertEqual(self.keep_client.get(self.UNSIGNED_FOO_LOCATOR),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepUnauthenticatedSignedTest(self):
        # Check that signed GET requests work even when permissions
        # enforcement is off.
        signed_locator = self._put_foo_and_check()
        self.keep_client.api_token = ''
        self.assertEqual(self.keep_client.get(signed_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepUnauthenticatedUnsignedTest(self):
        # Since --enforce-permissions is not in effect, GET requests
        # need not be authenticated.
        self._put_foo_and_check()
        self.keep_client.api_token = ''
        self.assertEqual(self.keep_client.get(self.UNSIGNED_FOO_LOCATOR),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
238
239
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """KeepClient behavior when a Keep proxy server is running."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    # Sentinel: ARVADOS_KEEP_SERVICES was absent from settings in setUp.
    _UNSET = object()

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepProxyTestCase, self).setUp()
        # Some tests overwrite ARVADOS_KEEP_SERVICES in the shared settings
        # dict. Remember the pre-test value so tearDown can restore it;
        # otherwise the override leaks into whichever tests run afterwards.
        self._saved_keep_services = arvados.config.settings().get(
            'ARVADOS_KEEP_SERVICES', self._UNSET)

    def tearDown(self):
        settings = arvados.config.settings()
        settings.pop('ARVADOS_EXTERNAL_CLIENT', None)
        if self._saved_keep_services is self._UNSET:
            settings.pop('ARVADOS_KEEP_SERVICES', None)
        else:
            settings['ARVADOS_KEEP_SERVICES'] = self._saved_keep_services
        super(KeepProxyTestCase, self).tearDown()

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegex(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
301
302
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient unit tests against mocked keep_services and pycurl."""

    def get_service_roots(self, api_client):
        """Return parsed service root URLs (sorted) for a fixed locator."""
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client)
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def _check_transfer_timeouts(self, response, timeout):
        """Assert *response* got the connect and low-speed options of *timeout*.

        *timeout* is indexed as (connect seconds, LOW_SPEED_TIME,
        LOW_SPEED_LIMIT), matching how DEFAULT_TIMEOUT and
        DEFAULT_PROXY_TIMEOUT are used by these tests.
        """
        self.assertEqual(
            response.getopt(pycurl.CONNECTTIMEOUT_MS),
            int(timeout[0]*1000))
        self.assertEqual(
            response.getopt(pycurl.LOW_SPEED_TIME),
            int(timeout[1]))
        self.assertEqual(
            response.getopt(pycurl.LOW_SPEED_LIMIT),
            int(timeout[2]))

    def _check_connect_timeout_only(self, response, timeout):
        """Assert *response* got only the connect timeout of *timeout*.

        Used for HEAD requests, where the low-speed options are expected
        to stay unset (the mock's getopt() returns None).
        """
        self.assertEqual(
            response.getopt(pycurl.CONNECTTIMEOUT_MS),
            int(timeout[0]*1000))
        self.assertEqual(
            response.getopt(pycurl.LOW_SPEED_TIME),
            None)
        self.assertEqual(
            response.getopt(pycurl.LOW_SPEED_LIMIT),
            None)

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self._check_transfer_timeouts(
                req_mock.responses[0], arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self._check_transfer_timeouts(
                req_mock.responses[0], arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self._check_connect_timeout_only(
                req_mock.responses[0], arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_proxy_get_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self._check_transfer_timeouts(
                req_mock.responses[0], arvados.KeepClient.DEFAULT_PROXY_TIMEOUT)

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self._check_connect_timeout_only(
                req_mock.responses[0], arvados.KeepClient.DEFAULT_PROXY_TIMEOUT)

    def test_proxy_put_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self._check_transfer_timeouts(
                req_mock.responses[0], arvados.KeepClient.DEFAULT_PROXY_TIMEOUT)

    def check_no_services_error(self, verb, exc_class):
        """Assert *verb* raises *exc_class* with no per-request errors when
        listing Keep services fails."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Assert the raised error reports only the final attempt's failures."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
562
563
@tutil.skip_sleep
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Which KeepClient requests are answered from the client's cache."""
    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.get(self.locator)
            self.keep_client.get(self.locator)
        # Request already cached, don't require more than one request
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.head(self.locator)
            self.keep_client.head(self.locator)
        # Don't cache HEAD requests so that they're not confused with GET reqs
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        get_mock.side_effect = ['first response', 'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual('first response', head_resp)
        # The first response was not cached because it came from a HEAD
        # request, so the GET issued a new request.
        self.assertNotEqual(head_resp, get_resp)
599
600
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
    """X-Request-Id header handling for KeepClient put/get/head."""
    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def _put_get_head(self, check, **kwargs):
        """Run put, get and head with *kwargs*; apply *check* to each request.

        put is mocked with two successful responses and is expected to
        issue exactly two requests; get and head each issue one.
        """
        with tutil.mock_keep_responses(self.locator, 200, 200) as req_mock:
            self.keep_client.put(self.data, **kwargs)
        self.assertEqual(2, len(req_mock.responses))
        for resp in req_mock.responses:
            check(resp)

        with tutil.mock_keep_responses(self.data, 200) as req_mock:
            self.keep_client.get(self.locator, **kwargs)
        check(req_mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as req_mock:
            self.keep_client.head(self.locator, **kwargs)
        check(req_mock.responses[0])

    def test_default_to_api_client_request_id(self):
        self.api_client.request_id = self.test_id
        self._put_get_head(self.assertProvidedRequestId)

    def test_explicit_request_id(self):
        self._put_get_head(self.assertProvidedRequestId,
                           request_id=self.test_id)

    def test_automatic_request_id(self):
        self._put_get_head(self.assertAutomaticRequestId)

    def assertAutomaticRequestId(self, resp):
        """Assert *resp* carries a generated id, not self.test_id."""
        hdrs = [x for x in resp.getopt(pycurl.HTTPHEADER)
                if x.startswith('X-Request-Id: ')]
        # Fail with a clear message instead of an IndexError.
        self.assertTrue(hdrs, 'no X-Request-Id header was sent')
        hdr = hdrs[0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        """Assert *resp* carries self.test_id as its X-Request-Id."""
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
670
671
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check that KeepClient probes services in rendezvous-hash order."""

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) when there are 16 mock services.
        # Each character identifies a service by the suffix of its mock
        # hostname, "keep0x<c>". E.g., the first probe for the block
        # consisting of 64 "0" characters goes to host "keep0x3", so
        # expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [hashlib.md5(block).hexdigest() for block in self.blocks]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    @staticmethod
    def _service_ids(urls):
        """Return the "keep0x<id>" service id found in each of urls."""
        return [re.search(r'//\[?keep0x([0-9a-f]+)', url).group(1)
                for url in urls]

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, block_hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(
                arvados.KeepLocator(block_hash))
            self.assertEqual(self.expected_order[i], self._service_ids(roots))

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        # Every request fails with 500, so op(i) (with num_retries=1)
        # should probe all services twice, in reference order.
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *([500] * (self.services*2))) as req_mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = self._service_ids(
                resp.getopt(pycurl.URL).decode() for resp in req_mock.responses)
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i, block in enumerate(self.blocks):
                reference = self.expected_order[i]*3
                with tutil.mock_keep_responses('', *([500] * (self.services*3))) as req_mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(block, num_retries=2, copies=copies)
                got_order = self._service_ids(
                    resp.getopt(pycurl.URL).decode() for resp in req_mock.responses)
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(reference):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(reference)))

    def test_probe_waste_adding_one_server(self):
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            for block_hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client)
            total_penalty = 0
            for block_hash, probe_before in zip(hashes, probes_before):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(block_hash))
                # Penalty: how many probes now come before the service
                # that used to be probed first.
                penalty = probe_after.index(probe_before[0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # The ideal total penalty is N(blocks)*N(added)/N(orig).
            # Accept anything from 20% below the ideal up to
            # (20 - N(added))% above it, so the upper tolerance tightens
            # as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        # 64 "0" characters is self.blocks[0], so the error should list
        # services in self.expected_order[0].
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client)
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in self.expected_order[0]],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
825
826
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
    """Check KeepClient connect, response, and low-bandwidth timeouts.

    Runs against the stub Keep server; each test adjusts the stub's
    bandwidth and per-phase delays.
    """
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        """Context manager asserting its body takes tmin..tmax seconds.

        Subclasses TestCase only to borrow its assert* methods.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, exc_type, *args, **kwargs):
            if exc_type is not None:
                # The body already failed; let that exception propagate
                # instead of obscuring it with a timing failure.
                return
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager asserting its body takes at least tmin seconds.

        Subclasses TestCase only to borrow its assert* methods.
        """
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, exc_type, *args, **kwargs):
            if exc_type is not None:
                # Don't mask the body's own failure with a timing check.
                return
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(response=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
964
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check that gateway (K@uuid) and remote (K@xyzzy) hints route requests."""

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    def _check_gateway_hints_in_order(self, MockCurl, verb):
        """Check that KeepClient.<verb> tries hinted gateways before disks.

        Every mocked request returns 404, so the call should raise
        NotFoundError. The first requests go to the hinted gateways in
        hint order, and the remaining requests go to disk services.
        """
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'get')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'head')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1060
1061
class KeepClientRetryTestMixin(object):
    # Shared retry tests for KeepClient operations.
    #
    # Testing with a local Keep store won't exercise the retry behavior,
    # so instead each test:
    # * creates a client with one proxy specified (pointed at a black
    #   hole), so there's no need to instantiate an API client and all
    #   HTTP requests come from one place; and
    # * uses TEST_PATCHER to feed the client simulated responses.
    # This exercises the retry logic without relying on any supporting
    # servers, and prevents side effects in case something hiccups.
    #
    # Concrete test classes must define:
    # * DEFAULT_EXPECT: the value a successful run_method() returns;
    # * DEFAULT_EXCEPTION: the error class a failing run_method() raises;
    # * TEST_PATCHER: called as TEST_PATCHER(body, *responses) and used
    #   as a context manager that mocks out the client's HTTP layer;
    # * run_method(): performs the operation under test.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = dict(proxy=self.PROXY_ADDR, local_store='')

    def new_client(self, **caller_kwargs):
        # Caller-supplied options override the per-test defaults.
        merged_kwargs = dict(self.client_kwargs, **caller_kwargs)
        return arvados.KeepClient(**merged_kwargs)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        want = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(want, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        wanted_error = self.DEFAULT_EXCEPTION if error_class is None else error_class
        with self.assertRaises(wanted_error) as raised:
            self.run_method(*args, **kwargs)
        return raised

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            raised = self.check_exception(num_retries=1)
        self.assertRegex(str(raised.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1134
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.get()."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # Here two servers disagree 50/50 (404 vs 500), which must not
        # be reported as a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timed_out = (socket.timeout("timed out"), 200)
        good = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, good):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        corrupt = ('baddata', 200)
        good = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(corrupt, good):
            self.check_success(locator=self.HINTED_LOCATOR)
1178
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.head()."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # Here two servers disagree 50/50 (404 vs 500), which must not
        # be reported as a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timed_out = (socket.timeout("timed out"), 200)
        good = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, good):
            self.check_success(locator=self.HINTED_LOCATOR)
1216
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.put()."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Only one 200 response is queued; the second copy must not be
        # obtained by writing to the same server again.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1230
1231
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """KeepWriterThreadPool should stop once the wanted copies are written."""

    class FakeKeepService(object):
        """Stand-in service whose put() sleeps, then succeeds, fails, or raises."""

        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {'x-keep-replicas-stored': str(replicas)},
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout):
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            # Only a successful write has a result to report.
            return self._result if self.will_succeed else None

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies
        )

    def test_only_write_enough_on_success(self):
        for n in range(10):
            self.pool.add_task(
                self.FakeKeepService(delay=n/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_on_partial_success(self):
        for n in range(5):
            # A failing writer followed by a succeeding one, same delay.
            for succeeds in (False, True):
                self.pool.add_task(
                    self.FakeKeepService(delay=n/10.0, will_succeed=succeeds), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_when_some_crash(self):
        for n in range(5):
            crasher = self.FakeKeepService(delay=n/10.0, will_raise=Exception())
            self.pool.add_task(crasher, None)
            writer = self.FakeKeepService(delay=n/10.0, will_succeed=True)
            self.pool.add_task(writer, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_fail_when_too_many_crash(self):
        for n in range(self.copies+1):
            crasher = self.FakeKeepService(delay=n/10.0, will_raise=Exception())
            self.pool.add_task(crasher, None)
        for n in range(self.copies-1):
            writer = self.FakeKeepService(delay=n/10.0, will_succeed=True)
            self.pool.add_task(writer, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies-1)
1300
1301
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    """put() calls that need two distinct servers to succeed.

    Meeting the replication target may take more than one pass through
    the retry loop.
    """

    # Locator of the block 'foo' that every test writes.
    _FOO_LOCATOR = 'acbd18db4cc2f85cedef654fccc4a4d8+3'

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def _put_foo(self):
        # Ask for two copies of 'foo', allowing a single retry.
        return self.keep_client.put('foo', num_retries=1, copies=2)

    def test_success_after_exception(self):
        responses = (Exception('mock err'), 200, 200)
        with tutil.mock_keep_responses(self._FOO_LOCATOR,
                                       *responses) as req_mock:
            self._put_foo()
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        responses = (500, 200, 200)
        with tutil.mock_keep_responses(self._FOO_LOCATOR,
                                       *responses) as req_mock:
            self._put_foo()
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # The first pass gets a 200 (storing again on that server cannot
        # add replication) and a 400 (that server cannot be retried at
        # all), so a third request must never be made.
        responses = (200, 400, 200)
        with tutil.mock_keep_responses(self._FOO_LOCATOR,
                                       *responses) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self._put_foo()
        self.assertEqual(2, req_mock.call_count)
1335
class KeepClientAPIErrorTest(unittest.TestCase):
    def test_api_fail(self):
        import threading

        class ApiMock(object):
            # Answers only the two attributes below; any other attribute
            # access raises KeepReadError, simulating an API failure.
            def __getattr__(self, r):
                if r == "api_token":
                    return "abc"
                elif r == "insecure":
                    return False
                else:
                    raise arvados.errors.KeepReadError()
        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='')
        locator = "acbd18db4cc2f85cedef654fccc4a4d8+3"

        # The bug this is testing for is that if an API (not keepstore)
        # exception is thrown as part of a get(), the next attempt to get
        # that same block will result in a deadlock.  This is why there
        # are two get()s in a row.
        #
        # Both get()s run in a daemon thread with a bounded join, so a
        # regression makes this test fail instead of hanging the whole
        # suite.  Each call's outcome is recorded and checked afterwards.
        outcomes = []

        def get_twice():
            for _ in range(2):
                try:
                    keep_client.get(locator)
                except Exception as err:
                    outcomes.append(err)
                else:
                    outcomes.append(None)

        worker = threading.Thread(target=get_twice)
        worker.daemon = True  # a deadlocked worker must not block exit
        worker.start()
        worker.join(timeout=60)
        self.assertFalse(worker.is_alive(),
                         "KeepClient.get() deadlocked after an API error")
        self.assertEqual(2, len(outcomes))
        for outcome in outcomes:
            # Same acceptance as assertRaises: KeepReadError or a subclass.
            self.assertIsInstance(outcome, arvados.errors.KeepReadError)