16476: Merge branch 'master' into 16476-upgrade-arvados-jobs-to-buster
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import socket
19 import sys
20 import time
21 import unittest
22 import urllib.parse
23
24 import arvados
25 import arvados.retry
26 import arvados.util
27 from . import arvados_testutil as tutil
28 from . import keepstub
29 from . import run_test_server
30
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Round-trip put/get/head tests using one KeepClient shared by the class.

    The client is created once in setUpClass, after authorizing as
    "admin", with ``proxy=''`` and ``local_store=''``.
    """
    # Server settings read by TestCaseWithServers (defined outside this
    # file) -- presumably "start this server with default options".
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        """Authorize as admin and build the shared API and Keep clients."""
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        """Store and fetch 'foo', checking the byte counters along the way.

        The counter assertions start from zero, so this test expects
        the shared client not to have transferred any data yet.
        """
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertEqual(self.keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        """Store and fetch a short blob containing non-text bytes."""
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        """Store and fetch a 67108864-byte blob."""
        # 8 bytes repeated 2**23 times == 67108864 bytes, matching the
        # size hint in the expected locator below.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' * (2 ** 23)
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        """Store a blob with copies=1 and read it back (currently skipped)."""
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        """Storing the empty string yields the zero-length locator."""
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        """put() accepts ASCII text and rejects objects without encode()."""
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # The non-ASCII rejection is only asserted on Python 2.
        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        """head() returns True for a stored block, and get() returns its data."""
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
130
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Checks that signed locators are required when blob signing is on."""
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def test_KeepBasicRWTest(self):
        """Signed locators read back; unsigned/foreign/anonymous reads fail."""
        run_test_server.authorize_with('active')
        keep_client = arvados.KeepClient()

        # A put() must hand back a locator carrying a +A signature hint,
        # and that locator must be readable by the same client.
        signed_foo = keep_client.put('foo')
        self.assertRegex(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        foo_content = keep_client.get(signed_foo)
        self.assertEqual(foo_content,
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        signed_bar = keep_client.put('bar')
        unsigned_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)
        with self.assertRaises(arvados.errors.NotFoundError):
            keep_client.get(unsigned_bar)

        # GET from a different user => NotFound
        # (goes through arvados.Keep rather than the client built above)
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(signed_bar)

        # Unauthenticated GET for a signed locator => NotFound
        # Unauthenticated GET for an unsigned locator => NotFound
        keep_client.api_token = ''
        for locator in (signed_bar, unsigned_bar):
            with self.assertRaises(arvados.errors.NotFoundError):
                keep_client.get(locator)
173
174
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """Tests for reaching Keep through a proxy service.

    Several tests edit the process-wide ``arvados.config.settings()``
    mapping; setUp/tearDown snapshot and restore the keys they touch so
    the tests do not depend on execution order.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        """Authorize as the 'active' user and build the shared API client."""
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def setUp(self):
        """Remember the current ARVADOS_KEEP_SERVICES setting (None if unset)."""
        super(KeepProxyTestCase, self).setUp()
        self._saved_keep_services = arvados.config.settings().get(
            'ARVADOS_KEEP_SERVICES')

    def tearDown(self):
        """Undo any settings changes made by the test that just ran."""
        settings = arvados.config.settings()
        settings.pop('ARVADOS_EXTERNAL_CLIENT', None)
        # Put ARVADOS_KEEP_SERVICES back the way setUp found it, so a
        # fake value written by one test cannot leak into the next.
        if self._saved_keep_services is None:
            settings.pop('ARVADOS_KEEP_SERVICES', None)
        else:
            settings['ARVADOS_KEEP_SERVICES'] = self._saved_keep_services
        super(KeepProxyTestCase, self).tearDown()

    def test_KeepProxyTest1(self):
        """Round-trip a block via the proxy named in ARVADOS_KEEP_SERVICES."""
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        """Round-trip a block when flagged as an external client."""
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegex(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        """A space-separated ARVADOS_KEEP_SERVICES yields one root per URI."""
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        """A scheme-less ARVADOS_KEEP_SERVICES value raises ArgumentError."""
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
236
237
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient behavior against mocked API and mocked Keep responses.

    Service lists come from ``self.mock_keep_services`` (supplied by the
    tutil.ApiClientMock mixin) and HTTP traffic is replaced by
    ``tutil.mock_keep_responses``, so no real servers are contacted.
    """

    def get_service_roots(self, api_client):
        """Return the sorted service roots for an all-zero locator, parsed as URLs."""
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        """The service_ssl_flag selects between http and https roots."""
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        """Host and port are parsed separately for an IPv6 service host."""
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_insecure_disables_tls_verify(self):
        """api_client.insecure controls whether SSL_VERIFYPEER is set to 0."""
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt() returning None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertIsNone(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER))

    def test_refresh_signature(self):
        """refresh_signature() returns the X-Keep-Locator header via a HEAD request."""
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client)
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def check_timeout_options(self, verb, arg, exc_class, timeouts,
                              low_speed=True, **service_kwargs):
        """Run one failing request and check the curl timeout options it set.

        verb -- KeepClient method name to call ('get', 'head' or 'put').
        arg -- the single positional argument passed to that method.
        exc_class -- exception the call is expected to raise.
        timeouts -- indexable as (connect seconds, low-speed time,
            low-speed limit); the values the options are compared with.
        low_speed -- when False, LOW_SPEED_TIME and LOW_SPEED_LIMIT are
            expected to be left unset (getopt() returns None).
        service_kwargs -- extra keyword arguments for mock_keep_services.
        """
        api_client = self.mock_keep_services(count=1, **service_kwargs)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                getattr(keep_client, verb)(arg)
            curl = req_mock.responses[0]
            self.assertEqual(
                curl.getopt(pycurl.CONNECTTIMEOUT_MS),
                int(timeouts[0]*1000))
            if low_speed:
                self.assertEqual(
                    curl.getopt(pycurl.LOW_SPEED_TIME),
                    int(timeouts[1]))
                self.assertEqual(
                    curl.getopt(pycurl.LOW_SPEED_LIMIT),
                    int(timeouts[2]))
            else:
                self.assertIsNone(curl.getopt(pycurl.LOW_SPEED_TIME))
                self.assertIsNone(curl.getopt(pycurl.LOW_SPEED_LIMIT))

    def test_get_timeout(self):
        self.check_timeout_options(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        self.check_timeout_options(
            'put', b'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        self.check_timeout_options(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT,
            low_speed=False)

    def test_proxy_get_timeout(self):
        self.check_timeout_options(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_head_timeout(self):
        self.check_timeout_options(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            low_speed=False, service_type='proxy')

    def test_proxy_put_timeout(self):
        self.check_timeout_options(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def check_no_services_error(self, verb, exc_class):
        """With the service lookup failing, verb raises exc_class with no request errors."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """The raised error reports only the final round of failures (the 502s)."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        """One success plus two failures leaves exactly two request errors."""
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        """put() against a single read-only proxy fails before any request is made."""
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        """get() works with a service type the client has no special case for."""
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        """put() works with a service type the client has no special case for."""
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        """One response reporting 3 stored replicas satisfies copies=2 in one request."""
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
496         self.assertEqual(1, req_mock.call_count)
497
498
@tutil.skip_sleep
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Checks which KeepClient requests are served from its block cache.

    Every test patches ``KeepService.get`` and counts how many times the
    client actually calls through to it.
    """

    def setUp(self):
        """Build a client over two mocked services plus a sample block."""
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        """Two identical get() calls reach the service only once."""
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.get(self.locator)
        # Request already cached, don't require more than one request
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        """Two identical head() calls both reach the service."""
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.head(self.locator)
        # Don't cache HEAD requests so that they're not confused with GET reqs
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        """A get() issued after a head() is not answered with the head() result."""
        get_mock.side_effect = ['first response', 'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            from_head = self.keep_client.head(self.locator)
            from_get = self.keep_client.get(self.locator)
        self.assertEqual('first response', from_head)
        # First response was not cached because it was from a HEAD request.
        self.assertNotEqual(from_head, from_get)
532         # First reponse was not cached because it was from a HEAD request.
533         self.assertNotEqual(head_resp, get_resp)
534
535
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Checks the X-Request-Id header KeepClient sends on put/get/head."""

    def setUp(self):
        """Build a client over two mocked services and a fresh request id."""
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def test_default_to_api_client_request_id(self):
        """With no explicit id, requests reuse api_client.request_id."""
        self.api_client.request_id = self.test_id

        with tutil.mock_keep_responses(self.locator, 200, 200) as put_mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(put_mock.responses))
        for response in put_mock.responses:
            self.assertProvidedRequestId(response)

        with tutil.mock_keep_responses(self.data, 200) as get_mock:
            self.keep_client.get(self.locator)
        self.assertProvidedRequestId(get_mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as head_mock:
            self.keep_client.head(self.locator)
        self.assertProvidedRequestId(head_mock.responses[0])

    def test_explicit_request_id(self):
        """A request_id keyword argument is sent as given."""
        with tutil.mock_keep_responses(self.locator, 200, 200) as put_mock:
            self.keep_client.put(self.data, request_id=self.test_id)
        self.assertEqual(2, len(put_mock.responses))
        for response in put_mock.responses:
            self.assertProvidedRequestId(response)

        with tutil.mock_keep_responses(self.data, 200) as get_mock:
            self.keep_client.get(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(get_mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as head_mock:
            self.keep_client.head(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(head_mock.responses[0])

    def test_automatic_request_id(self):
        """With no id available anywhere, a new one is generated."""
        with tutil.mock_keep_responses(self.locator, 200, 200) as put_mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(put_mock.responses))
        for response in put_mock.responses:
            self.assertAutomaticRequestId(response)

        with tutil.mock_keep_responses(self.data, 200) as get_mock:
            self.keep_client.get(self.locator)
        self.assertAutomaticRequestId(get_mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as head_mock:
            self.keep_client.head(self.locator)
        self.assertAutomaticRequestId(head_mock.responses[0])

    def assertAutomaticRequestId(self, resp):
        """Assert resp carries a well-formed request id other than self.test_id."""
        request_id_headers = [
            header for header in resp.getopt(pycurl.HTTPHEADER)
            if header.startswith('X-Request-Id: ')]
        hdr = request_id_headers[0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        """Assert resp carries exactly the self.test_id request id header."""
        sent_headers = resp.getopt(pycurl.HTTPHEADER)
        self.assertIn('X-Request-Id: '+self.test_id, sent_headers)
603         self.assertIn('X-Request-Id: '+self.test_id,
604                       resp.getopt(pycurl.HTTPHEADER))
605
606
607 @tutil.skip_sleep
608 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
609
610     def setUp(self):
611         # expected_order[i] is the probe order for
612         # hash=md5(sprintf("%064x",i)) where there are 16 services
613         # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
614         # the first probe for the block consisting of 64 "0"
615         # characters is the service whose uuid is
616         # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
617         self.services = 16
618         self.expected_order = [
619             list('3eab2d5fc9681074'),
620             list('097dba52e648f1c3'),
621             list('c5b4e023f8a7d691'),
622             list('9d81c02e76a3bf54'),
623             ]
624         self.blocks = [
625             "{:064x}".format(x).encode()
626             for x in range(len(self.expected_order))]
627         self.hashes = [
628             hashlib.md5(self.blocks[x]).hexdigest()
629             for x in range(len(self.expected_order))]
630         self.api_client = self.mock_keep_services(count=self.services)
631         self.keep_client = arvados.KeepClient(api_client=self.api_client)
632
633     def test_weighted_service_roots_against_reference_set(self):
634         # Confirm weighted_service_roots() returns the correct order
635         for i, hash in enumerate(self.hashes):
636             roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
637             got_order = [
638                 re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
639                 for root in roots]
640             self.assertEqual(self.expected_order[i], got_order)
641
642     def test_get_probe_order_against_reference_set(self):
643         self._test_probe_order_against_reference_set(
644             lambda i: self.keep_client.get(self.hashes[i], num_retries=1))
645
646     def test_head_probe_order_against_reference_set(self):
647         self._test_probe_order_against_reference_set(
648             lambda i: self.keep_client.head(self.hashes[i], num_retries=1))
649
650     def test_put_probe_order_against_reference_set(self):
651         # copies=1 prevents the test from being sensitive to races
652         # between writer threads.
653         self._test_probe_order_against_reference_set(
654             lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))
655
656     def _test_probe_order_against_reference_set(self, op):
657         for i in range(len(self.blocks)):
658             with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
659                  self.assertRaises(arvados.errors.KeepRequestError):
660                 op(i)
661             got_order = [
662                 re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
663                 for resp in mock.responses]
664             self.assertEqual(self.expected_order[i]*2, got_order)
665
    def test_put_probe_order_multiple_copies(self):
        # Check the order of put() requests when more than one copy is
        # wanted (copies=2 and copies=3). Every mocked response is a
        # 500, so each put() must raise KeepWriteError; the request
        # URLs recorded by the mock are then compared against three
        # repetitions of the reference probe order (presumably one per
        # attempt, given num_retries=2), tolerating the bounded
        # reordering described below.
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                # Pull the service identifier (the hex digits after
                # "keep0x") out of each requested URL.
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    # Each pending entry is an expected probe that has
                    # not consumed a got_order slot yet, hence the
                    # offset.
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        # A probe we parked earlier has now shown up:
                        # drop it from +pending+ and look at the next
                        # request (the index advances because
                        # len(pending) shrank).
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        # The expected probe has not been seen yet;
                        # park it and make sure we are not waiting on
                        # too many at once.
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
698
699     def test_probe_waste_adding_one_server(self):
700         hashes = [
701             hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
702         initial_services = 12
703         self.api_client = self.mock_keep_services(count=initial_services)
704         self.keep_client = arvados.KeepClient(api_client=self.api_client)
705         probes_before = [
706             self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
707         for added_services in range(1, 12):
708             api_client = self.mock_keep_services(count=initial_services+added_services)
709             keep_client = arvados.KeepClient(api_client=api_client)
710             total_penalty = 0
711             for hash_index in range(len(hashes)):
712                 probe_after = keep_client.weighted_service_roots(
713                     arvados.KeepLocator(hashes[hash_index]))
714                 penalty = probe_after.index(probes_before[hash_index][0])
715                 self.assertLessEqual(penalty, added_services)
716                 total_penalty += penalty
717             # Average penalty per block should not exceed
718             # N(added)/N(orig) by more than 20%, and should get closer
719             # to the ideal as we add data points.
720             expect_penalty = (
721                 added_services *
722                 len(hashes) / initial_services)
723             max_penalty = (
724                 expect_penalty *
725                 (120 - added_services)/100)
726             min_penalty = (
727                 expect_penalty * 8/10)
728             self.assertTrue(
729                 min_penalty <= total_penalty <= max_penalty,
730                 "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
731                     initial_services,
732                     added_services,
733                     len(hashes),
734                     total_penalty,
735                     min_penalty,
736                     max_penalty))
737
738     def check_64_zeros_error_order(self, verb, exc_class):
739         data = b'0' * 64
740         if verb == 'get':
741             data = tutil.str_keep_locator(data)
742         # Arbitrary port number:
743         aport = random.randint(1024,65535)
744         api_client = self.mock_keep_services(service_port=aport, count=self.services)
745         keep_client = arvados.KeepClient(api_client=api_client)
746         with mock.patch('pycurl.Curl') as curl_mock, \
747              self.assertRaises(exc_class) as err_check:
748             curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
749             getattr(keep_client, verb)(data)
750         urls = [urllib.parse.urlparse(url)
751                 for url in err_check.exception.request_errors()]
752         self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
753                          [(url.hostname, url.port) for url in urls])
754
755     def test_get_error_shows_probe_order(self):
756         self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)
757
758     def test_put_error_shows_probe_order(self):
759         self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
760
761
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x' * (2**11)
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        # Context manager: fails unless the enclosed code takes at
        # least tmin and at most tmax seconds (measured with
        # time.time()).
        def __init__(self, tmin, tmax):
            self.tmin, self.tmax = tmin, tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            elapsed = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(elapsed, self.tmin)
            self.assertLessEqual(elapsed, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        # Context manager: fails unless the enclosed code takes at
        # least tmin seconds (rounded to milliseconds).
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            elapsed = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(elapsed, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        # Build a KeepClient on the current API client using the given
        # timeout setting (judging by the tests below: connect timeout,
        # response timeout, and optionally a low-bandwidth limit).
        client_args = {'api_client': self.api_client, 'timeout': timeouts}
        return arvados.KeepClient(**client_args)

    def _assert_slow_read_failure(self, kc, loc):
        # get() must fail with KeepReadError, and not before
        # TIMEOUT_TIME has elapsed.
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepReadError):
            kc.get(loc, num_retries=0)

    def _assert_slow_write_failure(self, kc):
        # put() must fail with KeepWriteError, and not before
        # TIMEOUT_TIME has elapsed.
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepWriteError):
            kc.put(self.DATA, copies=1, num_retries=0)

    def _check_slow_stage(self, stage):
        # Store a block, then delay the named server stage first by
        # 0.2s (requests still succeed) and then by 2s (requests time
        # out).
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(**{stage: .2})
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(**{stage: 2})
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            service_host='240.0.0.0',
            count=1,
        )
        with self.assertTakesBetween(0.1, 0.5), \
             self.assertRaises(arvados.errors.KeepWriteError):
            self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # At twice the low-bandwidth limit, a put/get round trip works.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        client = self.keepClient()
        locator = client.put(self.DATA, copies=1, num_retries=0)
        fetched = client.get(locator, num_retries=0)
        self.assertEqual(self.DATA, fetched)

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        client = self.keepClient()
        locator = client.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        self._assert_slow_read_failure(client, locator)
        self._assert_slow_write_failure(client)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        client = self.keepClient()
        locator = client.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        self._assert_slow_read_failure(client, locator)
        self._assert_slow_write_failure(client)
        # head() is only required to take longer than TIMEOUT_TIME;
        # no exception is expected here.
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            client.head(locator, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        client = self.keepClient()
        locator = client.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        stall = self.TIMEOUT_TIME+1
        self.server.setdelays(mid_write=stall, mid_read=stall)
        self._assert_slow_read_failure(client, locator)
        self._assert_slow_write_failure(client)

    def test_timeout_slow_request(self):
        self._check_slow_stage('request')

    def test_timeout_slow_response(self):
        self._check_slow_stage('response')

    def test_timeout_slow_response_body(self):
        self._check_slow_stage('response_body')

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        client = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            client.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            fetched = client.get(loc, num_retries=0)
            self.assertEqual(self.DATA, fetched)

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        client = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9), \
             self.assertRaises(arvados.errors.KeepReadError):
            client.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9), \
             self.assertRaises(arvados.errors.KeepWriteError):
            client.put(self.DATA, copies=1, num_retries=0)
902
903
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    def mock_disks_and_gateways(self, disks=3, gateways=1):
        # Install an API client mock that advertises `disks` services
        # from mock_keep_services(count=...) plus `gateways` extra
        # services of type 'gateway:test', and build a KeepClient on
        # it. The gateway records and their root URLs are kept in
        # self.gateways and self.gateway_roots.
        self.gateways = []
        for i in range(gateways):
            self.gateways.append({
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
            })
        self.gateway_roots = []
        for gw in self.gateways:
            self.gateway_roots.append(
                "https://{service_host}:{service_port}/".format(**gw))
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    def _check_gateway_hints_in_order(self, MockCurl, verb, body):
        # Answer every request with a 404 carrying `body`, call
        # keepClient.<verb> on a locator hinting at all gateways, and
        # check the order in which the services were contacted.
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=body)
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        hints = ['K@'+gw['uuid'] for gw in self.gateways]
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] + hints)
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for root, curl in zip(self.gateway_roots, mocks):
            self.assertEqual(root+locator,
                             curl.getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for curl in mocks[gateways:gateways+disks]:
            self.assertRegex(
                curl.getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        fake_curl = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        MockCurl.return_value = fake_curl
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        # The request must have gone to the hinted gateway.
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'get', '')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'head', b'')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        fake_curl = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        MockCurl.return_value = fake_curl
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        # A +K@xyzzy hint must send the request to the xyzzy proxy URL.
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        fake_curl = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        MockCurl.return_value = fake_curl
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        # A +K@xyzzy hint must send the request to the xyzzy proxy URL.
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
999
1000
class KeepClientRetryTestMixin(object):
    # Testing with a local Keep store won't exercise the retry behavior.
    # Instead, our strategy is:
    # * Create a client with one proxy specified (pointed at a black
    #   hole), so there's no need to instantiate an API client, and
    #   all HTTP requests come from one place.
    # * Patch the HTTP layer (through TEST_PATCHER) to provide
    #   simulated responses.
    # This lets us test the retry logic extensively without relying on any
    # supporting servers, and prevents side effects in case something hiccups.
    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
    # run_method().
    #
    # Test classes must define TEST_PATCHER to a method that mocks
    # out appropriate methods in the client.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        # KeepClient built from the default kwargs, with any
        # caller-supplied kwargs taking precedence.
        merged = dict(self.client_kwargs)
        merged.update(caller_kwargs)
        return arvados.KeepClient(**merged)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        # run_method() must return `expected` (DEFAULT_EXPECT when
        # omitted).
        wanted = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(wanted, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        # run_method() must raise `error_class` (DEFAULT_EXCEPTION
        # when omitted). Returns the assertRaises context so callers
        # can inspect the exception.
        wanted = self.DEFAULT_EXCEPTION if error_class is None else error_class
        with self.assertRaises(wanted) as err:
            self.run_method(*args, **kwargs)
        return err

    def test_immediate_success(self):
        responses = self.TEST_PATCHER(self.DEFAULT_EXPECT, 200)
        with responses:
            self.check_success()

    def test_retry_then_success(self):
        responses = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200)
        with responses:
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        responses = self.TEST_PATCHER(
            self.DEFAULT_EXPECT, Exception('mock err'), 200)
        with responses:
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        # Without num_retries, the first 500 is final.
        responses = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200)
        with responses:
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        responses = self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200)
        with responses:
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        responses = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200)
        with responses:
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries given to the client constructor applies when the
        # call itself passes none.
        self.client_kwargs['num_retries'] = 3
        responses = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200)
        with responses:
            self.check_success()
1072
1073
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Retry behavior of KeepClient.get().
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200)
        with responses:
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        keep_client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                keep_client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        responses = tutil.mock_keep_responses(
            self.DEFAULT_EXPECT, 404, 200, 500)
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timed_out = (socket.timeout("timed out"), 200)
        succeeded = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, succeeded):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        corrupted = ('baddata', 200)
        intact = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(corrupted, intact):
            self.check_success(locator=self.HINTED_LOCATOR)
1117
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Retry behavior of KeepClient.head().
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    DEFAULT_EXPECT = True
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200)
        with responses:
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        keep_client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                keep_client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        responses = tutil.mock_keep_responses(
            self.DEFAULT_EXPECT, 404, 200, 500)
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timed_out = (socket.timeout("timed out"), 200)
        succeeded = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, succeeded):
            self.check_success(locator=self.HINTED_LOCATOR)
1155
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Retry behavior of KeepClient.put().
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Only one 200 is available from the single proxy, so asking
        # for two copies must fail.
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200)
        with responses:
            self.check_exception(copies=2, num_retries=3)
1169
1170
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):

    class FakeKeepService(object):
        # Stand-in service for the writer thread pool: put() sleeps
        # for `delay` seconds, then either raises `will_raise` or
        # returns `will_succeed`.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {'x-keep-replicas-stored': str(replicas)},
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            # Only a successful service has a result to report.
            return self._result if self.will_succeed else None

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        pool_args = {
            'data': 'foo',
            'data_hash': 'acbd18db4cc2f85cedef654fccc4a4d8+3',
            'max_service_replicas': self.copies,
            'copies': self.copies,
        }
        self.pool = arvados.KeepClient.KeepWriterThreadPool(**pool_args)

    def _add_service(self, **service_args):
        # Queue one fake service on the pool.
        self.pool.add_task(self.FakeKeepService(**service_args), None)

    def test_only_write_enough_on_success(self):
        for i in range(10):
            self._add_service(delay=i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_on_partial_success(self):
        # Alternate failing and succeeding services.
        for i in range(5):
            self._add_service(delay=i/10.0, will_succeed=False)
            self._add_service(delay=i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_when_some_crash(self):
        # Alternate crashing and succeeding services.
        for i in range(5):
            self._add_service(delay=i/10.0, will_raise=Exception())
            self._add_service(delay=i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_fail_when_too_many_crash(self):
        # Only copies-1 services can succeed, so that is all the pool
        # may report as done.
        for i in range(self.copies+1):
            self._add_service(delay=i/10.0, will_raise=Exception())
        for i in range(self.copies-1):
            self._add_service(delay=i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies-1)
1239
1240
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.

    # Response body handed to the mocked services in every test here.
    FOO_LOCATOR = 'acbd18db4cc2f85cedef654fccc4a4d8+3'

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def test_success_after_exception(self):
        responses = tutil.mock_keep_responses(
            self.FOO_LOCATOR, Exception('mock err'), 200, 200)
        with responses as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        responses = tutil.mock_keep_responses(
            self.FOO_LOCATOR, 500, 200, 200)
        with responses as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        responses = tutil.mock_keep_responses(
            self.FOO_LOCATOR, 200, 400, 200)
        with responses as req_mock, \
             self.assertRaises(arvados.errors.KeepWriteError):
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1274
class KeepClientAPIErrorTest(unittest.TestCase):
    # Regression test: an exception raised by the API client during
    # get() must not leave the Keep client unable to serve a later
    # get() of the same block.
    def test_api_fail(self):
        # Minimal API client stand-in: answers the "api_token" and
        # "insecure" attributes and raises KeepReadError for any other
        # attribute lookup.
        class ApiMock(object):
            def __getattr__(self, r):
                if r == "api_token":
                    return "abc"
                elif r == "insecure":
                    return False
                else:
                    raise arvados.errors.KeepReadError()
        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                             proxy='', local_store='')

        # The bug this is testing for is that if an API (not
        # keepstore) exception is thrown as part of a get(), the next
        # attempt to get that same block will result in a deadlock.
        # This is why there are two get()s in a row.  Unfortunately,
        # the failure mode for this test is that the test suite
        # deadlocks, there isn't a good way to avoid that without
        # adding a special case that has no use except for this test.

        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")