13106: Add test confirming deadlock
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import socket
19 import sys
20 import time
21 import unittest
22 import urllib.parse
23
24 import arvados
25 import arvados.retry
26 import arvados.util
27 from . import arvados_testutil as tutil
28 from . import keepstub
29 from . import run_test_server
30
31 class KeepTestCase(run_test_server.TestCaseWithServers):
32     MAIN_SERVER = {}
33     KEEP_SERVER = {}
34
35     @classmethod
36     def setUpClass(cls):
37         super(KeepTestCase, cls).setUpClass()
38         run_test_server.authorize_with("admin")
39         cls.api_client = arvados.api('v1')
40         cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
41                                              proxy='', local_store='')
42
43     def test_KeepBasicRWTest(self):
44         self.assertEqual(0, self.keep_client.upload_counter.get())
45         foo_locator = self.keep_client.put('foo')
46         self.assertRegex(
47             foo_locator,
48             '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
49             'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
50
51         # 6 bytes because uploaded 2 copies
52         self.assertEqual(6, self.keep_client.upload_counter.get())
53
54         self.assertEqual(0, self.keep_client.download_counter.get())
55         self.assertEqual(self.keep_client.get(foo_locator),
56                          b'foo',
57                          'wrong content from Keep.get(md5("foo"))')
58         self.assertEqual(3, self.keep_client.download_counter.get())
59
60     def test_KeepBinaryRWTest(self):
61         blob_str = b'\xff\xfe\xf7\x00\x01\x02'
62         blob_locator = self.keep_client.put(blob_str)
63         self.assertRegex(
64             blob_locator,
65             '^7fc7c53b45e53926ba52821140fef396\+6',
66             ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
67         self.assertEqual(self.keep_client.get(blob_locator),
68                          blob_str,
69                          'wrong content from Keep.get(md5(<binarydata>))')
70
71     def test_KeepLongBinaryRWTest(self):
72         blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
73         for i in range(0,23):
74             blob_data = blob_data + blob_data
75         blob_locator = self.keep_client.put(blob_data)
76         self.assertRegex(
77             blob_locator,
78             '^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
79             ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
80         self.assertEqual(self.keep_client.get(blob_locator),
81                          blob_data,
82                          'wrong content from Keep.get(md5(<binarydata>))')
83
84     @unittest.skip("unreliable test - please fix and close #8752")
85     def test_KeepSingleCopyRWTest(self):
86         blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
87         blob_locator = self.keep_client.put(blob_data, copies=1)
88         self.assertRegex(
89             blob_locator,
90             '^c902006bc98a3eb4a3663b65ab4a6fab\+8',
91             ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
92         self.assertEqual(self.keep_client.get(blob_locator),
93                          blob_data,
94                          'wrong content from Keep.get(md5(<binarydata>))')
95
96     def test_KeepEmptyCollectionTest(self):
97         blob_locator = self.keep_client.put('', copies=1)
98         self.assertRegex(
99             blob_locator,
100             '^d41d8cd98f00b204e9800998ecf8427e\+0',
101             ('wrong locator from Keep.put(""): ' + blob_locator))
102
103     def test_unicode_must_be_ascii(self):
104         # If unicode type, must only consist of valid ASCII
105         foo_locator = self.keep_client.put(u'foo')
106         self.assertRegex(
107             foo_locator,
108             '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
109             'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
110
111         if sys.version_info < (3, 0):
112             with self.assertRaises(UnicodeEncodeError):
113                 # Error if it is not ASCII
114                 self.keep_client.put(u'\xe2')
115
116         with self.assertRaises(AttributeError):
117             # Must be bytes or have an encode() method
118             self.keep_client.put({})
119
120     def test_KeepHeadTest(self):
121         locator = self.keep_client.put('test_head')
122         self.assertRegex(
123             locator,
124             '^b9a772c7049325feb7130fff1f8333e9\+9',
125             'wrong md5 hash from Keep.put for "test_head": ' + locator)
126         self.assertEqual(True, self.keep_client.head(locator))
127         self.assertEqual(self.keep_client.get(locator),
128                          b'test_head',
129                          'wrong content from Keep.get for "test_head"')
130
131 class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
132     MAIN_SERVER = {}
133     KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
134                    'enforce_permissions': True}
135
136     def test_KeepBasicRWTest(self):
137         run_test_server.authorize_with('active')
138         keep_client = arvados.KeepClient()
139         foo_locator = keep_client.put('foo')
140         self.assertRegex(
141             foo_locator,
142             r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
143             'invalid locator from Keep.put("foo"): ' + foo_locator)
144         self.assertEqual(keep_client.get(foo_locator),
145                          b'foo',
146                          'wrong content from Keep.get(md5("foo"))')
147
148         # GET with an unsigned locator => NotFound
149         bar_locator = keep_client.put('bar')
150         unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
151         self.assertRegex(
152             bar_locator,
153             r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
154             'invalid locator from Keep.put("bar"): ' + bar_locator)
155         self.assertRaises(arvados.errors.NotFoundError,
156                           keep_client.get,
157                           unsigned_bar_locator)
158
159         # GET from a different user => NotFound
160         run_test_server.authorize_with('spectator')
161         self.assertRaises(arvados.errors.NotFoundError,
162                           arvados.Keep.get,
163                           bar_locator)
164
165         # Unauthenticated GET for a signed locator => NotFound
166         # Unauthenticated GET for an unsigned locator => NotFound
167         keep_client.api_token = ''
168         self.assertRaises(arvados.errors.NotFoundError,
169                           keep_client.get,
170                           bar_locator)
171         self.assertRaises(arvados.errors.NotFoundError,
172                           keep_client.get,
173                           unsigned_bar_locator)
174
175
176 # KeepOptionalPermission: starts Keep with --permission-key-file
177 # but not --enforce-permissions (i.e. generate signatures on PUT
178 # requests, but do not require them for GET requests)
179 #
180 # All of these requests should succeed when permissions are optional:
181 # * authenticated request, signed locator
182 # * authenticated request, unsigned locator
183 # * unauthenticated request, signed locator
184 # * unauthenticated request, unsigned locator
185 class KeepOptionalPermission(run_test_server.TestCaseWithServers):
186     MAIN_SERVER = {}
187     KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
188                    'enforce_permissions': False}
189
190     @classmethod
191     def setUpClass(cls):
192         super(KeepOptionalPermission, cls).setUpClass()
193         run_test_server.authorize_with("admin")
194         cls.api_client = arvados.api('v1')
195
196     def setUp(self):
197         super(KeepOptionalPermission, self).setUp()
198         self.keep_client = arvados.KeepClient(api_client=self.api_client,
199                                               proxy='', local_store='')
200
201     def _put_foo_and_check(self):
202         signed_locator = self.keep_client.put('foo')
203         self.assertRegex(
204             signed_locator,
205             r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
206             'invalid locator from Keep.put("foo"): ' + signed_locator)
207         return signed_locator
208
209     def test_KeepAuthenticatedSignedTest(self):
210         signed_locator = self._put_foo_and_check()
211         self.assertEqual(self.keep_client.get(signed_locator),
212                          b'foo',
213                          'wrong content from Keep.get(md5("foo"))')
214
215     def test_KeepAuthenticatedUnsignedTest(self):
216         signed_locator = self._put_foo_and_check()
217         self.assertEqual(self.keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8"),
218                          b'foo',
219                          'wrong content from Keep.get(md5("foo"))')
220
221     def test_KeepUnauthenticatedSignedTest(self):
222         # Check that signed GET requests work even when permissions
223         # enforcement is off.
224         signed_locator = self._put_foo_and_check()
225         self.keep_client.api_token = ''
226         self.assertEqual(self.keep_client.get(signed_locator),
227                          b'foo',
228                          'wrong content from Keep.get(md5("foo"))')
229
230     def test_KeepUnauthenticatedUnsignedTest(self):
231         # Since --enforce-permissions is not in effect, GET requests
232         # need not be authenticated.
233         signed_locator = self._put_foo_and_check()
234         self.keep_client.api_token = ''
235         self.assertEqual(self.keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8"),
236                          b'foo',
237                          'wrong content from Keep.get(md5("foo"))')
238
239
240 class KeepProxyTestCase(run_test_server.TestCaseWithServers):
241     MAIN_SERVER = {}
242     KEEP_SERVER = {}
243     KEEP_PROXY_SERVER = {}
244
245     @classmethod
246     def setUpClass(cls):
247         super(KeepProxyTestCase, cls).setUpClass()
248         run_test_server.authorize_with('active')
249         cls.api_client = arvados.api('v1')
250
251     def tearDown(self):
252         arvados.config.settings().pop('ARVADOS_EXTERNAL_CLIENT', None)
253         super(KeepProxyTestCase, self).tearDown()
254
255     def test_KeepProxyTest1(self):
256         # Will use ARVADOS_KEEP_SERVICES environment variable that
257         # is set by setUpClass().
258         keep_client = arvados.KeepClient(api_client=self.api_client,
259                                          local_store='')
260         baz_locator = keep_client.put('baz')
261         self.assertRegex(
262             baz_locator,
263             '^73feffa4b7f6bb68e44cf984c85f6e88\+3',
264             'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
265         self.assertEqual(keep_client.get(baz_locator),
266                          b'baz',
267                          'wrong content from Keep.get(md5("baz"))')
268         self.assertTrue(keep_client.using_proxy)
269
270     def test_KeepProxyTest2(self):
271         # Don't instantiate the proxy directly, but set the X-External-Client
272         # header.  The API server should direct us to the proxy.
273         arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
274         keep_client = arvados.KeepClient(api_client=self.api_client,
275                                          proxy='', local_store='')
276         baz_locator = keep_client.put('baz2')
277         self.assertRegex(
278             baz_locator,
279             '^91f372a266fe2bf2823cb8ec7fda31ce\+4',
280             'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
281         self.assertEqual(keep_client.get(baz_locator),
282                          b'baz2',
283                          'wrong content from Keep.get(md5("baz2"))')
284         self.assertTrue(keep_client.using_proxy)
285
286     def test_KeepProxyTestMultipleURIs(self):
287         # Test using ARVADOS_KEEP_SERVICES env var overriding any
288         # existing proxy setting and setting multiple proxies
289         arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
290         keep_client = arvados.KeepClient(api_client=self.api_client,
291                                          local_store='')
292         uris = [x['_service_root'] for x in keep_client._keep_services]
293         self.assertEqual(uris, ['http://10.0.0.1/',
294                                 'https://foo.example.org:1234/'])
295
296     def test_KeepProxyTestInvalidURI(self):
297         arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
298         with self.assertRaises(arvados.errors.ArgumentError):
299             keep_client = arvados.KeepClient(api_client=self.api_client,
300                                              local_store='')
301
302
303 class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
304     def get_service_roots(self, api_client):
305         keep_client = arvados.KeepClient(api_client=api_client)
306         services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
307         return [urllib.parse.urlparse(url) for url in sorted(services)]
308
309     def test_ssl_flag_respected_in_roots(self):
310         for ssl_flag in [False, True]:
311             services = self.get_service_roots(self.mock_keep_services(
312                 service_ssl_flag=ssl_flag))
313             self.assertEqual(
314                 ('https' if ssl_flag else 'http'), services[0].scheme)
315
316     def test_correct_ports_with_ipv6_addresses(self):
317         service = self.get_service_roots(self.mock_keep_services(
318             service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
319         self.assertEqual('100::1', service.hostname)
320         self.assertEqual(10, service.port)
321
322     # test_*_timeout verify that KeepClient instructs pycurl to use
323     # the appropriate connection and read timeouts. They don't care
324     # whether pycurl actually exhibits the expected timeout behavior
325     # -- those tests are in the KeepClientTimeout test class.
326
327     def test_get_timeout(self):
328         api_client = self.mock_keep_services(count=1)
329         force_timeout = socket.timeout("timed out")
330         with tutil.mock_keep_responses(force_timeout, 0) as mock:
331             keep_client = arvados.KeepClient(api_client=api_client)
332             with self.assertRaises(arvados.errors.KeepReadError):
333                 keep_client.get('ffffffffffffffffffffffffffffffff')
334             self.assertEqual(
335                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
336                 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
337             self.assertEqual(
338                 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
339                 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
340             self.assertEqual(
341                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
342                 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
343
344     def test_put_timeout(self):
345         api_client = self.mock_keep_services(count=1)
346         force_timeout = socket.timeout("timed out")
347         with tutil.mock_keep_responses(force_timeout, 0) as mock:
348             keep_client = arvados.KeepClient(api_client=api_client)
349             with self.assertRaises(arvados.errors.KeepWriteError):
350                 keep_client.put(b'foo')
351             self.assertEqual(
352                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
353                 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
354             self.assertEqual(
355                 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
356                 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
357             self.assertEqual(
358                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
359                 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
360
361     def test_head_timeout(self):
362         api_client = self.mock_keep_services(count=1)
363         force_timeout = socket.timeout("timed out")
364         with tutil.mock_keep_responses(force_timeout, 0) as mock:
365             keep_client = arvados.KeepClient(api_client=api_client)
366             with self.assertRaises(arvados.errors.KeepReadError):
367                 keep_client.head('ffffffffffffffffffffffffffffffff')
368             self.assertEqual(
369                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
370                 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
371             self.assertEqual(
372                 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
373                 int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
374             self.assertEqual(
375                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
376                 int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
377
378     def test_proxy_get_timeout(self):
379         api_client = self.mock_keep_services(service_type='proxy', count=1)
380         force_timeout = socket.timeout("timed out")
381         with tutil.mock_keep_responses(force_timeout, 0) as mock:
382             keep_client = arvados.KeepClient(api_client=api_client)
383             with self.assertRaises(arvados.errors.KeepReadError):
384                 keep_client.get('ffffffffffffffffffffffffffffffff')
385             self.assertEqual(
386                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
387                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
388             self.assertEqual(
389                 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
390                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
391             self.assertEqual(
392                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
393                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
394
395     def test_proxy_head_timeout(self):
396         api_client = self.mock_keep_services(service_type='proxy', count=1)
397         force_timeout = socket.timeout("timed out")
398         with tutil.mock_keep_responses(force_timeout, 0) as mock:
399             keep_client = arvados.KeepClient(api_client=api_client)
400             with self.assertRaises(arvados.errors.KeepReadError):
401                 keep_client.head('ffffffffffffffffffffffffffffffff')
402             self.assertEqual(
403                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
404                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
405             self.assertEqual(
406                 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
407                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
408             self.assertEqual(
409                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
410                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
411
412     def test_proxy_put_timeout(self):
413         api_client = self.mock_keep_services(service_type='proxy', count=1)
414         force_timeout = socket.timeout("timed out")
415         with tutil.mock_keep_responses(force_timeout, 0) as mock:
416             keep_client = arvados.KeepClient(api_client=api_client)
417             with self.assertRaises(arvados.errors.KeepWriteError):
418                 keep_client.put('foo')
419             self.assertEqual(
420                 mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
421                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
422             self.assertEqual(
423                 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
424                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
425             self.assertEqual(
426                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
427                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
428
429     def check_no_services_error(self, verb, exc_class):
430         api_client = mock.MagicMock(name='api_client')
431         api_client.keep_services().accessible().execute.side_effect = (
432             arvados.errors.ApiError)
433         keep_client = arvados.KeepClient(api_client=api_client)
434         with self.assertRaises(exc_class) as err_check:
435             getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
436         self.assertEqual(0, len(err_check.exception.request_errors()))
437
438     def test_get_error_with_no_services(self):
439         self.check_no_services_error('get', arvados.errors.KeepReadError)
440
441     def test_head_error_with_no_services(self):
442         self.check_no_services_error('head', arvados.errors.KeepReadError)
443
444     def test_put_error_with_no_services(self):
445         self.check_no_services_error('put', arvados.errors.KeepWriteError)
446
447     def check_errors_from_last_retry(self, verb, exc_class):
448         api_client = self.mock_keep_services(count=2)
449         req_mock = tutil.mock_keep_responses(
450             "retry error reporting test", 500, 500, 403, 403)
451         with req_mock, tutil.skip_sleep, \
452                 self.assertRaises(exc_class) as err_check:
453             keep_client = arvados.KeepClient(api_client=api_client)
454             getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
455                                        num_retries=3)
456         self.assertEqual([403, 403], [
457                 getattr(error, 'status_code', None)
458                 for error in err_check.exception.request_errors().values()])
459
460     def test_get_error_reflects_last_retry(self):
461         self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
462
463     def test_head_error_reflects_last_retry(self):
464         self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)
465
466     def test_put_error_reflects_last_retry(self):
467         self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
468
469     def test_put_error_does_not_include_successful_puts(self):
470         data = 'partial failure test'
471         data_loc = tutil.str_keep_locator(data)
472         api_client = self.mock_keep_services(count=3)
473         with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
474                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
475             keep_client = arvados.KeepClient(api_client=api_client)
476             keep_client.put(data)
477         self.assertEqual(2, len(exc_check.exception.request_errors()))
478
479     def test_proxy_put_with_no_writable_services(self):
480         data = 'test with no writable services'
481         data_loc = tutil.str_keep_locator(data)
482         api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
483         with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
484                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
485           keep_client = arvados.KeepClient(api_client=api_client)
486           keep_client.put(data)
487         self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
488         self.assertEqual(0, len(exc_check.exception.request_errors()))
489
490     def test_oddball_service_get(self):
491         body = b'oddball service get'
492         api_client = self.mock_keep_services(service_type='fancynewblobstore')
493         with tutil.mock_keep_responses(body, 200):
494             keep_client = arvados.KeepClient(api_client=api_client)
495             actual = keep_client.get(tutil.str_keep_locator(body))
496         self.assertEqual(body, actual)
497
498     def test_oddball_service_put(self):
499         body = b'oddball service put'
500         pdh = tutil.str_keep_locator(body)
501         api_client = self.mock_keep_services(service_type='fancynewblobstore')
502         with tutil.mock_keep_responses(pdh, 200):
503             keep_client = arvados.KeepClient(api_client=api_client)
504             actual = keep_client.put(body, copies=1)
505         self.assertEqual(pdh, actual)
506
507     def test_oddball_service_writer_count(self):
508         body = b'oddball service writer count'
509         pdh = tutil.str_keep_locator(body)
510         api_client = self.mock_keep_services(service_type='fancynewblobstore',
511                                              count=4)
512         headers = {'x-keep-replicas-stored': 3}
513         with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
514                                        **headers) as req_mock:
515             keep_client = arvados.KeepClient(api_client=api_client)
516             actual = keep_client.put(body, copies=2)
517         self.assertEqual(pdh, actual)
518         self.assertEqual(1, req_mock.call_count)
519
520
521 @tutil.skip_sleep
522 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
523     def setUp(self):
524         self.api_client = self.mock_keep_services(count=2)
525         self.keep_client = arvados.KeepClient(api_client=self.api_client)
526         self.data = b'xyzzy'
527         self.locator = '1271ed5ef305aadabc605b1609e24c52'
528         self.test_id = arvados.util.new_request_id()
529         self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
530         # If we don't set request_id to None explicitly here, it will
531         # return <MagicMock name='api_client_mock.request_id'
532         # id='123456789'>:
533         self.api_client.request_id = None
534
535     def test_default_to_api_client_request_id(self):
536         self.api_client.request_id = self.test_id
537         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
538             self.keep_client.put(self.data)
539         self.assertEqual(2, len(mock.responses))
540         for resp in mock.responses:
541             self.assertProvidedRequestId(resp)
542
543         with tutil.mock_keep_responses(self.data, 200) as mock:
544             self.keep_client.get(self.locator)
545         self.assertProvidedRequestId(mock.responses[0])
546
547         with tutil.mock_keep_responses(b'', 200) as mock:
548             self.keep_client.head(self.locator)
549         self.assertProvidedRequestId(mock.responses[0])
550
551     def test_explicit_request_id(self):
552         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
553             self.keep_client.put(self.data, request_id=self.test_id)
554         self.assertEqual(2, len(mock.responses))
555         for resp in mock.responses:
556             self.assertProvidedRequestId(resp)
557
558         with tutil.mock_keep_responses(self.data, 200) as mock:
559             self.keep_client.get(self.locator, request_id=self.test_id)
560         self.assertProvidedRequestId(mock.responses[0])
561
562         with tutil.mock_keep_responses(b'', 200) as mock:
563             self.keep_client.head(self.locator, request_id=self.test_id)
564         self.assertProvidedRequestId(mock.responses[0])
565
566     def test_automatic_request_id(self):
567         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
568             self.keep_client.put(self.data)
569         self.assertEqual(2, len(mock.responses))
570         for resp in mock.responses:
571             self.assertAutomaticRequestId(resp)
572
573         with tutil.mock_keep_responses(self.data, 200) as mock:
574             self.keep_client.get(self.locator)
575         self.assertAutomaticRequestId(mock.responses[0])
576
577         with tutil.mock_keep_responses(b'', 200) as mock:
578             self.keep_client.head(self.locator)
579         self.assertAutomaticRequestId(mock.responses[0])
580
581     def assertAutomaticRequestId(self, resp):
582         hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
583                if x.startswith('X-Request-Id: ')][0]
584         self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
585         self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
586
587     def assertProvidedRequestId(self, resp):
588         self.assertIn('X-Request-Id: '+self.test_id,
589                       resp.getopt(pycurl.HTTPHEADER))
590
591
592 @tutil.skip_sleep
593 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
594
595     def setUp(self):
596         # expected_order[i] is the probe order for
597         # hash=md5(sprintf("%064x",i)) where there are 16 services
598         # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
599         # the first probe for the block consisting of 64 "0"
600         # characters is the service whose uuid is
601         # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
602         self.services = 16
603         self.expected_order = [
604             list('3eab2d5fc9681074'),
605             list('097dba52e648f1c3'),
606             list('c5b4e023f8a7d691'),
607             list('9d81c02e76a3bf54'),
608             ]
609         self.blocks = [
610             "{:064x}".format(x).encode()
611             for x in range(len(self.expected_order))]
612         self.hashes = [
613             hashlib.md5(self.blocks[x]).hexdigest()
614             for x in range(len(self.expected_order))]
615         self.api_client = self.mock_keep_services(count=self.services)
616         self.keep_client = arvados.KeepClient(api_client=self.api_client)
617
618     def test_weighted_service_roots_against_reference_set(self):
619         # Confirm weighted_service_roots() returns the correct order
620         for i, hash in enumerate(self.hashes):
621             roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
622             got_order = [
623                 re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
624                 for root in roots]
625             self.assertEqual(self.expected_order[i], got_order)
626
627     def test_get_probe_order_against_reference_set(self):
628         self._test_probe_order_against_reference_set(
629             lambda i: self.keep_client.get(self.hashes[i], num_retries=1))
630
631     def test_head_probe_order_against_reference_set(self):
632         self._test_probe_order_against_reference_set(
633             lambda i: self.keep_client.head(self.hashes[i], num_retries=1))
634
635     def test_put_probe_order_against_reference_set(self):
636         # copies=1 prevents the test from being sensitive to races
637         # between writer threads.
638         self._test_probe_order_against_reference_set(
639             lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))
640
641     def _test_probe_order_against_reference_set(self, op):
642         for i in range(len(self.blocks)):
643             with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
644                  self.assertRaises(arvados.errors.KeepRequestError):
645                 op(i)
646             got_order = [
647                 re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
648                 for resp in mock.responses]
649             self.assertEqual(self.expected_order[i]*2, got_order)
650
651     def test_put_probe_order_multiple_copies(self):
652         for copies in range(2, 4):
653             for i in range(len(self.blocks)):
654                 with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
655                      self.assertRaises(arvados.errors.KeepWriteError):
656                     self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
657                 got_order = [
658                     re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
659                     for resp in mock.responses]
660                 # With T threads racing to make requests, the position
661                 # of a given server in the sequence of HTTP requests
662                 # (got_order) cannot be more than T-1 positions
663                 # earlier than that server's position in the reference
664                 # probe sequence (expected_order).
665                 #
666                 # Loop invariant: we have accounted for +pos+ expected
667                 # probes, either by seeing them in +got_order+ or by
668                 # putting them in +pending+ in the hope of seeing them
669                 # later. As long as +len(pending)<T+, we haven't
670                 # started a request too early.
671                 pending = []
672                 for pos, expected in enumerate(self.expected_order[i]*3):
673                     got = got_order[pos-len(pending)]
674                     while got in pending:
675                         del pending[pending.index(got)]
676                         got = got_order[pos-len(pending)]
677                     if got != expected:
678                         pending.append(expected)
679                         self.assertLess(
680                             len(pending), copies,
681                             "pending={}, with copies={}, got {}, expected {}".format(
682                                 pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
683
684     def test_probe_waste_adding_one_server(self):
685         hashes = [
686             hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
687         initial_services = 12
688         self.api_client = self.mock_keep_services(count=initial_services)
689         self.keep_client = arvados.KeepClient(api_client=self.api_client)
690         probes_before = [
691             self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
692         for added_services in range(1, 12):
693             api_client = self.mock_keep_services(count=initial_services+added_services)
694             keep_client = arvados.KeepClient(api_client=api_client)
695             total_penalty = 0
696             for hash_index in range(len(hashes)):
697                 probe_after = keep_client.weighted_service_roots(
698                     arvados.KeepLocator(hashes[hash_index]))
699                 penalty = probe_after.index(probes_before[hash_index][0])
700                 self.assertLessEqual(penalty, added_services)
701                 total_penalty += penalty
702             # Average penalty per block should not exceed
703             # N(added)/N(orig) by more than 20%, and should get closer
704             # to the ideal as we add data points.
705             expect_penalty = (
706                 added_services *
707                 len(hashes) / initial_services)
708             max_penalty = (
709                 expect_penalty *
710                 (120 - added_services)/100)
711             min_penalty = (
712                 expect_penalty * 8/10)
713             self.assertTrue(
714                 min_penalty <= total_penalty <= max_penalty,
715                 "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
716                     initial_services,
717                     added_services,
718                     len(hashes),
719                     total_penalty,
720                     min_penalty,
721                     max_penalty))
722
723     def check_64_zeros_error_order(self, verb, exc_class):
724         data = b'0' * 64
725         if verb == 'get':
726             data = tutil.str_keep_locator(data)
727         # Arbitrary port number:
728         aport = random.randint(1024,65535)
729         api_client = self.mock_keep_services(service_port=aport, count=self.services)
730         keep_client = arvados.KeepClient(api_client=api_client)
731         with mock.patch('pycurl.Curl') as curl_mock, \
732              self.assertRaises(exc_class) as err_check:
733             curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
734             getattr(keep_client, verb)(data)
735         urls = [urllib.parse.urlparse(url)
736                 for url in err_check.exception.request_errors()]
737         self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
738                          [(url.hostname, url.port) for url in urls])
739
740     def test_get_error_shows_probe_order(self):
741         self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)
742
743     def test_put_error_shows_probe_order(self):
744         self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
745
746
747 class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
748     # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
749     # 1s worth of data and then trigger bandwidth errors before running
750     # out of data.
751     DATA = b'x'*2**11
752     BANDWIDTH_LOW_LIM = 1024
753     TIMEOUT_TIME = 1.0
754
755     class assertTakesBetween(unittest.TestCase):
756         def __init__(self, tmin, tmax):
757             self.tmin = tmin
758             self.tmax = tmax
759
760         def __enter__(self):
761             self.t0 = time.time()
762
763         def __exit__(self, *args, **kwargs):
764             # Round times to milliseconds, like CURL. Otherwise, we
765             # fail when CURL reaches a 1s timeout at 0.9998s.
766             delta = round(time.time() - self.t0, 3)
767             self.assertGreaterEqual(delta, self.tmin)
768             self.assertLessEqual(delta, self.tmax)
769
770     class assertTakesGreater(unittest.TestCase):
771         def __init__(self, tmin):
772             self.tmin = tmin
773
774         def __enter__(self):
775             self.t0 = time.time()
776
777         def __exit__(self, *args, **kwargs):
778             delta = round(time.time() - self.t0, 3)
779             self.assertGreaterEqual(delta, self.tmin)
780
781     def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
782         return arvados.KeepClient(
783             api_client=self.api_client,
784             timeout=timeouts)
785
786     def test_timeout_slow_connect(self):
787         # Can't simulate TCP delays with our own socket. Leave our
788         # stub server running uselessly, and try to connect to an
789         # unroutable IP address instead.
790         self.api_client = self.mock_keep_services(
791             count=1,
792             service_host='240.0.0.0',
793         )
794         with self.assertTakesBetween(0.1, 0.5):
795             with self.assertRaises(arvados.errors.KeepWriteError):
796                 self.keepClient().put(self.DATA, copies=1, num_retries=0)
797
798     def test_low_bandwidth_no_delays_success(self):
799         self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
800         kc = self.keepClient()
801         loc = kc.put(self.DATA, copies=1, num_retries=0)
802         self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
803
804     def test_too_low_bandwidth_no_delays_failure(self):
805         # Check that lessening bandwidth corresponds to failing
806         kc = self.keepClient()
807         loc = kc.put(self.DATA, copies=1, num_retries=0)
808         self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
809         with self.assertTakesGreater(self.TIMEOUT_TIME):
810             with self.assertRaises(arvados.errors.KeepReadError) as e:
811                 kc.get(loc, num_retries=0)
812         with self.assertTakesGreater(self.TIMEOUT_TIME):
813             with self.assertRaises(arvados.errors.KeepWriteError):
814                 kc.put(self.DATA, copies=1, num_retries=0)
815
816     def test_low_bandwidth_with_server_response_delay_failure(self):
817         kc = self.keepClient()
818         loc = kc.put(self.DATA, copies=1, num_retries=0)
819         self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
820         self.server.setdelays(response=self.TIMEOUT_TIME)
821         with self.assertTakesGreater(self.TIMEOUT_TIME):
822             with self.assertRaises(arvados.errors.KeepReadError) as e:
823                 kc.get(loc, num_retries=0)
824         with self.assertTakesGreater(self.TIMEOUT_TIME):
825             with self.assertRaises(arvados.errors.KeepWriteError):
826                 kc.put(self.DATA, copies=1, num_retries=0)
827         with self.assertTakesGreater(self.TIMEOUT_TIME):
828             with self.assertRaises(arvados.errors.KeepReadError) as e:
829                 kc.head(loc, num_retries=0)
830
831     def test_low_bandwidth_with_server_mid_delay_failure(self):
832         kc = self.keepClient()
833         loc = kc.put(self.DATA, copies=1, num_retries=0)
834         self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
835         self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
836         with self.assertTakesGreater(self.TIMEOUT_TIME):
837             with self.assertRaises(arvados.errors.KeepReadError) as e:
838                 kc.get(loc, num_retries=0)
839         with self.assertTakesGreater(self.TIMEOUT_TIME):
840             with self.assertRaises(arvados.errors.KeepWriteError):
841                 kc.put(self.DATA, copies=1, num_retries=0)
842
843     def test_timeout_slow_request(self):
844         loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
845         self.server.setdelays(request=.2)
846         self._test_connect_timeout_under_200ms(loc)
847         self.server.setdelays(request=2)
848         self._test_response_timeout_under_2s(loc)
849
850     def test_timeout_slow_response(self):
851         loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
852         self.server.setdelays(response=.2)
853         self._test_connect_timeout_under_200ms(loc)
854         self.server.setdelays(response=2)
855         self._test_response_timeout_under_2s(loc)
856
857     def test_timeout_slow_response_body(self):
858         loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
859         self.server.setdelays(response_body=.2)
860         self._test_connect_timeout_under_200ms(loc)
861         self.server.setdelays(response_body=2)
862         self._test_response_timeout_under_2s(loc)
863
864     def _test_connect_timeout_under_200ms(self, loc):
865         # Allow 100ms to connect, then 1s for response. Everything
866         # should work, and everything should take at least 200ms to
867         # return.
868         kc = self.keepClient(timeouts=(.1, 1))
869         with self.assertTakesBetween(.2, .3):
870             kc.put(self.DATA, copies=1, num_retries=0)
871         with self.assertTakesBetween(.2, .3):
872             self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
873
874     def _test_response_timeout_under_2s(self, loc):
875         # Allow 10s to connect, then 1s for response. Nothing should
876         # work, and everything should take at least 1s to return.
877         kc = self.keepClient(timeouts=(10, 1))
878         with self.assertTakesBetween(1, 9):
879             with self.assertRaises(arvados.errors.KeepReadError):
880                 kc.get(loc, num_retries=0)
881         with self.assertTakesBetween(1, 9):
882             with self.assertRaises(arvados.errors.KeepWriteError):
883                 kc.put(self.DATA, copies=1, num_retries=0)
884
885
886 class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
887     def mock_disks_and_gateways(self, disks=3, gateways=1):
888         self.gateways = [{
889                 'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
890                 'owner_uuid': 'zzzzz-tpzed-000000000000000',
891                 'service_host': 'gatewayhost{}'.format(i),
892                 'service_port': 12345,
893                 'service_ssl_flag': True,
894                 'service_type': 'gateway:test',
895         } for i in range(gateways)]
896         self.gateway_roots = [
897             "https://{service_host}:{service_port}/".format(**gw)
898             for gw in self.gateways]
899         self.api_client = self.mock_keep_services(
900             count=disks, additional_services=self.gateways)
901         self.keepClient = arvados.KeepClient(api_client=self.api_client)
902
903     @mock.patch('pycurl.Curl')
904     def test_get_with_gateway_hint_first(self, MockCurl):
905         MockCurl.return_value = tutil.FakeCurl.make(
906             code=200, body='foo', headers={'Content-Length': 3})
907         self.mock_disks_and_gateways()
908         locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
909         self.assertEqual(b'foo', self.keepClient.get(locator))
910         self.assertEqual(self.gateway_roots[0]+locator,
911                          MockCurl.return_value.getopt(pycurl.URL).decode())
912         self.assertEqual(True, self.keepClient.head(locator))
913
914     @mock.patch('pycurl.Curl')
915     def test_get_with_gateway_hints_in_order(self, MockCurl):
916         gateways = 4
917         disks = 3
918         mocks = [
919             tutil.FakeCurl.make(code=404, body='')
920             for _ in range(gateways+disks)
921         ]
922         MockCurl.side_effect = tutil.queue_with(mocks)
923         self.mock_disks_and_gateways(gateways=gateways, disks=disks)
924         locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
925                            ['K@'+gw['uuid'] for gw in self.gateways])
926         with self.assertRaises(arvados.errors.NotFoundError):
927             self.keepClient.get(locator)
928         # Gateways are tried first, in the order given.
929         for i, root in enumerate(self.gateway_roots):
930             self.assertEqual(root+locator,
931                              mocks[i].getopt(pycurl.URL).decode())
932         # Disk services are tried next.
933         for i in range(gateways, gateways+disks):
934             self.assertRegex(
935                 mocks[i].getopt(pycurl.URL).decode(),
936                 r'keep0x')
937
938     @mock.patch('pycurl.Curl')
939     def test_head_with_gateway_hints_in_order(self, MockCurl):
940         gateways = 4
941         disks = 3
942         mocks = [
943             tutil.FakeCurl.make(code=404, body=b'')
944             for _ in range(gateways+disks)
945         ]
946         MockCurl.side_effect = tutil.queue_with(mocks)
947         self.mock_disks_and_gateways(gateways=gateways, disks=disks)
948         locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
949                            ['K@'+gw['uuid'] for gw in self.gateways])
950         with self.assertRaises(arvados.errors.NotFoundError):
951             self.keepClient.head(locator)
952         # Gateways are tried first, in the order given.
953         for i, root in enumerate(self.gateway_roots):
954             self.assertEqual(root+locator,
955                              mocks[i].getopt(pycurl.URL).decode())
956         # Disk services are tried next.
957         for i in range(gateways, gateways+disks):
958             self.assertRegex(
959                 mocks[i].getopt(pycurl.URL).decode(),
960                 r'keep0x')
961
962     @mock.patch('pycurl.Curl')
963     def test_get_with_remote_proxy_hint(self, MockCurl):
964         MockCurl.return_value = tutil.FakeCurl.make(
965             code=200, body=b'foo', headers={'Content-Length': 3})
966         self.mock_disks_and_gateways()
967         locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
968         self.assertEqual(b'foo', self.keepClient.get(locator))
969         self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
970                          MockCurl.return_value.getopt(pycurl.URL).decode())
971
972     @mock.patch('pycurl.Curl')
973     def test_head_with_remote_proxy_hint(self, MockCurl):
974         MockCurl.return_value = tutil.FakeCurl.make(
975             code=200, body=b'foo', headers={'Content-Length': 3})
976         self.mock_disks_and_gateways()
977         locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
978         self.assertEqual(True, self.keepClient.head(locator))
979         self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
980                          MockCurl.return_value.getopt(pycurl.URL).decode())
981
982
983 class KeepClientRetryTestMixin(object):
984     # Testing with a local Keep store won't exercise the retry behavior.
985     # Instead, our strategy is:
986     # * Create a client with one proxy specified (pointed at a black
987     #   hole), so there's no need to instantiate an API client, and
988     #   all HTTP requests come from one place.
989     # * Mock httplib's request method to provide simulated responses.
990     # This lets us test the retry logic extensively without relying on any
991     # supporting servers, and prevents side effects in case something hiccups.
992     # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
993     # run_method().
994     #
995     # Test classes must define TEST_PATCHER to a method that mocks
996     # out appropriate methods in the client.
997
998     PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
999     TEST_DATA = b'testdata'
1000     TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
1001
1002     def setUp(self):
1003         self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}
1004
1005     def new_client(self, **caller_kwargs):
1006         kwargs = self.client_kwargs.copy()
1007         kwargs.update(caller_kwargs)
1008         return arvados.KeepClient(**kwargs)
1009
1010     def run_method(self, *args, **kwargs):
1011         raise NotImplementedError("test subclasses must define run_method")
1012
1013     def check_success(self, expected=None, *args, **kwargs):
1014         if expected is None:
1015             expected = self.DEFAULT_EXPECT
1016         self.assertEqual(expected, self.run_method(*args, **kwargs))
1017
1018     def check_exception(self, error_class=None, *args, **kwargs):
1019         if error_class is None:
1020             error_class = self.DEFAULT_EXCEPTION
1021         self.assertRaises(error_class, self.run_method, *args, **kwargs)
1022
1023     def test_immediate_success(self):
1024         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
1025             self.check_success()
1026
1027     def test_retry_then_success(self):
1028         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
1029             self.check_success(num_retries=3)
1030
1031     def test_exception_then_success(self):
1032         with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
1033             self.check_success(num_retries=3)
1034
1035     def test_no_default_retry(self):
1036         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
1037             self.check_exception()
1038
1039     def test_no_retry_after_permanent_error(self):
1040         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
1041             self.check_exception(num_retries=3)
1042
1043     def test_error_after_retries_exhausted(self):
1044         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
1045             self.check_exception(num_retries=1)
1046
1047     def test_num_retries_instance_fallback(self):
1048         self.client_kwargs['num_retries'] = 3
1049         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
1050             self.check_success()
1051
1052
1053 @tutil.skip_sleep
1054 class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
1055     DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
1056     DEFAULT_EXCEPTION = arvados.errors.KeepReadError
1057     HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
1058     TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
1059
1060     def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
1061                    *args, **kwargs):
1062         return self.new_client().get(locator, *args, **kwargs)
1063
1064     def test_specific_exception_when_not_found(self):
1065         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
1066             self.check_exception(arvados.errors.NotFoundError, num_retries=3)
1067
1068     def test_general_exception_with_mixed_errors(self):
1069         # get should raise a NotFoundError if no server returns the block,
1070         # and a high threshold of servers report that it's not found.
1071         # This test rigs up 50/50 disagreement between two servers, and
1072         # checks that it does not become a NotFoundError.
1073         client = self.new_client()
1074         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
1075             with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
1076                 client.get(self.HINTED_LOCATOR)
1077             self.assertNotIsInstance(
1078                 exc_check.exception, arvados.errors.NotFoundError,
1079                 "mixed errors raised NotFoundError")
1080
1081     def test_hint_server_can_succeed_without_retries(self):
1082         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
1083             self.check_success(locator=self.HINTED_LOCATOR)
1084
1085     def test_try_next_server_after_timeout(self):
1086         with tutil.mock_keep_responses(
1087                 (socket.timeout("timed out"), 200),
1088                 (self.DEFAULT_EXPECT, 200)):
1089             self.check_success(locator=self.HINTED_LOCATOR)
1090
1091     def test_retry_data_with_wrong_checksum(self):
1092         with tutil.mock_keep_responses(
1093                 ('baddata', 200),
1094                 (self.DEFAULT_EXPECT, 200)):
1095             self.check_success(locator=self.HINTED_LOCATOR)
1096
1097 @tutil.skip_sleep
1098 class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
1099     DEFAULT_EXPECT = True
1100     DEFAULT_EXCEPTION = arvados.errors.KeepReadError
1101     HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
1102     TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
1103
1104     def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
1105                    *args, **kwargs):
1106         return self.new_client().head(locator, *args, **kwargs)
1107
1108     def test_specific_exception_when_not_found(self):
1109         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
1110             self.check_exception(arvados.errors.NotFoundError, num_retries=3)
1111
1112     def test_general_exception_with_mixed_errors(self):
1113         # head should raise a NotFoundError if no server returns the block,
1114         # and a high threshold of servers report that it's not found.
1115         # This test rigs up 50/50 disagreement between two servers, and
1116         # checks that it does not become a NotFoundError.
1117         client = self.new_client()
1118         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
1119             with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
1120                 client.head(self.HINTED_LOCATOR)
1121             self.assertNotIsInstance(
1122                 exc_check.exception, arvados.errors.NotFoundError,
1123                 "mixed errors raised NotFoundError")
1124
1125     def test_hint_server_can_succeed_without_retries(self):
1126         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
1127             self.check_success(locator=self.HINTED_LOCATOR)
1128
1129     def test_try_next_server_after_timeout(self):
1130         with tutil.mock_keep_responses(
1131                 (socket.timeout("timed out"), 200),
1132                 (self.DEFAULT_EXPECT, 200)):
1133             self.check_success(locator=self.HINTED_LOCATOR)
1134
1135 @tutil.skip_sleep
1136 class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
1137     DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
1138     DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
1139     TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
1140
1141     def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
1142                    copies=1, *args, **kwargs):
1143         return self.new_client().put(data, copies, *args, **kwargs)
1144
1145     def test_do_not_send_multiple_copies_to_same_server(self):
1146         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
1147             self.check_exception(copies=2, num_retries=3)
1148
1149
1150 class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
1151
1152     class FakeKeepService(object):
1153         def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
1154             self.delay = delay
1155             self.will_succeed = will_succeed
1156             self.will_raise = will_raise
1157             self._result = {}
1158             self._result['headers'] = {}
1159             self._result['headers']['x-keep-replicas-stored'] = str(replicas)
1160             self._result['body'] = 'foobar'
1161
1162         def put(self, data_hash, data, timeout):
1163             time.sleep(self.delay)
1164             if self.will_raise is not None:
1165                 raise self.will_raise
1166             return self.will_succeed
1167
1168         def last_result(self):
1169             if self.will_succeed:
1170                 return self._result
1171
1172         def finished(self):
1173             return False
1174
1175     def setUp(self):
1176         self.copies = 3
1177         self.pool = arvados.KeepClient.KeepWriterThreadPool(
1178             data = 'foo',
1179             data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1180             max_service_replicas = self.copies,
1181             copies = self.copies
1182         )
1183
1184     def test_only_write_enough_on_success(self):
1185         for i in range(10):
1186             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1187             self.pool.add_task(ks, None)
1188         self.pool.join()
1189         self.assertEqual(self.pool.done(), self.copies)
1190
1191     def test_only_write_enough_on_partial_success(self):
1192         for i in range(5):
1193             ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
1194             self.pool.add_task(ks, None)
1195             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1196             self.pool.add_task(ks, None)
1197         self.pool.join()
1198         self.assertEqual(self.pool.done(), self.copies)
1199
1200     def test_only_write_enough_when_some_crash(self):
1201         for i in range(5):
1202             ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
1203             self.pool.add_task(ks, None)
1204             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1205             self.pool.add_task(ks, None)
1206         self.pool.join()
1207         self.assertEqual(self.pool.done(), self.copies)
1208
1209     def test_fail_when_too_many_crash(self):
1210         for i in range(self.copies+1):
1211             ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
1212             self.pool.add_task(ks, None)
1213         for i in range(self.copies-1):
1214             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
1215             self.pool.add_task(ks, None)
1216         self.pool.join()
1217         self.assertEqual(self.pool.done(), self.copies-1)
1218
1219
1220 @tutil.skip_sleep
1221 class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
1222     # Test put()s that need two distinct servers to succeed, possibly
1223     # requiring multiple passes through the retry loop.
1224
1225     def setUp(self):
1226         self.api_client = self.mock_keep_services(count=2)
1227         self.keep_client = arvados.KeepClient(api_client=self.api_client)
1228
1229     def test_success_after_exception(self):
1230         with tutil.mock_keep_responses(
1231                 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1232                 Exception('mock err'), 200, 200) as req_mock:
1233             self.keep_client.put('foo', num_retries=1, copies=2)
1234         self.assertEqual(3, req_mock.call_count)
1235
1236     def test_success_after_retryable_error(self):
1237         with tutil.mock_keep_responses(
1238                 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1239                 500, 200, 200) as req_mock:
1240             self.keep_client.put('foo', num_retries=1, copies=2)
1241         self.assertEqual(3, req_mock.call_count)
1242
1243     def test_fail_after_final_error(self):
1244         # First retry loop gets a 200 (can't achieve replication by
1245         # storing again on that server) and a 400 (can't retry that
1246         # server at all), so we shouldn't try a third request.
1247         with tutil.mock_keep_responses(
1248                 'acbd18db4cc2f85cedef654fccc4a4d8+3',
1249                 200, 400, 200) as req_mock:
1250             with self.assertRaises(arvados.errors.KeepWriteError):
1251                 self.keep_client.put('foo', num_retries=1, copies=2)
1252         self.assertEqual(2, req_mock.call_count)
1253
1254 class KeepClientAPIErrorTest(unittest.TestCase):
1255     def test_api_fail(self):
1256         class ApiMock(object):
1257             def __getattr__(self, r):
1258                 if r == "api_token":
1259                     return "abc"
1260                 else:
1261                     raise arvados.errors.KeepReadError()
1262         keep_client = arvados.KeepClient(api_client=ApiMock(),
1263                                              proxy='', local_store='')
1264         with self.assertRaises(arvados.errors.KeepReadError):
1265             keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1266         with self.assertRaises(arvados.errors.KeepReadError):
1267             keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")