11308: Futurize stage2.
[arvados.git] / sdk / python / tests / test_keep_client.py
1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import str
6 from builtins import range
7 from past.utils import old_div
8 from builtins import object
9 import hashlib
10 import mock
11 import os
12 import pycurl
13 import random
14 import re
15 import socket
16 import threading
17 import time
18 import unittest
19 import urllib.parse
20
21 import arvados
22 import arvados.retry
23 from . import arvados_testutil as tutil
24 from . import keepstub
25 from . import run_test_server
26
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Put/get/head round trips through one class-wide ``arvados.KeepClient``.

    ``setUpClass`` authorizes as "admin" and builds a single client (with
    ``proxy`` and ``local_store`` both set to '') that every test shares.
    """
    # Settings read by TestCaseWithServers.  Presumably an empty dict
    # means "start this server with default options" -- confirm in
    # run_test_server.
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        """Create the API client and Keep client shared by all tests."""
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        """Store and fetch 'foo', checking the transfer byte counters."""
        # The counters are asserted starting from zero, so this test
        # relies on no earlier test having used the shared client.
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegexpMatches(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertEqual(self.keep_client.get(foo_locator),
                         'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        """Round-trip a short payload containing non-ASCII and NUL bytes."""
        # A bytes literal: identical to the plain literal on Python 2,
        # and the only way to spell these raw byte values on Python 3.
        blob = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob)
        self.assertRegexpMatches(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        """Round-trip a 67108864-byte binary payload."""
        # 8 bytes repeated 2**23 times == 67108864 bytes, the size
        # asserted in the locator below.
        blob = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' * (2 ** 23)
        blob_locator = self.keep_client.put(blob)
        self.assertRegexpMatches(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        """Round-trip a binary payload stored with ``copies=1`` (skipped)."""
        blob = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob, copies=1)
        self.assertRegexpMatches(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        """Storing the empty string yields the zero-length locator."""
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegexpMatches(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        """Unicode input is accepted only when it is pure ASCII."""
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegexpMatches(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        with self.assertRaises(UnicodeEncodeError):
            # Error if it is not ASCII
            self.keep_client.put(u'\xe2')

        with self.assertRaises(arvados.errors.ArgumentError):
            # Must be a string type
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        """head() returns True for a stored block that get() can also read."""
        locator = self.keep_client.put('test_head')
        self.assertRegexpMatches(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         'test_head',
                         'wrong content from Keep.get for "test_head"')
125
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Keep reads against a server configured with ``enforce_permissions``."""
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': True}

    def test_KeepBasicRWTest(self):
        """Signed reads succeed; unsigned, other-user and token-less reads fail."""
        not_found = arvados.errors.NotFoundError
        run_test_server.authorize_with('active')
        client = arvados.KeepClient()

        # PUT returns a signed locator, and GET with it returns the data.
        foo_signed = client.put('foo')
        self.assertRegexpMatches(
            foo_signed,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_signed)
        self.assertEqual(client.get(foo_signed),
                         'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        bar_signed = client.put('bar')
        bar_unsigned = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegexpMatches(
            bar_signed,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_signed)
        with self.assertRaises(not_found):
            client.get(bar_unsigned)

        # GET from a different user => NotFound
        # (goes through arvados.Keep rather than the client built above)
        run_test_server.authorize_with('spectator')
        with self.assertRaises(not_found):
            arvados.Keep.get(bar_signed)

        # Unauthenticated GET for a signed locator => NotFound
        # Unauthenticated GET for an unsigned locator => NotFound
        client.api_token = ''
        for locator in (bar_signed, bar_unsigned):
            with self.assertRaises(not_found):
                client.get(locator)
169
170
171 # KeepOptionalPermission: starts Keep with --permission-key-file
172 # but not --enforce-permissions (i.e. generate signatures on PUT
173 # requests, but do not require them for GET requests)
174 #
175 # All of these requests should succeed when permissions are optional:
176 # * authenticated request, signed locator
177 # * authenticated request, unsigned locator
178 # * unauthenticated request, signed locator
179 # * unauthenticated request, unsigned locator
class KeepOptionalPermission(run_test_server.TestCaseWithServers):
    """GET requests when Keep signs locators but does not enforce them.

    Each test stores 'foo' first and then reads it back with a
    different combination of API token and signed/unsigned locator.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': False}

    # Bare md5 of 'foo', i.e. the locator without size or signature hints.
    UNSIGNED_FOO_LOCATOR = "acbd18db4cc2f85cedef654fccc4a4d8"

    @classmethod
    def setUpClass(cls):
        """Authorize as "admin" and build the API client shared by all tests."""
        super(KeepOptionalPermission, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')

    def setUp(self):
        """Build a fresh KeepClient so api_token changes do not leak between tests."""
        super(KeepOptionalPermission, self).setUp()
        self.keep_client = arvados.KeepClient(api_client=self.api_client,
                                              proxy='', local_store='')

    def _put_foo_and_check(self):
        """Store 'foo', assert the returned locator is signed, and return it."""
        signed_locator = self.keep_client.put('foo')
        self.assertRegexpMatches(
            signed_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_locator)
        return signed_locator

    def test_KeepAuthenticatedSignedTest(self):
        """Authenticated request, signed locator."""
        signed_locator = self._put_foo_and_check()
        self.assertEqual(self.keep_client.get(signed_locator),
                         'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepAuthenticatedUnsignedTest(self):
        """Authenticated request, unsigned locator."""
        # Called only to store the block; the signed locator is not needed.
        self._put_foo_and_check()
        self.assertEqual(self.keep_client.get(self.UNSIGNED_FOO_LOCATOR),
                         'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepUnauthenticatedSignedTest(self):
        """Unauthenticated request, signed locator."""
        # Check that signed GET requests work even when permissions
        # enforcement is off.
        signed_locator = self._put_foo_and_check()
        self.keep_client.api_token = ''
        self.assertEqual(self.keep_client.get(signed_locator),
                         'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepUnauthenticatedUnsignedTest(self):
        """Unauthenticated request, unsigned locator."""
        # Since --enforce-permissions is not in effect, GET requests
        # need not be authenticated.
        # Called only to store the block; the signed locator is not needed.
        self._put_foo_and_check()
        self.keep_client.api_token = ''
        self.assertEqual(self.keep_client.get(self.UNSIGNED_FOO_LOCATOR),
                         'foo',
                         'wrong content from Keep.get(md5("foo"))')
233
234
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """KeepClient tests that route requests through a Keep proxy.

    Several tests edit the process-wide ``arvados.config.settings()``
    mapping; ``setUp``/``tearDown`` put back what they change so the
    tests do not depend on execution order.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        """Authorize as "active" and build the API client shared by all tests."""
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def setUp(self):
        """Remember the current ARVADOS_KEEP_SERVICES setting for tearDown."""
        super(KeepProxyTestCase, self).setUp()
        settings = arvados.config.settings()
        # Track presence separately so "absent" can be restored as absent.
        self._had_keep_services = 'ARVADOS_KEEP_SERVICES' in settings
        self._saved_keep_services = settings.get('ARVADOS_KEEP_SERVICES')

    def tearDown(self):
        """Undo the settings changes made by individual tests."""
        settings = arvados.config.settings()
        settings.pop('ARVADOS_EXTERNAL_CLIENT', None)
        # Some tests overwrite ARVADOS_KEEP_SERVICES; without this the
        # override would still be in place for whichever test runs next.
        if self._had_keep_services:
            settings['ARVADOS_KEEP_SERVICES'] = self._saved_keep_services
        else:
            settings.pop('ARVADOS_KEEP_SERVICES', None)
        super(KeepProxyTestCase, self).tearDown()

    def test_KeepProxyTest1(self):
        """Round-trip 'baz' with the proxy taken from ARVADOS_KEEP_SERVICES."""
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegexpMatches(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        """Round-trip 'baz2' when flagged as an external client."""
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegexpMatches(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        """A space-separated ARVADOS_KEEP_SERVICES yields one root per URI."""
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        """Constructing a client with a scheme-less service URI is rejected."""
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
296
297
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient behavior against mocked API and Keep service responses."""

    def get_service_roots(self, api_client):
        """Return the parsed, sorted service roots for the all-zeros locator."""
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        """The service_ssl_flag selects http vs. https in the service root."""
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        """An IPv6 service host still parses into the right host and port."""
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def _check_timeout_options(self, verb, arg, exc_class, timeouts,
                               **service_kwargs):
        """Force a timeout on one request and check its pycurl options.

        verb           -- KeepClient method name to call ('get', 'head', 'put')
        arg            -- sole positional argument passed to that method
        exc_class      -- exception the call is expected to raise
        timeouts       -- indexable triple compared, in order, against
                          CONNECTTIMEOUT_MS (after scaling by 1000),
                          LOW_SPEED_TIME and LOW_SPEED_LIMIT
        service_kwargs -- extra keyword arguments for mock_keep_services()
        """
        api_client = self.mock_keep_services(count=1, **service_kwargs)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                getattr(keep_client, verb)(arg)
            response = req_mock.responses[0]
            self.assertEqual(
                response.getopt(pycurl.CONNECTTIMEOUT_MS),
                int(timeouts[0]*1000))
            self.assertEqual(
                response.getopt(pycurl.LOW_SPEED_TIME),
                int(timeouts[1]))
            self.assertEqual(
                response.getopt(pycurl.LOW_SPEED_LIMIT),
                int(timeouts[2]))

    def test_get_timeout(self):
        self._check_timeout_options(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        self._check_timeout_options(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        self._check_timeout_options(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_proxy_get_timeout(self):
        self._check_timeout_options(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_head_timeout(self):
        self._check_timeout_options(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_put_timeout(self):
        self._check_timeout_options(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def check_no_services_error(self, verb, exc_class):
        """Check `verb` raises `exc_class` with no per-request errors
        when the keep_services lookup itself fails with ApiError."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Check the raised error reports the 403s from the final
        attempt rather than the 500s from the first one."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 403, 403)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([403, 403], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        """Only the two failed writes appear in the error's request_errors()."""
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        """A put with only a read-only proxy fails without issuing requests."""
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        # assertIn reports both operands on failure, unlike comparing
        # the membership result against True.
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        """get() works with a service_type the client has no special case for."""
        body = 'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        """put() works with a service_type the client has no special case for."""
        body = 'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        """One response reporting 3 stored replicas satisfies copies=2,
        so only a single request is made."""
        body = 'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
514
515
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check the order in which KeepClient probes services for a block."""

    def setUp(self):
        """Build the reference probe orders and a client with 16 mock services."""
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        self.blocks = [
            "{:064x}".format(x)
            for x in range(len(self.expected_order))]
        # hashlib needs bytes on Python 3; the blocks are hex digits
        # only, so encoding them cannot fail or change the digest.
        self.hashes = [
            hashlib.md5(block.encode()).hexdigest()
            for block in self.blocks]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for expected, block_hash in zip(self.expected_order, self.hashes):
            roots = self.keep_client.weighted_service_roots(
                arvados.KeepLocator(block_hash))
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(expected, got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Run op(i) for every reference block with all requests
        answered 500, and check the request URLs follow the reference
        order twice over (first attempt plus one retry)."""
        for i, expected in enumerate(self.expected_order):
            with tutil.mock_keep_responses('', *([500] * (self.services*2))) as req_mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL)).group(1)
                for resp in req_mock.responses]
            self.assertEqual(expected*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i, block in enumerate(self.blocks):
                with tutil.mock_keep_responses('', *([500] * (self.services*3))) as req_mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(block, num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL)).group(1)
                    for resp in req_mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        """Adding services must not push a block's former first-choice
        service too far down its new probe order."""
        # Encode before hashing: hashlib rejects text on Python 3.
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest()
            for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            for block_hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client)
            total_penalty = 0
            for block_hash, probe_before in zip(hashes, probes_before):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(block_hash))
                # How many positions the old first choice has moved back.
                penalty = probe_after.index(probe_before[0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Check that request_errors() on the raised exception lists
        the services in probe order for the block of 64 "0" characters."""
        data = '0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client)
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value.side_effect = socket.timeout
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
669
670
class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
    # Exercises KeepClient timeouts against a keepstub server running
    # in a background thread; each test adjusts the server's
    # bandwidth and delays.
    #
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = 'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        # Context manager asserting that its block takes between tmin
        # and tmax seconds (inclusive, rounded to milliseconds).
        # TestCase.__init__ is not called; only the assert* helpers
        # are used.
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, exc_type=None, exc_value=None, traceback=None):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            if exc_type is not None:
                # The block itself failed: let that error propagate
                # rather than replacing it with a timing failure.
                return False
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        # Context manager asserting that its block takes at least
        # tmin seconds (rounded to milliseconds).
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, exc_type=None, exc_value=None, traceback=None):
            delta = round(time.time() - self.t0, 3)
            if exc_type is not None:
                # Don't hide the block's own failure behind a timing
                # assertion.
                return False
            self.assertGreaterEqual(delta, self.tmin)

    def setUp(self):
        # Ask the OS for a free port, release it, then start the stub
        # server on that port in a background thread.
        sock = socket.socket()
        try:
            sock.bind(('0.0.0.0', 0))
            self.port = sock.getsockname()[1]
        finally:
            # Close the probe socket even if bind() fails.
            sock.close()
        self.server = keepstub.Server(('0.0.0.0', self.port), keepstub.Handler)
        self.thread = threading.Thread(target=self.server.serve_forever)
        self.thread.daemon = True # Exit thread if main proc exits
        self.thread.start()
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='localhost',
            service_port=self.port,
        )

    def tearDown(self):
        # NOTE(review): the server's listening socket is never closed
        # explicitly. If keepstub.Server follows the socketserver API,
        # server_close() would release it, but it may also wait for
        # in-flight handler threads -- confirm before adding.
        self.server.shutdown()

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        # Return a KeepClient for the stub server, passing `timeouts`
        # through as the client's timeout argument.
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # With server bandwidth at twice BANDWIDTH_LOW_LIM, put and
        # get should both succeed.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        # Bandwidth exactly at the limit plus a response delay of
        # TIMEOUT_TIME: get, put and head should all fail, each after
        # at least TIMEOUT_TIME.
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(response=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        # Bandwidth exactly at the limit plus mid_write/mid_read
        # delays of TIMEOUT_TIME: get and put should both fail, each
        # after at least TIMEOUT_TIME.
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        # Run both timing checks with the server's `request` delay set
        # to 0.2s, then 2s.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        # Same as above, using the server's `response` delay.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        # Same as above, using the server's `response_body` delay.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
826
827
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    # Checks which URLs KeepClient requests when a locator carries
    # +K@ hints, using mocked pycurl.Curl objects.

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        # Build `gateways` gateway service records plus their root
        # URLs, then create an API client mock (with `disks` regular
        # services and the gateways as additional services) and a
        # KeepClient using it.
        self.gateways = []
        self.gateway_roots = []
        for index in range(gateways):
            gateway = {
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(index),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(index),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
            }
            self.gateways.append(gateway)
            self.gateway_roots.append(
                "https://{service_host}:{service_port}/".format(**gateway))
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    def _check_gateway_hints_in_order(self, verb, MockCurl):
        # Every request gets a 404. Call keepClient.<verb>() with a
        # locator hinting at all gateways and check the order in which
        # the queued curl mocks were pointed at services.
        gateways = 4
        disks = 3
        mocks = []
        for _ in range(gateways+disks):
            mocks.append(tutil.FakeCurl.make(code=404, body=''))
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        hints = ['K@'+gw['uuid'] for gw in self.gateways]
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] + hints)
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for fake_curl, root in zip(mocks, self.gateway_roots):
            self.assertEqual(root+locator,
                             fake_curl.getopt(pycurl.URL))
        # Disk services are tried next.
        for fake_curl in mocks[gateways:]:
            self.assertRegexpMatches(
                fake_curl.getopt(pycurl.URL),
                r'keep0x')

    def _check_remote_proxy_hint(self, verb, expected, MockCurl):
        # With a +K@xyzzy hint, keepClient.<verb>() should return
        # `expected` and the request URL should be under
        # keep.xyzzy.arvadosapi.com.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        result = getattr(self.keepClient, verb)(locator)
        self.assertEqual(expected, result)
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        # A locator hinting at the single gateway should be fetched
        # from that gateway's root URL; head() on it returns True.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        gateway_uuid = self.gateways[0]['uuid']
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + gateway_uuid
        content = self.keepClient.get(locator)
        self.assertEqual('foo', content)
        requested_url = MockCurl.return_value.getopt(pycurl.URL)
        self.assertEqual(self.gateway_roots[0]+locator, requested_url)
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order('get', MockCurl)

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order('head', MockCurl)

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        self._check_remote_proxy_hint('get', 'foo', MockCurl)

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        self._check_remote_proxy_hint('head', True, MockCurl)
923
924
class KeepClientRetryTestMixin(object):
    # Shared retry scenarios for KeepClient operations.
    #
    # Testing with a local Keep store won't exercise the retry behavior.
    # Instead, our strategy is:
    # * Create a client with one proxy specified (pointed at a black
    #   hole), so there's no need to instantiate an API client, and
    #   all HTTP requests come from one place.
    # * Use TEST_PATCHER to provide simulated responses.
    # This lets us test the retry logic extensively without relying on any
    # supporting servers, and prevents side effects in case something hiccups.
    #
    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
    # run_method(). Test classes must also define TEST_PATCHER to a
    # method that mocks out appropriate methods in the client.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = 'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        # Default KeepClient constructor arguments; individual tests
        # may add to them before calling new_client().
        self.client_kwargs = dict(proxy=self.PROXY_ADDR, local_store='')

    def new_client(self, **caller_kwargs):
        # Build a KeepClient from client_kwargs, with caller_kwargs
        # taking precedence.
        merged = dict(self.client_kwargs)
        merged.update(caller_kwargs)
        return arvados.KeepClient(**merged)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        # Assert that run_method() returns `expected`, or
        # DEFAULT_EXPECT when `expected` is None.
        wanted = self.DEFAULT_EXPECT if expected is None else expected
        actual = self.run_method(*args, **kwargs)
        self.assertEqual(wanted, actual)

    def check_exception(self, error_class=None, *args, **kwargs):
        # Assert that run_method() raises `error_class`, or
        # DEFAULT_EXCEPTION when `error_class` is None.
        wanted = self.DEFAULT_EXCEPTION if error_class is None else error_class
        with self.assertRaises(wanted):
            self.run_method(*args, **kwargs)

    def test_immediate_success(self):
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 200)
        with patcher:
            self.check_success()

    def test_retry_then_success(self):
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200)
        with patcher:
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        patcher = self.TEST_PATCHER(
            self.DEFAULT_EXPECT, Exception('mock err'), 200)
        with patcher:
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        # No num_retries given anywhere: the first 500 is final.
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200)
        with patcher:
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200)
        with patcher:
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200)
        with patcher:
            self.check_exception(num_retries=1)

    def test_num_retries_instance_fallback(self):
        # num_retries given to the constructor applies when the call
        # itself does not pass one.
        self.client_kwargs['num_retries'] = 3
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200)
        with patcher:
            self.check_success()
