Merge branch '18691-freeze-project'
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import socket
19 import sys
20 import time
21 import unittest
22 import urllib.parse
23
24 import arvados
25 import arvados.retry
26 import arvados.util
27 from . import arvados_testutil as tutil
28 from . import keepstub
29 from . import run_test_server
30
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Round-trip data through a live keepstore with a KeepClient.

    setUpClass() creates one KeepClient that most tests share, so state kept
    on that client (e.g. its upload/download counters) carries over between
    tests.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        # Shared by the tests below (except where a fresh client is needed).
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        # Use a dedicated client so the byte counters start at zero no
        # matter which other tests have already used the shared client.
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        self.assertEqual(0, keep_client.upload_counter.get())
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, keep_client.upload_counter.get())

        self.assertEqual(0, keep_client.download_counter.get())
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # 8-byte pattern repeated 2**23 times = 67108864 bytes (64 MiB).
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' * (2 ** 23)
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
130
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Keep access checks against a keepstore started with blob signing."""
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        client = arvados.KeepClient()

        # A write returns a locator carrying a +A...@... hint, and that
        # locator reads back the stored content.
        signed_foo = client.put('foo')
        self.assertRegex(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        self.assertEqual(client.get(signed_foo),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # Dropping the +A hint from the locator makes the read fail.
        signed_bar = client.put('bar')
        self.assertRegex(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)
        bare_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        with self.assertRaises(arvados.errors.NotFoundError):
            client.get(bare_bar)

        # Reading as a different user fails even with the signed locator.
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(signed_bar)

        # With an empty token, both the signed and unsigned forms fail.
        client.api_token = ''
        for locator in (signed_bar, bare_bar):
            with self.assertRaises(arvados.errors.NotFoundError):
                client.get(locator)
173
174
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """KeepClient service selection when a keepproxy server is running."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepProxyTestCase, self).setUp()
        # Some tests overwrite ARVADOS_KEEP_SERVICES in the global settings.
        # Remember the current value so tearDown() can restore it; otherwise
        # test_KeepProxyTest1 only passes because it happens to sort first.
        self._orig_keep_services = arvados.config.settings().get(
            'ARVADOS_KEEP_SERVICES')

    def tearDown(self):
        settings = arvados.config.settings()
        settings.pop('ARVADOS_EXTERNAL_CLIENT', None)
        if self._orig_keep_services is None:
            settings.pop('ARVADOS_KEEP_SERVICES', None)
        else:
            settings['ARVADOS_KEEP_SERVICES'] = self._orig_keep_services
        super(KeepProxyTestCase, self).tearDown()

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegex(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client, local_store='')
236
237
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient request options and error reporting against mocked services.

    API clients come from mock_keep_services() (or a MagicMock), and HTTP
    responses are faked with tutil.mock_keep_responses().
    """

    def get_service_roots(self, api_client):
        """Return the parsed service root URLs for a fixed locator, sorted."""
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client)
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def check_timeout_settings(self, verb, arg, exc_class, timeout,
                               expect_low_speed=True, **service_kwargs):
        """Force a socket timeout on one request and check its pycurl options.

        verb: KeepClient method name to call ('get', 'head' or 'put').
        arg: the single argument passed to that method.
        exc_class: exception the call is expected to raise.
        timeout: tuple whose [0] is the connect timeout in seconds, [1] the
            LOW_SPEED_TIME and [2] the LOW_SPEED_LIMIT expected on the request.
        expect_low_speed: when false, LOW_SPEED_TIME/LIMIT must be left unset
            (as the HEAD tests expect).
        service_kwargs: extra arguments for mock_keep_services().
        """
        api_client = self.mock_keep_services(count=1, **service_kwargs)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                getattr(keep_client, verb)(arg)
            first_request = req_mock.responses[0]
            self.assertEqual(
                first_request.getopt(pycurl.CONNECTTIMEOUT_MS),
                int(timeout[0]*1000))
            if expect_low_speed:
                expected_time, expected_limit = int(timeout[1]), int(timeout[2])
            else:
                expected_time = expected_limit = None
            self.assertEqual(
                first_request.getopt(pycurl.LOW_SPEED_TIME), expected_time)
            self.assertEqual(
                first_request.getopt(pycurl.LOW_SPEED_LIMIT), expected_limit)

    def test_get_timeout(self):
        self.check_timeout_settings(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        self.check_timeout_settings(
            'put', b'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        self.check_timeout_settings(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT,
            expect_low_speed=False)

    def test_proxy_get_timeout(self):
        self.check_timeout_settings(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_head_timeout(self):
        self.check_timeout_settings(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            expect_low_speed=False,
            service_type='proxy')

    def test_proxy_put_timeout(self):
        self.check_timeout_settings(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def check_no_services_error(self, verb, exc_class):
        """Call *verb* when listing services fails; expect no per-request errors."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Exhaust retries on two services; only last-attempt errors are kept."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
