Merge branch '17635-pysdk-collection-preserve-version' into main. Closes #17635
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import socket
19 import sys
20 import time
21 import unittest
22 import urllib.parse
23
24 import arvados
25 import arvados.retry
26 import arvados.util
27 from . import arvados_testutil as tutil
28 from . import keepstub
29 from . import run_test_server
30
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Round-trip blocks through a Keep server started by the test harness.

    setUpClass authorizes as the "admin" test user and builds one KeepClient
    (no proxy, no local store) that the tests in this class share.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        # Use a private client: the upload/download counters asserted below
        # are expected to start at zero, which the shared class-level client
        # only guarantees when this test happens to run before all others.
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        self.assertEqual(0, keep_client.upload_counter.get())
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, keep_client.upload_counter.get())

        self.assertEqual(0, keep_client.download_counter.get())
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # 8 bytes repeated 2**23 times = 67108864 bytes (64 MiB), the same
        # blob the original 23 doubling steps produced, built in one step.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' * (2 ** 23)
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
130
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Check permission-signature enforcement on a Keep server with blob signing on."""
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        client = arvados.KeepClient()

        # A PUT returns a locator carrying a +A<hex>@<hex> hint, and that
        # locator reads back the stored content.
        signed_foo = client.put('foo')
        self.assertRegex(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        self.assertEqual(client.get(signed_foo),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        signed_bar = client.put('bar')
        bare_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)
        with self.assertRaises(arvados.errors.NotFoundError):
            client.get(bare_bar)

        # GET from a different user => NotFound
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(signed_bar)

        # With the client's token cleared, neither the signed nor the
        # unsigned locator is readable => NotFound for both.
        client.api_token = ''
        for loc in (signed_bar, bare_bar):
            with self.assertRaises(arvados.errors.NotFoundError):
                client.get(loc)
173
174
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """Check how KeepClient selects keepproxy services.

    Several tests mutate ``arvados.config.settings()``. setUp/tearDown
    restore the touched keys so one test's override does not leak into the
    next test in this class.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepProxyTestCase, self).setUp()
        # Some tests overwrite ARVADOS_KEEP_SERVICES; remember the value in
        # effect before the test so tearDown can put it back.
        self._orig_keep_services = arvados.config.settings().get(
            'ARVADOS_KEEP_SERVICES')

    def tearDown(self):
        settings = arvados.config.settings()
        settings.pop('ARVADOS_EXTERNAL_CLIENT', None)
        if self._orig_keep_services is None:
            settings.pop('ARVADOS_KEEP_SERVICES', None)
        else:
            settings['ARVADOS_KEEP_SERVICES'] = self._orig_keep_services
        super(KeepProxyTestCase, self).tearDown()

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegex(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
236
237
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient request behaviour against mocked Keep services and mocked pycurl.

    Context-manager results from ``tutil.mock_keep_responses`` are bound to
    ``curl_mock``, not ``mock``, so they do not shadow the ``mock`` module.
    """
    def get_service_roots(self, api_client):
        """Return the parsed, sorted service roots KeepClient picks for an all-zero locator."""
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as curl_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                curl_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as curl_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                curl_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client)
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def check_timeout_options(self, verb, arg, exc_class, expected_timeout,
                              **service_kwargs):
        """Force a socket timeout on ``keep_client.<verb>(arg)`` and check pycurl options.

        :param verb: KeepClient method name ('get', 'head' or 'put').
        :param arg: argument passed to that method.
        :param exc_class: exception the call is expected to raise.
        :param expected_timeout: sequence whose [0] is the connect timeout in
            seconds and whose [1] and [2] are the expected LOW_SPEED_TIME and
            LOW_SPEED_LIMIT values.
        :param service_kwargs: extra arguments for ``mock_keep_services``
            (e.g. ``service_type='proxy'``).

        HEAD requests are expected to leave the low-speed options unset
        (``getopt`` returns None); GET and PUT are expected to set them.
        """
        api_client = self.mock_keep_services(count=1, **service_kwargs)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as curl_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                getattr(keep_client, verb)(arg)
            curl = curl_mock.responses[0]
            self.assertEqual(
                curl.getopt(pycurl.CONNECTTIMEOUT_MS),
                int(expected_timeout[0]*1000))
            if verb == 'head':
                expected_low_speed_time = None
                expected_low_speed_limit = None
            else:
                expected_low_speed_time = int(expected_timeout[1])
                expected_low_speed_limit = int(expected_timeout[2])
            self.assertEqual(
                curl.getopt(pycurl.LOW_SPEED_TIME),
                expected_low_speed_time)
            self.assertEqual(
                curl.getopt(pycurl.LOW_SPEED_LIMIT),
                expected_low_speed_limit)

    def test_get_timeout(self):
        self.check_timeout_options(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        self.check_timeout_options(
            'put', b'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        self.check_timeout_options(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_proxy_get_timeout(self):
        self.check_timeout_options(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_head_timeout(self):
        self.check_timeout_options(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_put_timeout(self):
        self.check_timeout_options(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def check_no_services_error(self, verb, exc_class):
        """Check ``verb`` raises ``exc_class`` with no request errors when service discovery fails."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Check the raised error reports only the final attempt's per-service errors."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
497
498
@tutil.skip_sleep
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check which KeepClient requests are answered without contacting a service again."""
    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.get(self.locator)
        # The repeated GET is served from the cache: one service call only.
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.head(self.locator)
        # HEAD results must not be cached, otherwise a later GET could be
        # answered with a HEAD result; each HEAD hits the service.
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        get_mock.side_effect = ['first response', 'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual('first response', head_resp)
        # The HEAD response was not cached, so the GET received a fresh one.
        self.assertNotEqual(head_resp, get_resp)
534
535 @tutil.skip_sleep
536 class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
537     def setUp(self):
538         self.api_client = self.mock_keep_services(count=2)
539         self.keep_client = arvados.KeepClient(api_client=self.api_client)
540         self.data = b'xyzzy'
541         self.locator = '1271ed5ef305aadabc605b1609e24c52'
542
543     def test_multiple_default_storage_classes_req_header(self):
544         api_mock = self.api_client_mock()
545         api_mock.config.return_value = {
546             'StorageClasses': {
547                 'foo': { 'Default': True },
548                 'bar': { 'Default': True },
549                 'baz': { 'Default': False }
550             }
551         }
552         api_client = self.mock_keep_services(api_mock=api_mock, count=2)
553         keep_client = arvados.KeepClient(api_client=api_client)
554         resp_hdr = {
555             'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
556             'x-keep-replicas-stored': 1
557         }
558         with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
559             keep_client.put(self.data, copies=1)
560             req_hdr = mock.responses[0]
561             self.assertIn(
562                 'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
563
564     def test_storage_classes_req_header(self):
565         self.assertEqual(
566             self.api_client.config()['StorageClasses'],
567             {'default': {'Default': True}})
568         cases = [
569             # requested, expected
570             [['foo'], 'X-Keep-Storage-Classes: foo'],
571             [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
572             [[], 'X-Keep-Storage-Classes: default'],
573             [None, 'X-Keep-Storage-Classes: default'],
574         ]
575         for req_classes, expected_header in cases:
576             headers = {'x-keep-replicas-stored': 1}
577             if req_classes is None or len(req_classes) == 0:
578                 confirmed_hdr = 'default=1'
579             elif len(req_classes) > 0:
580                 confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
581             headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
582             with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
583                 self.keep_client.put(self.data, copies=1, classes=req_classes)
584                 req_hdr = mock.responses[0]
585                 self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
586
587     def test_partial_storage_classes_put(self):
588         headers = {
589             'x-keep-replicas-stored': 1,
590             'x-keep-storage-classes-confirmed': 'foo=1'}
591         with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
592             with self.assertRaises(arvados.errors.KeepWriteError):
593                 self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
594             # 1st request, both classes pending
595             req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
596             self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
597             # 2nd try, 'foo' class already satisfied
598             req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
599             self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
600
601     def test_successful_storage_classes_put_requests(self):
602         cases = [
603             # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
604             [ 1, ['foo'], 1, 'foo=1', 1],
605             [ 1, ['foo'], 2, 'foo=2', 1],
606             [ 2, ['foo'], 2, 'foo=2', 1],
607             [ 2, ['foo'], 1, 'foo=1', 2],
608             [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
609             [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
610             [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
611             [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
612             [ 1, ['foo', 'bar'], 1, None, 1],
613             [ 1, ['foo'], 1, None, 1],
614             [ 2, ['foo'], 2, None, 1],
615             [ 2, ['foo'], 1, None, 2],
616         ]
617         for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
618             headers = {'x-keep-replicas-stored': c_copies}
619             if c_classes is not None:
620                 headers.update({'x-keep-storage-classes-confirmed': c_classes})
621             with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
622                 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
623                 self.assertEqual(self.locator,
624                     self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
625                     case_desc)
626                 self.assertEqual(e_reqs, mock.call_count, case_desc)
627
628     def test_failed_storage_classes_put_requests(self):
629         cases = [
630             # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
631             [ 1, ['foo'], 1, 'bar=1', 200],
632             [ 1, ['foo'], 1, None, 503],
633             [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
634             [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
635             [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
636         ]
637         for w_copies, w_classes, c_copies, c_classes, return_code in cases:
638             headers = {'x-keep-replicas-stored': c_copies}
639             if c_classes is not None:
640                 headers.update({'x-keep-storage-classes-confirmed': c_classes})
641             with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
642                 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
643                 with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
644                     self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
645
646 @tutil.skip_sleep
647 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
648     def setUp(self):
649         self.api_client = self.mock_keep_services(count=2)
650         self.keep_client = arvados.KeepClient(api_client=self.api_client)
651         self.data = b'xyzzy'
652         self.locator = '1271ed5ef305aadabc605b1609e24c52'
653         self.test_id = arvados.util.new_request_id()
654         self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
655         # If we don't set request_id to None explicitly here, it will
656         # return <MagicMock name='api_client_mock.request_id'
657         # id='123456789'>:
658         self.api_client.request_id = None
659
660     def test_default_to_api_client_request_id(self):
661         self.api_client.request_id = self.test_id
662         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
663             self.keep_client.put(self.data)
664         self.assertEqual(2, len(mock.responses))
665         for resp in mock.responses:
666             self.assertProvidedRequestId(resp)
667
668         with tutil.mock_keep_responses(self.data, 200) as mock:
669             self.keep_client.get(self.locator)
670         self.assertProvidedRequestId(mock.responses[0])
671
672         with tutil.mock_keep_responses(b'', 200) as mock:
673             self.keep_client.head(self.locator)
674         self.assertProvidedRequestId(mock.responses[0])
675
676     def test_explicit_request_id(self):
677         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
678             self.keep_client.put(self.data, request_id=self.test_id)
679         self.assertEqual(2, len(mock.responses))
680         for resp in mock.responses:
681             self.assertProvidedRequestId(resp)
682
683         with tutil.mock_keep_responses(self.data, 200) as mock:
684             self.keep_client.get(self.locator, request_id=self.test_id)
685         self.assertProvidedRequestId(mock.responses[0])
686
687         with tutil.mock_keep_responses(b'', 200) as mock:
688             self.keep_client.head(self.locator, request_id=self.test_id)
689         self.assertProvidedRequestId(mock.responses[0])
690
691     def test_automatic_request_id(self):
692         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
693             self.keep_client.put(self.data)
694         self.assertEqual(2, len(mock.responses))
695         for resp in mock.responses:
696             self.assertAutomaticRequestId(resp)
697
698         with tutil.mock_keep_responses(self.data, 200) as mock:
699             self.keep_client.get(self.locator)
700         self.assertAutomaticRequestId(mock.responses[0])
701
702         with tutil.mock_keep_responses(b'', 200) as mock:
703             self.keep_client.head(self.locator)
704         self.assertAutomaticRequestId(mock.responses[0])
705
706     def test_request_id_in_exception(self):
707         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
708             with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
709                 self.keep_client.head(self.locator, request_id=self.test_id)
710
711         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
712             with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
713                 self.keep_client.get(self.locator)
714
715         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
716             with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
717                 self.keep_client.put(self.data, request_id=self.test_id)
718
719         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
720             with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
721                 self.keep_client.put(self.data)
722
723     def assertAutomaticRequestId(self, resp):
724         hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
725                if x.startswith('X-Request-Id: ')][0]
726         self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
727         self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
728
729     def assertProvidedRequestId(self, resp):
730         self.assertIn('X-Request-Id: '+self.test_id,
731                       resp.getopt(pycurl.HTTPHEADER))
732
733
734 @tutil.skip_sleep
735 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
736
737     def setUp(self):
738         # expected_order[i] is the probe order for
739         # hash=md5(sprintf("%064x",i)) where there are 16 services
740         # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
741         # the first probe for the block consisting of 64 "0"
742         # characters is the service whose uuid is
743         # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
744         self.services = 16
745         self.expected_order = [
746             list('3eab2d5fc9681074'),
747             list('097dba52e648f1c3'),
748             list('c5b4e023f8a7d691'),
749             list('9d81c02e76a3bf54'),
750             ]
751         self.blocks = [
752             "{:064x}".format(x).encode()
753             for x in range(len(self.expected_order))]
754         self.hashes = [
755             hashlib.md5(self.blocks[x]).hexdigest()
756             for x in range(len(self.expected_order))]
757         self.api_client = self.mock_keep_services(count=self.services)
758         self.keep_client = arvados.KeepClient(api_client=self.api_client)
759
760     def test_weighted_service_roots_against_reference_set(self):
761         # Confirm weighted_service_roots() returns the correct order
762         for i, hash in enumerate(self.hashes):
763             roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
764             got_order = [
765                 re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
766                 for root in roots]
767             self.assertEqual(self.expected_order[i], got_order)
768
769     def test_get_probe_order_against_reference_set(self):
770         self._test_probe_order_against_reference_set(
771             lambda i: self.keep_client.get(self.hashes[i], num_retries=1))
772
773     def test_head_probe_order_against_reference_set(self):
774         self._test_probe_order_against_reference_set(
775             lambda i: self.keep_client.head(self.hashes[i], num_retries=1))
776
777     def test_put_probe_order_against_reference_set(self):
778         # copies=1 prevents the test from being sensitive to races
779         # between writer threads.
780         self._test_probe_order_against_reference_set(
781             lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))
782
783     def _test_probe_order_against_reference_set(self, op):
784         for i in range(len(self.blocks)):
785             with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
786                  self.assertRaises(arvados.errors.KeepRequestError):
787                 op(i)
788             got_order = [
789                 re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
790                 for resp in mock.responses]
791             self.assertEqual(self.expected_order[i]*2, got_order)
792
793     def test_put_probe_order_multiple_copies(self):
794         for copies in range(2, 4):
795             for i in range(len(self.blocks)):
796                 with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
797                      self.assertRaises(arvados.errors.KeepWriteError):
798                     self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
799                 got_order = [
800                     re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
801                     for resp in mock.responses]
802                 # With T threads racing to make requests, the position
803                 # of a given server in the sequence of HTTP requests
804                 # (got_order) cannot be more than T-1 positions
805                 # earlier than that server's position in the reference
806                 # probe sequence (expected_order).
807                 #
808                 # Loop invariant: we have accounted for +pos+ expected
809                 # probes, either by seeing them in +got_order+ or by
810                 # putting them in +pending+ in the hope of seeing them
811                 # later. As long as +len(pending)<T+, we haven't
812                 # started a request too early.
813                 pending = []
814                 for pos, expected in enumerate(self.expected_order[i]*3):
815                     got = got_order[pos-len(pending)]
816                     while got in pending:
817                         del pending[pending.index(got)]
818                         got = got_order[pos-len(pending)]
819                     if got != expected:
820                         pending.append(expected)
821                         self.assertLess(
822                             len(pending), copies,
823                             "pending={}, with copies={}, got {}, expected {}".format(
824                                 pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
825
826     def test_probe_waste_adding_one_server(self):
827         hashes = [
828             hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
829         initial_services = 12
830         self.api_client = self.mock_keep_services(count=initial_services)
831         self.keep_client = arvados.KeepClient(api_client=self.api_client)
832         probes_before = [
833             self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
834         for added_services in range(1, 12):
835             api_client = self.mock_keep_services(count=initial_services+added_services)
836             keep_client = arvados.KeepClient(api_client=api_client)
837             total_penalty = 0
838             for hash_index in range(len(hashes)):
839                 probe_after = keep_client.weighted_service_roots(
840                     arvados.KeepLocator(hashes[hash_index]))
841                 penalty = probe_after.index(probes_before[hash_index][0])
842                 self.assertLessEqual(penalty, added_services)
843                 total_penalty += penalty
844             # Average penalty per block should not exceed
845             # N(added)/N(orig) by more than 20%, and should get closer
846             # to the ideal as we add data points.
847             expect_penalty = (
848                 added_services *
849                 len(hashes) / initial_services)
850             max_penalty = (
851                 expect_penalty *
852                 (120 - added_services)/100)
853             min_penalty = (
854                 expect_penalty * 8/10)
855             self.assertTrue(
856                 min_penalty <= total_penalty <= max_penalty,
857                 "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
858                     initial_services,
859                     added_services,
860                     len(hashes),
861                     total_penalty,
862                     min_penalty,
863                     max_penalty))
864
865     def check_64_zeros_error_order(self, verb, exc_class):
866         data = b'0' * 64
867         if verb == 'get':
868             data = tutil.str_keep_locator(data)
869         # Arbitrary port number:
870         aport = random.randint(1024,65535)
871         api_client = self.mock_keep_services(service_port=aport, count=self.services)
872         keep_client = arvados.KeepClient(api_client=api_client)
873         with mock.patch('pycurl.Curl') as curl_mock, \
874              self.assertRaises(exc_class) as err_check:
875             curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
876             getattr(keep_client, verb)(data)
877         urls = [urllib.parse.urlparse(url)
878                 for url in err_check.exception.request_errors()]
879         self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
880                          [(url.hostname, url.port) for url in urls])
881
882     def test_get_error_shows_probe_order(self):
883         self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)
884
885     def test_put_error_shows_probe_order(self):
886         self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
887
888
889 class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
890     # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
891     # 1s worth of data and then trigger bandwidth errors before running
892     # out of data.
893     DATA = b'x'*2**11
894     BANDWIDTH_LOW_LIM = 1024
895     TIMEOUT_TIME = 1.0
896
897     class assertTakesBetween(unittest.TestCase):
898         def __init__(self, tmin, tmax):
899             self.tmin = tmin
900             self.tmax = tmax
901
902         def __enter__(self):
903             self.t0 = time.time()
904
905         def __exit__(self, *args, **kwargs):
906             # Round times to milliseconds, like CURL. Otherwise, we
907             # fail when CURL reaches a 1s timeout at 0.9998s.
908             delta = round(time.time() - self.t0, 3)
909             self.assertGreaterEqual(delta, self.tmin)
910             self.assertLessEqual(delta, self.tmax)
911
912     class assertTakesGreater(unittest.TestCase):
913         def __init__(self, tmin):
914             self.tmin = tmin
915
916         def __enter__(self):
917             self.t0 = time.time()
918
919         def __exit__(self, *args, **kwargs):
920             delta = round(time.time() - self.t0, 3)
921             self.assertGreaterEqual(delta, self.tmin)
922
923     def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
924         return arvados.KeepClient(
925             api_client=self.api_client,
926             timeout=timeouts)
927
928     def test_timeout_slow_connect(self):
929         # Can't simulate TCP delays with our own socket. Leave our
930         # stub server running uselessly, and try to connect to an
931         # unroutable IP address instead.
932         self.api_client = self.mock_keep_services(
933             count=1,
934             service_host='240.0.0.0',
935         )
936         with self.assertTakesBetween(0.1, 0.5):
937             with self.assertRaises(arvados.errors.KeepWriteError):
938                 self.keepClient().put(self.DATA, copies=1, num_retries=0)
939
940     def test_low_bandwidth_no_delays_success(self):
941         self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
942         kc = self.keepClient()
943         loc = kc.put(self.DATA, copies=1, num_retries=0)
944         self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
945
946     def test_too_low_bandwidth_no_delays_failure(self):
947         # Check that lessening bandwidth corresponds to failing
948         kc = self.keepClient()
949         loc = kc.put(self.DATA, copies=1, num_retries=0)
950         self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
951         with self.assertTakesGreater(self.TIMEOUT_TIME):
952             with self.assertRaises(arvados.errors.KeepReadError):
953                 kc.get(loc, num_retries=0)
954         with self.assertTakesGreater(self.TIMEOUT_TIME):
955             with self.assertRaises(arvados.errors.KeepWriteError):
956                 kc.put(self.DATA, copies=1, num_retries=0)
957
958     def test_low_bandwidth_with_server_response_delay_failure(self):
959         kc = self.keepClient()
960         loc = kc.put(self.DATA, copies=1, num_retries=0)
961         self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
962         # Note the actual delay must be 1s longer than the low speed
963         # limit interval in order for curl to detect it reliably.
964         self.server.setdelays(response=self.TIMEOUT_TIME+1)
965         with self.assertTakesGreater(self.TIMEOUT_TIME):
966             with self.assertRaises(arvados.errors.KeepReadError):
967                 kc.get(loc, num_retries=0)
968         with self.assertTakesGreater(self.TIMEOUT_TIME):
969             with self.assertRaises(arvados.errors.KeepWriteError):
970                 kc.put(self.DATA, copies=1, num_retries=0)
971         with self.assertTakesGreater(self.TIMEOUT_TIME):
972             kc.head(loc, num_retries=0)
973
974     def test_low_bandwidth_with_server_mid_delay_failure(self):
975         kc = self.keepClient()
976         loc = kc.put(self.DATA, copies=1, num_retries=0)
977         self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
978         # Note the actual delay must be 1s longer than the low speed
979         # limit interval in order for curl to detect it reliably.
980         self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
981         with self.assertTakesGreater(self.TIMEOUT_TIME):
982             with self.assertRaises(arvados.errors.KeepReadError) as e:
983                 kc.get(loc, num_retries=0)
984         with self.assertTakesGreater(self.TIMEOUT_TIME):
985             with self.assertRaises(arvados.errors.KeepWriteError):
986                 kc.put(self.DATA, copies=1, num_retries=0)
987
988     def test_timeout_slow_request(self):
989         loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
990         self.server.setdelays(request=.2)
991         self._test_connect_timeout_under_200ms(loc)
992         self.server.setdelays(request=2)
993         self._test_response_timeout_under_2s(loc)
994
995     def test_timeout_slow_response(self):
996         loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
997         self.server.setdelays(response=.2)
998         self._test_connect_timeout_under_200ms(loc)
999         self.server.setdelays(response=2)
1000         self._test_response_timeout_under_2s(loc)
1001
1002     def test_timeout_slow_response_body(self):
1003         loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
1004         self.server.setdelays(response_body=.2)
1005         self._test_connect_timeout_under_200ms(loc)
1006         self.server.setdelays(response_body=2)
1007         self._test_response_timeout_under_2s(loc)
1008
1009     def _test_connect_timeout_under_200ms(self, loc):
1010         # Allow 100ms to connect, then 1s for response. Everything
1011         # should work, and everything should take at least 200ms to
1012         # return.
1013         kc = self.keepClient(timeouts=(.1, 1))
1014         with self.assertTakesBetween(.2, .3):
1015             kc.put(self.DATA, copies=1, num_retries=0)
1016         with self.assertTakesBetween(.2, .3):
1017             self.assertEqual(self.DATA, kc.get(loc, num_retries=0))
1018
1019     def _test_response_timeout_under_2s(self, loc):
1020         # Allow 10s to connect, then 1s for response. Nothing should
1021         # work, and everything should take at least 1s to return.
1022         kc = self.keepClient(timeouts=(10, 1))
1023         with self.assertTakesBetween(1, 9):
1024             with self.assertRaises(arvados.errors.KeepReadError):
1025                 kc.get(loc, num_retries=0)
1026         with self.assertTakesBetween(1, 9):
1027             with self.assertRaises(arvados.errors.KeepWriteError):
1028                 kc.put(self.DATA, copies=1, num_retries=0)
1029
1030
1031 class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
1032     def mock_disks_and_gateways(self, disks=3, gateways=1):
1033         self.gateways = [{
1034                 'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
1035                 'owner_uuid': 'zzzzz-tpzed-000000000000000',
1036                 'service_host': 'gatewayhost{}'.format(i),
1037                 'service_port': 12345,
1038                 'service_ssl_flag': True,
1039                 'service_type': 'gateway:test',
1040         } for i in range(gateways)]
1041         self.gateway_roots = [
1042             "https://{service_host}:{service_port}/".format(**gw)
1043             for gw in self.gateways]
1044         self.api_client = self.mock_keep_services(
1045             count=disks, additional_services=self.gateways)
1046         self.keepClient = arvados.KeepClient(api_client=self.api_client)
1047
1048     @mock.patch('pycurl.Curl')
1049     def test_get_with_gateway_hint_first(self, MockCurl):
1050         MockCurl.return_value = tutil.FakeCurl.make(
1051             code=200, body='foo', headers={'Content-Length': 3})
1052         self.mock_disks_and_gateways()
1053         locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
1054         self.assertEqual(b'foo', self.keepClient.get(locator))
1055         self.assertEqual(self.gateway_roots[0]+locator,
1056                          MockCurl.return_value.getopt(pycurl.URL).decode())
1057         self.assertEqual(True, self.keepClient.head(locator))
1058
1059     @mock.patch('pycurl.Curl')
1060     def test_get_with_gateway_hints_in_order(self, MockCurl):
1061         gateways = 4
1062         disks = 3
1063         mocks = [
1064             tutil.FakeCurl.make(code=404, body='')
1065             for _ in range(gateways+disks)
1066         ]
1067         MockCurl.side_effect = tutil.queue_with(mocks)
1068         self.mock_disks_and_gateways(gateways=gateways, disks=disks)
1069         locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
1070                            ['K@'+gw['uuid'] for gw in self.gateways])
1071         with self.assertRaises(arvados.errors.NotFoundError):
1072             self.keepClient.get(locator)
1073         # Gateways are tried first, in the order given.
1074         for i, root in enumerate(self.gateway_roots):
1075             self.assertEqual(root+locator,
1076                              mocks[i].getopt(pycurl.URL).decode())
1077         # Disk services are tried next.
1078         for i in range(gateways, gateways+disks):
1079             self.assertRegex(
1080                 mocks[i].getopt(pycurl.URL).decode(),
1081                 r'keep0x')
1082
1083     @mock.patch('pycurl.Curl')
1084     def test_head_with_gateway_hints_in_order(self, MockCurl):
1085         gateways = 4
1086         disks = 3
1087         mocks = [
1088             tutil.FakeCurl.make(code=404, body=b'')
1089             for _ in range(gateways+disks)
1090         ]
1091         MockCurl.side_effect = tutil.queue_with(mocks)
1092         self.mock_disks_and_gateways(gateways=gateways, disks=disks)
1093         locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
1094                            ['K@'+gw['uuid'] for gw in self.gateways])
1095         with self.assertRaises(arvados.errors.NotFoundError):
1096             self.keepClient.head(locator)
1097         # Gateways are tried first, in the order given.
1098         for i, root in enumerate(self.gateway_roots):
1099             self.assertEqual(root+locator,
1100                              mocks[i].getopt(pycurl.URL).decode())
1101         # Disk services are tried next.
1102         for i in range(gateways, gateways+disks):
1103             self.assertRegex(
1104                 mocks[i].getopt(pycurl.URL).decode(),
1105                 r'keep0x')
1106
1107     @mock.patch('pycurl.Curl')
1108     def test_get_with_remote_proxy_hint(self, MockCurl):
1109         MockCurl.return_value = tutil.FakeCurl.make(
1110             code=200, body=b'foo', headers={'Content-Length': 3})
1111         self.mock_disks_and_gateways()
1112         locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
1113         self.assertEqual(b'foo', self.keepClient.get(locator))
1114         self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
1115                          MockCurl.return_value.getopt(pycurl.URL).decode())
1116
1117     @mock.patch('pycurl.Curl')
1118     def test_head_with_remote_proxy_hint(self, MockCurl):
1119         MockCurl.return_value = tutil.FakeCurl.make(
1120             code=200, body=b'foo', headers={'Content-Length': 3})
1121         self.mock_disks_and_gateways()
1122         locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
1123         self.assertEqual(True, self.keepClient.head(locator))
1124         self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
1125                          MockCurl.return_value.getopt(pycurl.URL).decode())
1126
1127
1128 class KeepClientRetryTestMixin(object):
1129     # Testing with a local Keep store won't exercise the retry behavior.
1130     # Instead, our strategy is:
1131     # * Create a client with one proxy specified (pointed at a black
1132     #   hole), so there's no need to instantiate an API client, and
1133     #   all HTTP requests come from one place.
1134     # * Mock httplib's request method to provide simulated responses.
1135     # This lets us test the retry logic extensively without relying on any
1136     # supporting servers, and prevents side effects in case something hiccups.
1137     # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
1138     # run_method().
1139     #
1140     # Test classes must define TEST_PATCHER to a method that mocks
1141     # out appropriate methods in the client.
1142
1143     PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
1144     TEST_DATA = b'testdata'
1145     TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
1146
1147     def setUp(self):
1148         self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}
1149
1150     def new_client(self, **caller_kwargs):
1151         kwargs = self.client_kwargs.copy()
1152         kwargs.update(caller_kwargs)
1153         return arvados.KeepClient(**kwargs)
1154
1155     def run_method(self, *args, **kwargs):
1156         raise NotImplementedError("test subclasses must define run_method")
1157
1158     def check_success(self, expected=None, *args, **kwargs):
1159         if expected is None:
1160             expected = self.DEFAULT_EXPECT
1161         self.assertEqual(expected, self.run_method(*args, **kwargs))
1162
1163     def check_exception(self, error_class=None, *args, **kwargs):
1164         if error_class is None:
1165             error_class = self.DEFAULT_EXCEPTION
1166         with self.assertRaises(error_class) as err:
1167             self.run_method(*args, **kwargs)
1168         return err
1169
1170     def test_immediate_success(self):
1171         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
1172             self.check_success()
1173
1174     def test_retry_then_success(self):
1175         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
1176             self.check_success(num_retries=3)
1177
1178     def test_exception_then_success(self):
1179         with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
1180             self.check_success(num_retries=3)
1181
1182     def test_no_default_retry(self):
1183         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
1184             self.check_exception()
1185
1186     def test_no_retry_after_permanent_error(self):
1187         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
1188             self.check_exception(num_retries=3)
1189
1190     def test_error_after_retries_exhausted(self):
1191         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
1192             err = self.check_exception(num_retries=1)
1193         self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')
1194
1195     def test_num_retries_instance_fallback(self):
1196         self.client_kwargs['num_retries'] = 3
1197         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
1198             self.check_success()
1199
1200
1201 @tutil.skip_sleep
1202 class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
1203     DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
1204     DEFAULT_EXCEPTION = arvados.errors.KeepReadError
1205     HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
1206     TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
1207
1208     def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
1209                    *args, **kwargs):
1210         return self.new_client().get(locator, *args, **kwargs)
1211
1212     def test_specific_exception_when_not_found(self):
1213         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
1214             self.check_exception(arvados.errors.NotFoundError, num_retries=3)
1215
1216     def test_general_exception_with_mixed_errors(self):
1217         # get should raise a NotFoundError if no server returns the block,
1218         # and a high threshold of servers report that it's not found.
1219         # This test rigs up 50/50 disagreement between two servers, and
1220         # checks that it does not become a NotFoundError.
1221         client = self.new_client()
1222         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
1223             with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
1224                 client.get(self.HINTED_LOCATOR)
1225             self.assertNotIsInstance(
1226                 exc_check.exception, arvados.errors.NotFoundError,
1227                 "mixed errors raised NotFoundError")
1228
1229     def test_hint_server_can_succeed_without_retries(self):
1230         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
1231             self.check_success(locator=self.HINTED_LOCATOR)
1232
1233     def test_try_next_server_after_timeout(self):
1234         with tutil.mock_keep_responses(
1235                 (socket.timeout("timed out"), 200),
1236                 (self.DEFAULT_EXPECT, 200)):
1237             self.check_success(locator=self.HINTED_LOCATOR)
1238
1239     def test_retry_data_with_wrong_checksum(self):
1240         with tutil.mock_keep_responses(
1241                 ('baddata', 200),
1242                 (self.DEFAULT_EXPECT, 200)):
1243             self.check_success(locator=self.HINTED_LOCATOR)
1244
1245 @tutil.skip_sleep
1246 class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
1247     DEFAULT_EXPECT = True
1248     DEFAULT_EXCEPTION = arvados.errors.KeepReadError
1249     HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
1250     TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
1251
1252     def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
1253                    *args, **kwargs):
1254         return self.new_client().head(locator, *args, **kwargs)
1255
1256     def test_specific_exception_when_not_found(self):
1257         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
1258             self.check_exception(arvados.errors.NotFoundError, num_retries=3)
1259
1260     def test_general_exception_with_mixed_errors(self):
1261         # head should raise a NotFoundError if no server returns the block,
1262         # and a high threshold of servers report that it's not found.
1263         # This test rigs up 50/50 disagreement between two servers, and
1264         # checks that it does not become a NotFoundError.
1265         client = self.new_client()
1266         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
1267             with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
1268                 client.head(self.HINTED_LOCATOR)
1269             self.assertNotIsInstance(
1270                 exc_check.exception, arvados.errors.NotFoundError,
1271                 "mixed errors raised NotFoundError")
1272
1273     def test_hint_server_can_succeed_without_retries(self):
1274         with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
1275             self.check_success(locator=self.HINTED_LOCATOR)
1276
1277     def test_try_next_server_after_timeout(self):
1278         with tutil.mock_keep_responses(
1279                 (socket.timeout("timed out"), 200),
1280                 (self.DEFAULT_EXPECT, 200)):
1281             self.check_success(locator=self.HINTED_LOCATOR)
1282
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Retry behaviour of KeepClient.put().  The DEFAULT_* / TEST_PATCHER
    # settings below are consumed by the shared KeepClientRetryTestMixin.
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        # Mixin hook: store `data` through a fresh client, asking for
        # `copies` replicas.
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Only a single 200 is mocked; requesting two copies must fail
        # rather than count the same server twice, even with retries.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1296
1297
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    # Drives KeepClient.KeepWriterThreadPool directly with fake services and
    # checks how many copies it reports as written via pool.done().

    class FakeKeepService(object):
        # Stand-in service for the writer pool: put() sleeps for `delay`,
        # then raises `will_raise` if set, otherwise returns `will_succeed`.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            # Canned response handed back by last_result() on success.
            self._result = {
                'headers': {
                    'x-keep-replicas-stored': str(replicas),
                    'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
                },
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout, headers):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            if not self.will_succeed:
                return {"status_code": 500, "body": "didn't succeed"}
            return self._result

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def _add_service(self, delay, **behaviour):
        # Queue one FakeKeepService on the pool; the second add_task
        # argument is None throughout these tests.
        self.pool.add_task(self.FakeKeepService(delay=delay, **behaviour), None)

    def test_only_write_enough_on_success(self):
        # Ten succeeding services with staggered delays; only `copies`
        # writes should be reported.
        for n in range(10):
            self._add_service(n / 10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Failing and succeeding services interleaved at equal delays.
        for n in range(5):
            delay = n / 10.0
            self._add_service(delay, will_succeed=False)
            self._add_service(delay, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Crashing and succeeding services interleaved at equal delays.
        for n in range(5):
            delay = n / 10.0
            self._add_service(delay, will_raise=Exception())
            self._add_service(delay, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # Only copies-1 services can succeed, so the pool must come up
        # exactly one copy short.
        for n in range(self.copies + 1):
            self._add_service(n / 10.0, will_raise=Exception())
        for n in range(self.copies - 1):
            self._add_service(n / 10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies - 1, []))
1369
1370
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    # put() calls that need two distinct servers to succeed, which may take
    # more than one pass through the retry loop.
    _FOO_LOCATOR = 'acbd18db4cc2f85cedef654fccc4a4d8+3'

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def _put_foo(self):
        # Store 'foo' requiring two copies, with one retry allowed.
        self.keep_client.put('foo', num_retries=1, copies=2)

    def test_success_after_exception(self):
        with tutil.mock_keep_responses(
                self._FOO_LOCATOR, Exception('mock err'), 200, 200) as req_mock:
            self._put_foo()
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        with tutil.mock_keep_responses(
                self._FOO_LOCATOR, 500, 200, 200) as req_mock:
            self._put_foo()
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # On the first pass one server answers 200 (storing there again
        # cannot add a replica) and the other answers 400 (not retryable
        # at all), so no third request should be attempted.
        with tutil.mock_keep_responses(
                self._FOO_LOCATOR, 200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self._put_foo()
        self.assertEqual(2, req_mock.call_count)
1404
class KeepClientAPIErrorTest(unittest.TestCase):
    def test_api_fail(self):
        # Attributes the fake API client can answer; looking up any other
        # attribute raises KeepReadError to simulate an API failure.
        known_attrs = {
            "api_token": "abc",
            "insecure": False,
            "config": lambda: {},
        }

        class ApiMock(object):
            def __getattr__(self, name):
                if name in known_attrs:
                    return known_attrs[name]
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='')

        # Guards against a bug in which an API (not keepstore) exception
        # raised during get() made the next get() of the same block
        # deadlock; hence the block is fetched twice in a row.  If the bug
        # returns, the symptom is a hung test suite rather than a failure;
        # avoiding that would need a special case useful only to this test.
        locator = "acbd18db4cc2f85cedef654fccc4a4d8+3"
        for _ in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get(locator)