11308: Fix traceback.format_exc() usage.
[arvados.git] / sdk / python / tests / test_keep_client.py
1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import str
6 from builtins import range
7 from builtins import object
8 import hashlib
9 import mock
10 import os
11 import pycurl
12 import random
13 import re
14 import socket
15 import sys
16 import threading
17 import time
18 import unittest
19 import urllib.parse
20
21 import arvados
22 import arvados.retry
23 from . import arvados_testutil as tutil
24 from . import keepstub
25 from . import run_test_server
26
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Round-trip put/get/head tests against a live Keep server.

    setUpClass builds one KeepClient (no proxy, no local store) that the
    tests in this class share.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        # Use a private client: the class-level client is also used by the
        # other tests in this class, so asserting that its counters start at
        # 0 would only hold when this test happens to run first.
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        self.assertEqual(0, keep_client.upload_counter.get())
        foo_locator = keep_client.put('foo')
        self.assertRegexpMatches(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, keep_client.upload_counter.get())

        self.assertEqual(0, keep_client.download_counter.get())
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegexpMatches(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # An 8-byte pattern doubled 23 times: 8 * 2**23 = 67108864 bytes,
        # which is the size hint expected in the locator below.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' * (2 ** 23)
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegexpMatches(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegexpMatches(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegexpMatches(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegexpMatches(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegexpMatches(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
126
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Keep with permission enforcement on.

    Signed locators read back for their owner; unsigned locators, other
    users, and unauthenticated clients all get NotFoundError.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': True}

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        client = arvados.KeepClient()
        not_found = arvados.errors.NotFoundError

        # The owner can read back a block through its signed locator.
        foo_loc = client.put('foo')
        self.assertRegexpMatches(
            foo_loc,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_loc)
        self.assertEqual(client.get(foo_loc),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # Dropping the signature makes the same block unreadable.
        bar_loc = client.put('bar')
        bar_loc_unsigned = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegexpMatches(
            bar_loc,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_loc)
        with self.assertRaises(not_found):
            client.get(bar_loc_unsigned)

        # A different user cannot read it through arvados.Keep.get.
        run_test_server.authorize_with('spectator')
        with self.assertRaises(not_found):
            arvados.Keep.get(bar_loc)

        # Without a token, neither the signed nor the unsigned locator works.
        client.api_token = ''
        for loc in (bar_loc, bar_loc_unsigned):
            with self.assertRaises(not_found):
                client.get(loc)
170
171
# KeepOptionalPermission: starts Keep with --permission-key-file
# but not --enforce-permissions (i.e. generate signatures on PUT
# requests, but do not require them for GET requests)
#
# All of these requests should succeed when permissions are optional:
# * authenticated request, signed locator
# * authenticated request, unsigned locator
# * unauthenticated request, signed locator
# * unauthenticated request, unsigned locator
class KeepOptionalPermission(run_test_server.TestCaseWithServers):
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': False}

    # Bare md5 of "foo": no size hint and no permission signature.
    _UNSIGNED_FOO_LOCATOR = "acbd18db4cc2f85cedef654fccc4a4d8"

    @classmethod
    def setUpClass(cls):
        super(KeepOptionalPermission, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepOptionalPermission, self).setUp()
        # Fresh client per test: some tests clear its api_token.
        self.keep_client = arvados.KeepClient(api_client=self.api_client,
                                              proxy='', local_store='')

    def _put_foo_and_check(self):
        """Store "foo", assert the returned locator is signed, and return it."""
        signed_locator = self.keep_client.put('foo')
        self.assertRegexpMatches(
            signed_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_locator)
        return signed_locator

    def test_KeepAuthenticatedSignedTest(self):
        signed_locator = self._put_foo_and_check()
        self.assertEqual(self.keep_client.get(signed_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepAuthenticatedUnsignedTest(self):
        # Store the block first; the signed locator itself is not needed.
        self._put_foo_and_check()
        self.assertEqual(self.keep_client.get(self._UNSIGNED_FOO_LOCATOR),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepUnauthenticatedSignedTest(self):
        # Check that signed GET requests work even when permissions
        # enforcement is off.
        signed_locator = self._put_foo_and_check()
        self.keep_client.api_token = ''
        self.assertEqual(self.keep_client.get(signed_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepUnauthenticatedUnsignedTest(self):
        # Since --enforce-permissions is not in effect, GET requests
        # need not be authenticated.
        self._put_foo_and_check()
        self.keep_client.api_token = ''
        self.assertEqual(self.keep_client.get(self._UNSIGNED_FOO_LOCATOR),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
234
235
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """KeepClient talking to Keep through a keepproxy server."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    # Global settings that individual tests overwrite. setUp snapshots them
    # and tearDown restores them, so a value written by one test (e.g. a
    # bogus ARVADOS_KEEP_SERVICES) cannot leak into the next one regardless
    # of the order in which tests run.
    _MUTATED_SETTINGS = ('ARVADOS_EXTERNAL_CLIENT', 'ARVADOS_KEEP_SERVICES')

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepProxyTestCase, self).setUp()
        settings = arvados.config.settings()
        self._saved_settings = {
            key: settings[key]
            for key in self._MUTATED_SETTINGS if key in settings}

    def tearDown(self):
        settings = arvados.config.settings()
        for key in self._MUTATED_SETTINGS:
            if key in self._saved_settings:
                settings[key] = self._saved_settings[key]
            else:
                settings.pop(key, None)
        super(KeepProxyTestCase, self).tearDown()

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegexpMatches(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegexpMatches(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
297
298
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient behavior against mocked keep_services and HTTP responses."""

    # A well-formed locator for a block the mocked services never return.
    _MISSING_LOCATOR = 'ffffffffffffffffffffffffffffffff'

    def get_service_roots(self, api_client):
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def _check_timeout_options(self, api_client, verb, arg, exc_class, expected):
        """Force a socket timeout on one request and check its curl options.

        Calls ``keep_client.<verb>(arg)``, expects ``exc_class``, then asserts
        that the first mocked curl handle was configured from ``expected``:
        ``expected[0]`` (seconds, checked as CONNECTTIMEOUT_MS),
        ``expected[1]`` (LOW_SPEED_TIME) and ``expected[2]`` (LOW_SPEED_LIMIT).
        """
        force_timeout = socket.timeout("timed out")
        # Named req_mock so the `mock` module is not shadowed.
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                getattr(keep_client, verb)(arg)
            curl = req_mock.responses[0]
            self.assertEqual(
                curl.getopt(pycurl.CONNECTTIMEOUT_MS),
                int(expected[0]*1000))
            self.assertEqual(
                curl.getopt(pycurl.LOW_SPEED_TIME),
                int(expected[1]))
            self.assertEqual(
                curl.getopt(pycurl.LOW_SPEED_LIMIT),
                int(expected[2]))

    def test_get_timeout(self):
        self._check_timeout_options(
            self.mock_keep_services(count=1), 'get', self._MISSING_LOCATOR,
            arvados.errors.KeepReadError, arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        self._check_timeout_options(
            self.mock_keep_services(count=1), 'put', b'foo',
            arvados.errors.KeepWriteError, arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        self._check_timeout_options(
            self.mock_keep_services(count=1), 'head', self._MISSING_LOCATOR,
            arvados.errors.KeepReadError, arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_proxy_get_timeout(self):
        self._check_timeout_options(
            self.mock_keep_services(service_type='proxy', count=1),
            'get', self._MISSING_LOCATOR,
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT)

    def test_proxy_head_timeout(self):
        self._check_timeout_options(
            self.mock_keep_services(service_type='proxy', count=1),
            'head', self._MISSING_LOCATOR,
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT)

    def test_proxy_put_timeout(self):
        self._check_timeout_options(
            self.mock_keep_services(service_type='proxy', count=1),
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT)

    def check_no_services_error(self, verb, exc_class):
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 403, 403)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([403, 403], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        # One response reporting 3 stored replicas satisfies copies=2.
        self.assertEqual(1, req_mock.call_count)
515
516
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Probe ordering of Keep services for a set of reference blocks."""

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [hashlib.md5(block).hexdigest() for block in self.blocks]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    @staticmethod
    def _service_id(url):
        """Return the hex id following "keep0x" in a mocked service URL."""
        return re.search(r'//\[?keep0x([0-9a-f]+)', url).group(1)

    def _probed_ids(self, req_mock):
        """Return the service ids of every request made through req_mock, in order."""
        return [self._service_id(resp.getopt(pycurl.URL).decode())
                for resp in req_mock.responses]

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, block_hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(
                arvados.KeepLocator(block_hash))
            got_order = [self._service_id(root) for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Run op(i) against all-500 responses; expect the reference order twice.

        num_retries=1 in the callers means two passes over every service.
        """
        for i, expected in enumerate(self.expected_order):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as req_mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            self.assertEqual(expected*2, self._probed_ids(req_mock))

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i, block in enumerate(self.blocks):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as req_mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(block, num_retries=2, copies=copies)
                got_order = self._probed_ids(req_mock)
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            for block_hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client)
            total_penalty = 0
            for block_hash, probe_before in zip(hashes, probes_before):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(block_hash))
                # How far the previously-first service moved down the list.
                penalty = probe_after.index(probe_before[0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client)
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
670
671
class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
    """Timeout and bandwidth-limit behavior of KeepClient.

    Every test talks to a keepstub server on localhost, started in
    setUp, whose bandwidth and per-phase delays are tuned so that
    requests either succeed or time out.
    """
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        """Context manager asserting its body takes tmin..tmax seconds.

        Subclasses TestCase only to reuse its assert* methods;
        TestCase.__init__ is not invoked. If the body raises, the
        timing check is skipped and the body's exception propagates.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()
            return self

        def __exit__(self, exc_type, exc_value, exc_tb):
            if exc_type is not None:
                # The body already failed; don't mask that failure with
                # an unrelated timing assertion.
                return False
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)
            return False

    class assertTakesGreater(unittest.TestCase):
        """Context manager asserting its body takes at least tmin seconds.

        Same conventions as assertTakesBetween: no upper bound, and no
        timing check if the body raises.
        """
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()
            return self

        def __exit__(self, exc_type, exc_value, exc_tb):
            if exc_type is not None:
                # Let the body's own failure be the one reported.
                return False
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            return False

    def setUp(self):
        # Let the OS pick a free port, then release it for the stub
        # server. (NOTE: another process could grab it in between.)
        sock = socket.socket()
        sock.bind(('0.0.0.0', 0))
        self.port = sock.getsockname()[1]
        sock.close()
        self.server = keepstub.Server(('0.0.0.0', self.port), keepstub.Handler)
        self.thread = threading.Thread(target=self.server.serve_forever)
        self.thread.daemon = True # Exit thread if main proc exits
        self.thread.start()
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='localhost',
            service_port=self.port,
        )

    def tearDown(self):
        self.server.shutdown()

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient for self.api_client using `timeouts`.

        The default tuple is (0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM) --
        presumably (connect seconds, bandwidth window seconds, minimum
        bytes/s); confirm against KeepClient's timeout documentation.
        """
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(response=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
827
828
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Routing of get/head requests for locators carrying +K@ hints."""

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Mock `disks` disk services plus `gateways` gateway services.

        Sets self.gateways (the gateway service records),
        self.gateway_roots (their https:// root URLs), self.api_client
        and self.keepClient.
        """
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    def _check_gateway_hints_in_order(self, MockCurl, verb):
        """Call keepClient.<verb> with every service 404ing; check probe order.

        Gateways named by +K@ hints must be tried first, in hint order,
        followed by the disk services.
        """
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegexpMatches(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'get')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'head')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
924
925
class KeepClientRetryTestMixin(object):
    """Shared retry-behavior tests for KeepClient operations.

    A local Keep store would not exercise retry logic. Instead, each
    client gets a single proxy pointed at a black hole. That means no
    API client is needed and every request goes to one place. The
    subclass's TEST_PATCHER then supplies canned responses, so the tests
    need no supporting servers and have no outside side effects.

    Concrete test cases must define:
      * DEFAULT_EXPECT -- what a successful run_method() returns;
      * DEFAULT_EXCEPTION -- the error class expected on failure;
      * TEST_PATCHER -- a callable used as
        ``with TEST_PATCHER(body, *responses):``. It mocks the client's
        requests; here each response is an HTTP status code or an
        exception instance;
      * run_method() -- performs the operation under test.
    """

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = dict(proxy=self.PROXY_ADDR, local_store='')

    def new_client(self, **caller_kwargs):
        """Build a KeepClient from self.client_kwargs, overridden by caller_kwargs."""
        merged = dict(self.client_kwargs)
        merged.update(caller_kwargs)
        return arvados.KeepClient(**merged)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) returns `expected` (default DEFAULT_EXPECT)."""
        want = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(want, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) raises `error_class` (default DEFAULT_EXCEPTION)."""
        exc_type = self.DEFAULT_EXCEPTION if error_class is None else error_class
        self.assertRaises(exc_type, self.run_method, *args, **kwargs)

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        # A retryable 500 followed by success, with retries allowed.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        # Without num_retries, the first 500 is final.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            self.check_exception(num_retries=1)

    def test_num_retries_instance_fallback(self):
        # num_retries given to the client constructor applies to calls.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
994
995
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry tests for KeepClient.get()."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # NotFoundError is reserved for the case where no server returns
        # the block and enough of them say it's missing. Here two
        # servers disagree (404 vs 500), so the error raised must be a
        # plain KeepReadError, not NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        first = (socket.timeout("timed out"), 200)
        second = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(first, second):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # Data that fails its checksum must trigger another attempt.
        first = ('baddata', 200)
        second = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(first, second):
            self.check_success(locator=self.HINTED_LOCATOR)
1039
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry tests for KeepClient.head()."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # NotFoundError is reserved for the case where no server has the
        # block and enough of them say it's missing. Here two servers
        # disagree (404 vs 500), so the error must be a plain
        # KeepReadError, not NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        first = (socket.timeout("timed out"), 200)
        second = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(first, second):
            self.check_success(locator=self.HINTED_LOCATOR)
1077
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry tests for KeepClient.put()."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # The mixin's client has a single proxy, so asking for two
        # copies must fail rather than store twice on one server.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1091
1092
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """KeepWriterThreadPool must stop after storing the wanted copies."""

    class FakeKeepService(object):
        """Stand-in service whose put() sleeps, then succeeds, fails or raises."""

        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {'x-keep-replicas-stored': str(replicas)},
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            # Only successful services report a result; others give None.
            return self._result if self.will_succeed else None

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies
        )

    def test_only_write_enough_on_success(self):
        for step in range(10):
            self.pool.add_task(
                self.FakeKeepService(delay=step/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_on_partial_success(self):
        for step in range(5):
            delay = step/10.0
            self.pool.add_task(
                self.FakeKeepService(delay=delay, will_succeed=False), None)
            self.pool.add_task(
                self.FakeKeepService(delay=delay, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_when_some_crash(self):
        for step in range(5):
            delay = step/10.0
            self.pool.add_task(
                self.FakeKeepService(delay=delay, will_raise=Exception()), None)
            self.pool.add_task(
                self.FakeKeepService(delay=delay, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_fail_when_too_many_crash(self):
        # More crashers than wanted copies and one success short: the
        # pool can only reach copies-1.
        for step in range(self.copies+1):
            self.pool.add_task(
                self.FakeKeepService(delay=step/10.0, will_raise=Exception()), None)
        for step in range(self.copies-1):
            self.pool.add_task(
                self.FakeKeepService(delay=step/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies-1)
1161     
1162
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    """put() calls that need two distinct servers (copies=2, two services).

    Success may take more than one pass through the retry loop.
    """

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def test_success_after_exception(self):
        responses = ('acbd18db4cc2f85cedef654fccc4a4d8+3',
                     Exception('mock err'), 200, 200)
        with tutil.mock_keep_responses(*responses) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        responses = ('acbd18db4cc2f85cedef654fccc4a4d8+3', 500, 200, 200)
        with tutil.mock_keep_responses(*responses) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # The first pass gets a 200 (storing again there can't add
        # replication) and a 400 (that server can't be retried), so no
        # third request should be made.
        responses = ('acbd18db4cc2f85cedef654fccc4a4d8+3', 200, 400, 200)
        with tutil.mock_keep_responses(*responses) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)