11308: Fix bytes vs. str problems.
[arvados.git] / sdk / python / tests / test_keep_client.py
1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import str
6 from builtins import range
7 from builtins import object
8 import hashlib
9 import mock
10 import os
11 import pycurl
12 import random
13 import re
14 import socket
15 import threading
16 import time
17 import unittest
18 import urllib.parse
19
20 import arvados
21 import arvados.retry
22 from . import arvados_testutil as tutil
23 from . import keepstub
24 from . import run_test_server
25
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Read/write round trips through a KeepClient talking to the test servers.

    Binary payloads and expected get() results are written as bytes
    literals.  KeepClient.put() rejects a str containing non-ASCII
    characters with UnicodeEncodeError (see test_unicode_must_be_ascii),
    so on Python 3 a str literal cannot carry raw binary test data.  On
    Python 2 a b'' literal is an ordinary str, so nothing changes there.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegexpMatches(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertEqual(self.keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_data = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegexpMatches(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # An 8-byte pattern doubled 23 times: 8 * 2**23 = 67108864 bytes,
        # the size expected in the locator below.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' * (2 ** 23)
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegexpMatches(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegexpMatches(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegexpMatches(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegexpMatches(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        with self.assertRaises(UnicodeEncodeError):
            # Error if it is not ASCII
            self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegexpMatches(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
123                          'wrong content from Keep.get for "test_head"')
124
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Keep server with a blob signing key and enforce_permissions=True.

    Expected get() content is a bytes literal because KeepClient.get()
    returns raw block data (bytes on Python 3).  On Python 2, b'foo' is
    the same value as 'foo'.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': True}

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        keep_client = arvados.KeepClient()
        foo_locator = keep_client.put('foo')
        self.assertRegexpMatches(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        bar_locator = keep_client.put('bar')
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegexpMatches(
            bar_locator,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        self.assertRaises(arvados.errors.NotFoundError,
                          keep_client.get,
                          unsigned_bar_locator)

        # GET from a different user => NotFound.  arvados.Keep.get is used
        # here rather than keep_client.get, after switching to 'spectator'.
        run_test_server.authorize_with('spectator')
        self.assertRaises(arvados.errors.NotFoundError,
                          arvados.Keep.get,
                          bar_locator)

        # Unauthenticated GET for a signed locator => NotFound
        # Unauthenticated GET for an unsigned locator => NotFound
        keep_client.api_token = ''
        self.assertRaises(arvados.errors.NotFoundError,
                          keep_client.get,
                          bar_locator)
        self.assertRaises(arvados.errors.NotFoundError,
                          keep_client.get,
                          unsigned_bar_locator)
167                           unsigned_bar_locator)
168
169
170 # KeepOptionalPermission: starts Keep with --permission-key-file
171 # but not --enforce-permissions (i.e. generate signatures on PUT
172 # requests, but do not require them for GET requests)
173 #
174 # All of these requests should succeed when permissions are optional:
175 # * authenticated request, signed locator
176 # * authenticated request, unsigned locator
177 # * unauthenticated request, signed locator
178 # * unauthenticated request, unsigned locator
class KeepOptionalPermission(run_test_server.TestCaseWithServers):
    """Keep server with a blob signing key but enforce_permissions=False.

    Every GET below, signed or unsigned, authenticated or not, is expected
    to return the stored b'foo' content.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': False}

    @classmethod
    def setUpClass(cls):
        super(KeepOptionalPermission, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepOptionalPermission, self).setUp()
        # A fresh client per test, because some tests clear its api_token.
        self.keep_client = arvados.KeepClient(api_client=self.api_client,
                                              proxy='', local_store='')

    def _put_foo_and_check(self):
        """Store 'foo', assert the returned locator is signed, and return it."""
        signed_locator = self.keep_client.put('foo')
        self.assertRegexpMatches(
            signed_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_locator)
        return signed_locator

    def test_KeepAuthenticatedSignedTest(self):
        signed_locator = self._put_foo_and_check()
        self.assertEqual(self.keep_client.get(signed_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepAuthenticatedUnsignedTest(self):
        # Only the upload matters here; the GET uses the bare hash.
        self._put_foo_and_check()
        self.assertEqual(self.keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8"),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepUnauthenticatedSignedTest(self):
        # Check that signed GET requests work even when permissions
        # enforcement is off.
        signed_locator = self._put_foo_and_check()
        self.keep_client.api_token = ''
        self.assertEqual(self.keep_client.get(signed_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepUnauthenticatedUnsignedTest(self):
        # Since --enforce-permissions is not in effect, GET requests
        # need not be authenticated.
        self._put_foo_and_check()
        self.keep_client.api_token = ''
        self.assertEqual(self.keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8"),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
231                          'wrong content from Keep.get(md5("foo"))')
232
233
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """KeepClient talking to Keep through a keepproxy test server."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepProxyTestCase, self).setUp()
        # Some tests overwrite ARVADOS_KEEP_SERVICES.  Remember the value in
        # effect now (test_KeepProxyTest1 relies on it) so tearDown() can
        # restore it, and the overrides do not leak into later tests.
        self._saved_keep_services = arvados.config.settings().get(
            'ARVADOS_KEEP_SERVICES')

    def tearDown(self):
        settings = arvados.config.settings()
        settings.pop('ARVADOS_EXTERNAL_CLIENT', None)
        if self._saved_keep_services is None:
            settings.pop('ARVADOS_KEEP_SERVICES', None)
        else:
            settings['ARVADOS_KEEP_SERVICES'] = self._saved_keep_services
        super(KeepProxyTestCase, self).tearDown()

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegexpMatches(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegexpMatches(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
294                                              local_store='')
295
296
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient service selection, timeouts and error reporting.

    Uses mocked API clients and mocked curl responses from tutil; no test
    servers are started.
    """

    def get_service_roots(self, api_client):
        """Return the service root URLs for an all-zero locator, parsed and sorted."""
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def _check_timeout_applied(self, verb, arg, exc_class, timeout,
                               **service_args):
        """Call keep_client.<verb>(arg) against one service that times out.

        verb -- KeepClient method name: 'get', 'head' or 'put'.
        arg -- locator (get/head) or data (put) passed to that method.
        exc_class -- exception the call must raise.
        timeout -- timeout tuple the first curl handle must be configured
            with (see _assert_curl_timeouts).
        service_args -- extra keyword arguments for mock_keep_services(),
            e.g. service_type='proxy'.
        """
        api_client = self.mock_keep_services(count=1, **service_args)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                getattr(keep_client, verb)(arg)
            self._assert_curl_timeouts(req_mock.responses[0], timeout)

    def _assert_curl_timeouts(self, curl, timeout):
        """Assert that a mocked curl handle was configured from *timeout*.

        timeout[0] is in seconds and must appear (as milliseconds) in
        CONNECTTIMEOUT_MS; timeout[1] and timeout[2] must appear in
        LOW_SPEED_TIME and LOW_SPEED_LIMIT respectively.
        """
        self.assertEqual(curl.getopt(pycurl.CONNECTTIMEOUT_MS),
                         int(timeout[0]*1000))
        self.assertEqual(curl.getopt(pycurl.LOW_SPEED_TIME),
                         int(timeout[1]))
        self.assertEqual(curl.getopt(pycurl.LOW_SPEED_LIMIT),
                         int(timeout[2]))

    def test_get_timeout(self):
        self._check_timeout_applied(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        self._check_timeout_applied(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        self._check_timeout_applied(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_proxy_get_timeout(self):
        self._check_timeout_applied(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_head_timeout(self):
        self._check_timeout_applied(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_put_timeout(self):
        self._check_timeout_applied(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def check_no_services_error(self, verb, exc_class):
        """Assert *verb* raises *exc_class* with no per-request errors when
        listing keep services fails with ApiError."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Assert the reported request errors are those of the final attempt
        (403s), not the earlier 500s."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 403, 403)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([403, 403], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        # assertIn reports the actual message on failure, unlike
        # assertEqual(True, ...).
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = 'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = 'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = 'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        # A single response reporting 3 replicas satisfies copies=2.
        self.assertEqual(1, req_mock.call_count)
512         self.assertEqual(1, req_mock.call_count)
513
514
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check the order in which KeepClient probes services for a block.

    Block contents are bytes: hashlib.md5() only accepts bytes on
    Python 3, and KeepClient.put() accepts bytes directly.
    """

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        self.blocks = [
            "{:064x}".format(x).encode('ascii')
            for x in range(len(self.expected_order))]
        self.hashes = [hashlib.md5(block).hexdigest() for block in self.blocks]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, block_hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Run op(i) for each block with every request failing (500), and
        assert the requested hosts follow expected_order[i], twice over
        (num_retries=1)."""
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as req_mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL)).group(1)
                for resp in req_mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as req_mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL)).group(1)
                    for resp in req_mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        pending.remove(got)
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        hashes = [
            hashlib.md5("{:064x}".format(x).encode('ascii')).hexdigest()
            for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            for block_hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client)
            total_penalty = 0
            for block_hash, probe_before in zip(hashes, probes_before):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(block_hash))
                # How far the previously-first service moved down the list.
                penalty = probe_after.index(probe_before[0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Assert the failing request URLs for the 64-zeros block follow
        expected_order[0], each on the randomly chosen service port."""
        data = '0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client)
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value.side_effect = socket.timeout
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
667         self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
668
669
class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
    """Check KeepClient connect, response and bandwidth timeouts.

    setUp() starts a keepstub.Server on a free localhost port in a daemon
    thread. Individual tests reconfigure it with setdelays() and
    setbandwidth() to provoke slow transfers.
    """
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    # DATA is bytes so it can compare equal to get()'s result on Python 3
    # (assumes KeepClient.get() returns bytes there). On Python 2, b'...'
    # and '...' are the same type.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        """Context manager: assert the body takes between tmin and tmax seconds."""
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, exc_type, exc_value, traceback):
            if exc_type is not None:
                # Let the body's own failure propagate rather than masking
                # it with a timing assertion.
                return False
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager: assert the body takes at least tmin seconds."""
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, exc_type, exc_value, traceback):
            if exc_type is not None:
                # Don't hide the body's exception behind a timing failure.
                return False
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def setUp(self):
        # Find a free port by binding an ephemeral socket, then release it
        # for the stub server to use.
        sock = socket.socket()
        sock.bind(('0.0.0.0', 0))
        self.port = sock.getsockname()[1]
        sock.close()
        self.server = keepstub.Server(('0.0.0.0', self.port), keepstub.Handler)
        self.thread = threading.Thread(target=self.server.serve_forever)
        self.thread.daemon = True # Exit thread if main proc exits
        self.thread.start()
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='localhost',
            service_port=self.port,
        )

    def tearDown(self):
        self.server.shutdown()

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient for the mocked service with the given timeouts.

        timeouts is passed through as KeepClient's `timeout` argument.
        Presumably it is (connect, read[, minimum bandwidth]); confirm
        against KeepClient.
        """
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(response=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
825
826
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check which URLs KeepClient requests for locators carrying +K@ hints."""

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Point self.keepClient at `disks` mock disk services plus `gateways` gateways.

        Sets self.gateways (the 'gateway:test' service records),
        self.gateway_roots (their https root URLs), self.api_client and
        self.keepClient.
        """
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    def _check_hints_in_order(self, MockCurl, verb):
        """Make every request 404 and check the order keepClient.<verb>() probes.

        Gateways named by +K@ hints must be tried first, in hint order,
        followed by the disk services (hosts named keep0x...).
        """
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL))
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegexpMatches(
                mocks[i].getopt(pycurl.URL),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        # Block content is bytes, as get() returns it.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL))
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_hints_in_order(MockCurl, 'get')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_hints_in_order(MockCurl, 'head')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        # A +K@<cluster id> hint routes to that cluster's keep proxy.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL))

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL))
922
923
class KeepClientRetryTestMixin(object):
    # Testing with a local Keep store won't exercise the retry behavior.
    # Instead, our strategy is:
    # * Create a client with one proxy specified (pointed at a black
    #   hole), so there's no need to instantiate an API client, and
    #   all HTTP requests come from one place.
    # * Use TEST_PATCHER to mock out the HTTP layer and provide
    #   simulated responses.
    # This lets us test the retry logic extensively without relying on any
    # supporting servers, and prevents side effects in case something hiccups.
    #
    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION,
    # TEST_PATCHER and run_method(). TEST_PATCHER is used below as a
    # context manager, called as TEST_PATCHER(body, *responses), where
    # each response is an HTTP status code or an exception instance.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    # Block content is bytes, matching what Keep stores and returns.
    TEST_DATA = b'testdata'
    # Locator expected for TEST_DATA (8 bytes).
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Return a KeepClient built from self.client_kwargs plus overrides."""
        kwargs = self.client_kwargs.copy()
        kwargs.update(caller_kwargs)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        """Run the client operation under test; subclasses must override."""
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) returns expected (default DEFAULT_EXPECT)."""
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method raises error_class (default DEFAULT_EXCEPTION)."""
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        self.assertRaises(error_class, self.run_method, *args, **kwargs)

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            self.check_exception(num_retries=1)

    def test_num_retries_instance_fallback(self):
        # num_retries given to the client constructor applies when the
        # call itself doesn't pass one.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
992
993
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.get(), plus get-specific failure cases."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        return self.new_client().get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                exc_check.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # The first response's content doesn't match the locator; get()
        # is expected to move on and return the next, correct response.
        # Bytes, like real block content.
        with tutil.mock_keep_responses(
                (b'baddata', 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1037
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.head(), plus head-specific failure cases."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        kc = self.new_client()
        return kc.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # Only a high threshold of "not found" answers (with no server
        # returning the block) should produce NotFoundError. Two servers
        # here disagree 50/50 (404 vs 500), so the error must be a plain
        # KeepReadError instead.
        kc = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                kc.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timed_out = (socket.timeout("timed out"), 200)
        answered = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, answered):
            self.check_success(locator=self.HINTED_LOCATOR)
1075
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.put(), plus put-specific failure cases."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        kc = self.new_client()
        return kc.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # The client is configured with a single proxy, so asking for two
        # copies must fail rather than store both on the same server.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1089
1090
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """KeepWriterThreadPool should stop once enough copies have been stored."""

    class FakeKeepService(object):
        """Stand-in Keep service: sleeps `delay` seconds, then succeeds, fails or raises."""
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {'x-keep-replicas-stored': str(replicas)},
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            # Only a successful write has a result to report.
            return self._result if self.will_succeed else None

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def _add_service(self, delay, **behavior):
        """Queue a FakeKeepService with the given delay/behavior on the pool."""
        self.pool.add_task(self.FakeKeepService(delay=delay, **behavior), None)

    def test_only_write_enough_on_success(self):
        for n in range(10):
            self._add_service(n/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_on_partial_success(self):
        for n in range(5):
            self._add_service(n/10.0, will_succeed=False)
            self._add_service(n/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_when_some_crash(self):
        for n in range(5):
            self._add_service(n/10.0, will_raise=Exception())
            self._add_service(n/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_fail_when_too_many_crash(self):
        for n in range(self.copies+1):
            self._add_service(n/10.0, will_raise=Exception())
        for n in range(self.copies-1):
            self._add_service(n/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies-1)
1159     
1160
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.
    FOO_LOCATOR = 'acbd18db4cc2f85cedef654fccc4a4d8+3'

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def _put_foo(self):
        """Store 'foo' requiring 2 copies, allowing 1 retry."""
        self.keep_client.put('foo', num_retries=1, copies=2)

    def test_success_after_exception(self):
        replies = (Exception('mock err'), 200, 200)
        with tutil.mock_keep_responses(self.FOO_LOCATOR, *replies) as req_mock:
            self._put_foo()
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        replies = (500, 200, 200)
        with tutil.mock_keep_responses(self.FOO_LOCATOR, *replies) as req_mock:
            self._put_foo()
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # The first pass gets a 200 (storing there again can't add a
        # replica) and a 400 (that server can't be retried at all), so no
        # third request should be made.
        replies = (200, 400, 200)
        with tutil.mock_keep_responses(self.FOO_LOCATOR, *replies) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self._put_foo()
        self.assertEqual(2, req_mock.call_count)