993
994
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Runs the shared retry scenarios against KeepClient.get(), plus
    # get-specific cases.
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        keep_client = self.new_client()
        return keep_client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        patcher = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200)
        with patcher:
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        keep_client = self.new_client()
        patcher = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500)
        with patcher:
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                keep_client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception,
                arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        patcher = tutil.mock_keep_responses(
            self.DEFAULT_EXPECT, 404, 200, 500)
        with patcher:
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # First response times out, second one succeeds.
        responses = [
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # First response carries the wrong data, second the right data.
        responses = [
            ('baddata', 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)
1038
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Runs the shared retry scenarios against KeepClient.head(), plus
    # head-specific cases.
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    DEFAULT_EXPECT = True
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        keep_client = self.new_client()
        return keep_client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        patcher = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200)
        with patcher:
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        keep_client = self.new_client()
        patcher = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500)
        with patcher:
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                keep_client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception,
                arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        patcher = tutil.mock_keep_responses(
            self.DEFAULT_EXPECT, 404, 200, 500)
        with patcher:
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # First response times out, second one succeeds.
        responses = [
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)
1076
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Runs the shared retry scenarios against KeepClient.put(), plus
    # a put-specific case.
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        keep_client = self.new_client()
        return keep_client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Asking for two copies must fail even though the (single)
        # mocked response is a success.
        patcher = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200)
        with patcher:
            self.check_exception(copies=2, num_retries=3)