503
504
@tutil.skip_sleep
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Which KeepClient reads are answered from its block cache.

    Each test patches KeepService.get, so the patch's call count shows how
    many reads actually reached a service.
    """
    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.get(self.locator)
        # The repeated GET is served from the cache: one service call only.
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.head(self.locator)
        # HEAD results are not cached (so they are never mistaken for GET
        # results), hence both calls reach the service.
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        get_mock.side_effect = ['first response', 'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual('first response', head_resp)
        # The HEAD response was not cached, so the GET made its own request.
        self.assertNotEqual(head_resp, get_resp)
540
@tutil.skip_sleep
class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Storage-class request headers and retry decisions in KeepClient.put()."""
    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def test_multiple_default_storage_classes_req_header(self):
        api_mock = self.api_client_mock()
        api_mock.config.return_value = {
            'StorageClasses': {
                'foo': { 'Default': True },
                'bar': { 'Default': True },
                'baz': { 'Default': False }
            }
        }
        api_client = self.mock_keep_services(api_mock=api_mock, count=2)
        keep_client = arvados.KeepClient(api_client=api_client)
        resp_hdr = {
            'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
            'x-keep-replicas-stored': 1
        }
        with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as req_mock:
            keep_client.put(self.data, copies=1)
            req_hdr = req_mock.responses[0]
            # Only the classes marked Default are requested, in sorted order.
            self.assertIn(
                'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))

    def test_storage_classes_req_header(self):
        self.assertEqual(
            self.api_client.config()['StorageClasses'],
            {'default': {'Default': True}})
        cases = [
            # requested, expected
            [['foo'], 'X-Keep-Storage-Classes: foo'],
            [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
            [[], 'X-Keep-Storage-Classes: default'],
            [None, 'X-Keep-Storage-Classes: default'],
        ]
        for req_classes, expected_header in cases:
            # subTest keeps checking the remaining cases after a failure.
            with self.subTest(classes=req_classes):
                if req_classes:
                    confirmed_hdr = ', '.join(
                        "{}=1".format(name) for name in req_classes)
                else:
                    # Nothing requested: the mocked server confirms 'default'.
                    confirmed_hdr = 'default=1'
                headers = {
                    'x-keep-replicas-stored': 1,
                    'x-keep-storage-classes-confirmed': confirmed_hdr,
                }
                with tutil.mock_keep_responses(self.locator, 200, **headers) as req_mock:
                    self.keep_client.put(self.data, copies=1, classes=req_classes)
                    req_hdr = req_mock.responses[0]
                    self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))

    def test_partial_storage_classes_put(self):
        headers = {
            'x-keep-replicas-stored': 1,
            'x-keep-storage-classes-confirmed': 'foo=1'}
        with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
            # 1st request, both classes pending
            req1_headers = req_mock.responses[0].getopt(pycurl.HTTPHEADER)
            self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
            # 2nd try, 'foo' class already satisfied
            req2_headers = req_mock.responses[1].getopt(pycurl.HTTPHEADER)
            self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)

    def test_successful_storage_classes_put_requests(self):
        cases = [
            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
            [ 1, ['foo'], 1, 'foo=1', 1],
            [ 1, ['foo'], 2, 'foo=2', 1],
            [ 2, ['foo'], 2, 'foo=2', 1],
            [ 2, ['foo'], 1, 'foo=1', 2],
            [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
            [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
            [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
            [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
            [ 1, ['foo', 'bar'], 1, None, 1],
            [ 1, ['foo'], 1, None, 1],
            [ 2, ['foo'], 2, None, 1],
            [ 2, ['foo'], 1, None, 2],
        ]
        for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
            case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
            with self.subTest(case_desc):
                headers = {'x-keep-replicas-stored': c_copies}
                if c_classes is not None:
                    headers['x-keep-storage-classes-confirmed'] = c_classes
                with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as req_mock:
                    self.assertEqual(self.locator,
                        self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
                        case_desc)
                    self.assertEqual(e_reqs, req_mock.call_count, case_desc)

    def test_failed_storage_classes_put_requests(self):
        cases = [
            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
            [ 1, ['foo'], 1, 'bar=1', 200],
            [ 1, ['foo'], 1, None, 503],
            [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
            [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
            [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
        ]
        for w_copies, w_classes, c_copies, c_classes, return_code in cases:
            case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
            with self.subTest(case_desc):
                headers = {'x-keep-replicas-stored': c_copies}
                if c_classes is not None:
                    headers['x-keep-storage-classes-confirmed'] = c_classes
                with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
                    with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
                        self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
651
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check the X-Request-Id header sent with KeepClient put/get/head."""

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def _check_put_get_head(self, check_request_id, **kwargs):
        """Run put, get and head once each and check every request made.

        check_request_id is called with each mocked response object.
        kwargs (e.g. request_id=...) are passed through to every
        KeepClient call.  put is required to have issued exactly two
        requests.
        """
        with tutil.mock_keep_responses(self.locator, 200, 200) as keep_mock:
            self.keep_client.put(self.data, **kwargs)
        self.assertEqual(2, len(keep_mock.responses))
        for resp in keep_mock.responses:
            check_request_id(resp)

        with tutil.mock_keep_responses(self.data, 200) as keep_mock:
            self.keep_client.get(self.locator, **kwargs)
        check_request_id(keep_mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as keep_mock:
            self.keep_client.head(self.locator, **kwargs)
        check_request_id(keep_mock.responses[0])

    def test_default_to_api_client_request_id(self):
        self.api_client.request_id = self.test_id
        self._check_put_get_head(self.assertProvidedRequestId)

    def test_explicit_request_id(self):
        self._check_put_get_head(self.assertProvidedRequestId,
                                 request_id=self.test_id)

    def test_automatic_request_id(self):
        self._check_put_get_head(self.assertAutomaticRequestId)

    def test_request_id_in_exception(self):
        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
                self.keep_client.head(self.locator, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
                self.keep_client.get(self.locator)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
                self.keep_client.put(self.data, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
                self.keep_client.put(self.data)

    def assertAutomaticRequestId(self, resp):
        """Assert resp was sent with a generated (not self.test_id) request id."""
        headers = resp.getopt(pycurl.HTTPHEADER)
        request_id_headers = [h for h in headers
                              if h.startswith('X-Request-Id: ')]
        # Fail as a test assertion, not an IndexError, when the header is absent.
        self.assertTrue(request_id_headers,
                        'no X-Request-Id header in {!r}'.format(headers))
        hdr = request_id_headers[0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        """Assert resp was sent with X-Request-Id equal to self.test_id."""
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
738
739
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check KeepClient's service probe order against a fixed reference set."""

    # Captures the hex service id that follows "keep0x" in a mocked
    # service URL; the host part may be wrapped in "[...]".
    _SERVICE_ID_RE = re.compile(r'//\[?keep0x([0-9a-f]+)')

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        self.blocks = [
            "{:064x}".format(i).encode()
            for i in range(len(self.expected_order))]
        self.hashes = [hashlib.md5(block).hexdigest() for block in self.blocks]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def _service_id(self, url):
        """Return the hex id of the mocked Keep service addressed by url."""
        return self._SERVICE_ID_RE.search(url).group(1)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for block_hash, expected in zip(self.hashes, self.expected_order):
            roots = self.keep_client.weighted_service_roots(
                arvados.KeepLocator(block_hash))
            got_order = [self._service_id(root) for root in roots]
            self.assertEqual(expected, got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Run op(i) for each reference block against all-500 responses.

        With num_retries=1 every service is tried twice, so the request
        sequence must be the reference order repeated twice.
        """
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *([500] * (self.services*2))) as keep_mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                self._service_id(resp.getopt(pycurl.URL).decode())
                for resp in keep_mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *([500] * (self.services*3))) as keep_mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    self._service_id(resp.getopt(pycurl.URL).decode())
                    for resp in keep_mock.responses]
                expected_order = self.expected_order[i]*3
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(expected_order):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        pending.remove(got)
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(expected_order)))

    def test_probe_waste_adding_one_server(self):
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        initial_client = arvados.KeepClient(
            api_client=self.mock_keep_services(count=initial_services))
        probes_before = [
            initial_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            for block_hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client)
            total_penalty = 0
            for block_hash, probe_before in zip(hashes, probes_before):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(block_hash))
                # How far the previously-first service moved down the list.
                penalty = probe_after.index(probe_before[0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Check that the request errors of a failed `verb` list services in probe order.

        The data is 64 "0" characters, i.e. self.blocks[0], so the
        expected probe order is self.expected_order[0].
        """
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client)
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in self.expected_order[0]],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
893
894
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        """Context manager: assert the body takes between tmin and tmax seconds.

        Subclasses TestCase only to borrow its assert* methods.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            if exc_type is not None:
                # Let the body's own exception propagate; a timing
                # assertion here would only obscure the real failure.
                return False
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)
            return False

    class assertTakesGreater(unittest.TestCase):
        """Context manager: assert the body takes at least tmin seconds.

        Subclasses TestCase only to borrow its assert* methods.
        """
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            if exc_type is not None:
                # Let the body's own exception propagate unmasked.
                return False
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            return False

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient for the stub servers using the given timeout tuple."""
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
1035
1036
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check how KeepClient routes requests for locators carrying K@ hints."""

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Build self.keepClient over `disks` mocked services plus `gateways` gateways.

        The gateway service records are kept in self.gateways and their
        root URLs in self.gateway_roots.
        """
        self.gateways = [{
            'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
            'owner_uuid': 'zzzzz-tpzed-000000000000000',
            'service_host': 'gatewayhost{}'.format(i),
            'service_port': 12345,
            'service_ssl_flag': True,
            'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    def _check_gateway_hints_in_order(self, MockCurl, verb):
        """Check that `verb` tries hinted gateways in order, then disk services.

        Every fake response is a 404, so the call must raise NotFoundError
        after trying all gateways and disks.
        """
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for root, curl in zip(self.gateway_roots, mocks):
            self.assertEqual(root+locator,
                             curl.getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for curl in mocks[gateways:]:
            self.assertRegex(
                curl.getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'get')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'head')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1132
1133
class KeepClientRetryTestMixin(object):
    """Shared retry-behavior tests for KeepClient get/head/put.

    Testing with a local Keep store won't exercise the retry behavior.
    Instead, our strategy is:

    * Create a client with one proxy specified (pointed at a black
      hole), so there's no need to instantiate an API client, and
      all HTTP requests come from one place.
    * Replace the HTTP layer with simulated responses via TEST_PATCHER
      (e.g. tutil.mock_keep_responses, which substitutes fake pycurl
      handles) rather than making real requests.

    This lets us test the retry logic extensively without relying on any
    supporting servers, and prevents side effects in case something hiccups.

    Subclasses must define:

    * DEFAULT_EXPECT: the value a successful run_method() returns; also
      passed to TEST_PATCHER as the response body.
    * DEFAULT_EXCEPTION: the exception class a failed run_method() raises.
    * TEST_PATCHER: a callable taking (body, *responses) and returning a
      context manager that serves those simulated responses in order.
    * run_method(*args, **kwargs): performs the client operation under test.
    """

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Return a KeepClient built from self.client_kwargs updated by caller_kwargs."""
        kwargs = self.client_kwargs.copy()
        kwargs.update(caller_kwargs)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) returns expected (default DEFAULT_EXPECT)."""
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) raises error_class (default DEFAULT_EXCEPTION).

        Returns the assertRaises context so callers can inspect the exception.
        """
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
        return err

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1205
1206
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry tests for KeepClient.get(), plus get-specific failure cases."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        """A 404/500 split between servers must not be reported as NotFoundError.

        get raises NotFoundError only when no server returns the block and
        a high enough share of servers report it missing; here the two
        servers disagree 50/50.
        """
        keep_client = self.new_client()
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as ctx:
                keep_client.get(self.HINTED_LOCATOR)
            raised = ctx.exception
            self.assertNotIsInstance(
                raised, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        responses = (
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        )
        with self.TEST_PATCHER(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        responses = (
            ('baddata', 200),
            (self.DEFAULT_EXPECT, 200),
        )
        with self.TEST_PATCHER(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)
1250
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry tests for KeepClient.head(), plus head-specific failure cases."""

    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        """A 404/500 split between servers must not be reported as NotFoundError.

        head raises NotFoundError only when no server has the block and a
        high enough share of servers report it missing; here the two
        servers disagree 50/50.
        """
        keep_client = self.new_client()
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as ctx:
                keep_client.head(self.HINTED_LOCATOR)
            raised = ctx.exception
            self.assertNotIsInstance(
                raised, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        responses = (
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        )
        with self.TEST_PATCHER(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)
1288
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Retry behavior of KeepClient.put(), exercised through the shared
    # KeepClientRetryTestMixin machinery.  The mocked servers answer with
    # TEST_LOCATOR as their response body.
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        # Store `data` with the requested number of `copies` through a
        # freshly built client; remaining arguments go straight to put().
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Requesting two copies must fail rather than being satisfied by
        # writing both copies to the same server, even with retries allowed.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1302
1303
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    # KeepWriterThreadPool must stop counting writes once the requested
    # number of copies has been stored.  Services are staggered with real
    # sleeps so that they finish in a predictable order.

    class FakeKeepService(object):
        # Stand-in keep service: put() sleeps `delay` seconds, then raises
        # `will_raise` if one was given, otherwise returns `will_succeed`.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            # Canned response handed back by last_result() after a success,
            # claiming `replicas` copies in the 'default' storage class.
            self._result = {
                'headers': {
                    'x-keep-replicas-stored': str(replicas),
                    'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
                },
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout, headers):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            if not self.will_succeed:
                return {"status_code": 500, "body": "didn't succeed"}
            return self._result

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def test_only_write_enough_on_success(self):
        # Ten services that all succeed, 0.1 s apart: the pool should
        # report exactly `copies` stored.
        for step in range(10):
            self.pool.add_task(
                self.FakeKeepService(delay=step / 10.0, will_succeed=True),
                None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # At each delay step, a failing service is queued before a
        # succeeding one.
        for step in range(5):
            delay = step / 10.0
            for succeeds in (False, True):
                self.pool.add_task(
                    self.FakeKeepService(delay=delay, will_succeed=succeeds),
                    None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # At each delay step, a service whose put() raises is queued before
        # a succeeding one.
        for step in range(5):
            delay = step / 10.0
            self.pool.add_task(
                self.FakeKeepService(delay=delay, will_raise=Exception()),
                None)
            self.pool.add_task(
                self.FakeKeepService(delay=delay, will_succeed=True),
                None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # copies+1 services crash and only copies-1 succeed, so the pool
        # ends up one copy short.
        for step in range(self.copies + 1):
            self.pool.add_task(
                self.FakeKeepService(delay=step / 10.0, will_raise=Exception()),
                None)
        for step in range(self.copies - 1):
            self.pool.add_task(
                self.FakeKeepService(delay=step / 10.0, will_succeed=True),
                None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies - 1, []))
1375
1376
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    # put()s that need two distinct servers to succeed, which may take
    # more than one pass through the retry loop.

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def test_success_after_exception(self):
        # One request raises; with one retry allowed, the two 200s still
        # yield both copies, for three requests in total.
        foo_locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
        with tutil.mock_keep_responses(
                foo_locator, Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        # Same as above, but the first request fails with a retryable 500.
        foo_locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
        with tutil.mock_keep_responses(
                foo_locator, 500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # The first pass gets a 200 (storing again on that server cannot
        # add replication) and a 400 (that server cannot be retried at
        # all), so no third request should be made.
        foo_locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
        with tutil.mock_keep_responses(
                foo_locator, 200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1410
class KeepClientAPIErrorTest(unittest.TestCase):
    def test_api_fail(self):
        # Stand-in API client: it answers api_token, insecure and config
        # (presumably what KeepClient reads at construction -- confirm)
        # and raises KeepReadError for any other attribute lookup, which
        # simulates an API (not keepstore) failure.
        class ApiMock(object):
            def __getattr__(self, name):
                if name == "api_token":
                    return "abc"
                if name == "insecure":
                    return False
                if name == "config":
                    return lambda: {}
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='')

        # Regression guard: an API (not keepstore) exception raised during
        # get() used to make the next get() of the same block deadlock, so
        # the same block is fetched twice in a row.  Unfortunately, a
        # regression shows up as the whole test suite deadlocking; there is
        # no good way to avoid that without adding a special case that has
        # no use except for this test.
        for _attempt in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")