1090
1091
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    # Feeds fake Keep services into a KeepWriterThreadPool asking for
    # three copies and checks the pool's done() count afterwards.

    class FakeKeepService(object):
        # Stand-in service: put() sleeps for `delay` seconds, then
        # raises `will_raise` if one was given, otherwise returns
        # `will_succeed`.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {'x-keep-replicas-stored': str(replicas)},
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            # Only successful services have a result to report.
            return self._result if self.will_succeed else None

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def _queue_service(self, tenths, **service_kwargs):
        # Add one FakeKeepService to the pool, with a delay of
        # tenths/10 seconds.
        service = self.FakeKeepService(delay=tenths / 10.0, **service_kwargs)
        self.pool.add_task(service, None)

    def test_only_write_enough_on_success(self):
        for tenths in range(10):
            self._queue_service(tenths, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_on_partial_success(self):
        # Alternate failing and succeeding services.
        for tenths in range(5):
            self._queue_service(tenths, will_succeed=False)
            self._queue_service(tenths, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_when_some_crash(self):
        # Alternate raising and succeeding services.
        for tenths in range(5):
            self._queue_service(tenths, will_raise=Exception())
            self._queue_service(tenths, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_fail_when_too_many_crash(self):
        # Only copies-1 services succeed, so done() stops short of
        # the requested number of copies.
        for tenths in range(self.copies+1):
            self._queue_service(tenths, will_raise=Exception())
        for tenths in range(self.copies-1):
            self._queue_service(tenths, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies-1)
1160     
1161
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def test_success_after_exception(self):
        # One exception, then two successes: three requests in total.
        responses = (Exception('mock err'), 200, 200)
        patcher = tutil.mock_keep_responses(
            'acbd18db4cc2f85cedef654fccc4a4d8+3', *responses)
        with patcher as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        # One 500, then two successes: three requests in total.
        responses = (500, 200, 200)
        patcher = tutil.mock_keep_responses(
            'acbd18db4cc2f85cedef654fccc4a4d8+3', *responses)
        with patcher as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        responses = (200, 400, 200)
        patcher = tutil.mock_keep_responses(
            'acbd18db4cc2f85cedef654fccc4a4d8+3', *responses)
        with patcher as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)