17755: Merge branch 'main' into 17755-add-singularity-to-compute-image
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import socket
19 import sys
20 import time
21 import unittest
22 import urllib.parse
23
24 import arvados
25 import arvados.retry
26 import arvados.util
27 from . import arvados_testutil as tutil
28 from . import keepstub
29 from . import run_test_server
30
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Exercise KeepClient put/get/head round trips.

    A single API client and a single KeepClient are created in
    setUpClass() and shared by every test method of this class.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        """Authorize as "admin" and create the shared API and Keep clients."""
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        """Put and get 'foo', checking the locator and the byte counters."""
        # The counters belong to the class-level client, so the zero
        # checks below only hold while no other test has used it yet.
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertEqual(self.keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        """Round-trip a short blob containing non-ASCII and NUL bytes."""
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        """Round-trip a 67108864-byte blob (8 bytes doubled 23 times)."""
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        """Round-trip an 8-byte blob written with copies=1 (skipped)."""
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        """Putting the empty string yields the zero-length locator."""
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        """Check how put() treats text input and non-string input."""
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        """After a put, head() returns True and get() returns the data."""
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
130
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Check Keep reads and writes with KEEP_SERVER blob_signing enabled."""
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def test_KeepBasicRWTest(self):
        """Write two blocks, then check which reads succeed and which raise."""
        run_test_server.authorize_with('active')
        client = arvados.KeepClient()

        # A put returns a signed locator that can be read back.
        foo_locator = client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        foo_content = client.get(foo_locator)
        self.assertEqual(foo_content,
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # Reading with the bare (unsigned) locator must raise NotFoundError.
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        bar_locator = client.put('bar')
        self.assertRegex(
            bar_locator,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        with self.assertRaises(arvados.errors.NotFoundError):
            client.get(unsigned_bar_locator)

        # Reading the signed locator as a different user must also raise.
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(bar_locator)

        # With an empty API token, neither the signed nor the unsigned
        # locator may be readable.
        client.api_token = ''
        for locator in (bar_locator, unsigned_bar_locator):
            with self.assertRaises(arvados.errors.NotFoundError):
                client.get(locator)
173
174
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """Exercise KeepClient when Keep services are reached through a proxy.

    Several tests change entries of arvados.config.settings(); setUp()
    and tearDown() put them back so the tests do not depend on run order.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        """Authorize as "active" and create the shared API client."""
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def setUp(self):
        """Remember the ARVADOS_KEEP_SERVICES setting before each test."""
        super(KeepProxyTestCase, self).setUp()
        # None means the key was absent; tearDown() then removes it again.
        self._saved_keep_services = arvados.config.settings().get(
            'ARVADOS_KEEP_SERVICES')

    def tearDown(self):
        """Undo the settings changes a test may have made."""
        settings = arvados.config.settings()
        settings.pop('ARVADOS_EXTERNAL_CLIENT', None)
        if self._saved_keep_services is None:
            settings.pop('ARVADOS_KEEP_SERVICES', None)
        else:
            settings['ARVADOS_KEEP_SERVICES'] = self._saved_keep_services
        super(KeepProxyTestCase, self).tearDown()

    def test_KeepProxyTest1(self):
        """Round-trip 'baz' and check that the client reports using a proxy."""
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        """Round-trip 'baz2' with ARVADOS_EXTERNAL_CLIENT set to 'true'."""
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegex(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        """Two space-separated URIs become two service roots, in order."""
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        """A service setting without a URI scheme raises ArgumentError."""
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
236
237
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Test how KeepClient talks to Keep services, using mocked responses."""

    def get_service_roots(self, api_client):
        """Return the client's weighted service roots as parsed URLs.

        The roots are looked up for a locator of 32 zeros, sorted, and
        each one is passed through urllib.parse.urlparse().
        """
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        """The service_ssl_flag selects between http and https roots."""
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        """Host and port stay separate for an IPv6 service host."""
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_insecure_disables_tls_verify(self):
        """SSL_VERIFYPEER is set to 0 only when api_client.insecure is true."""
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)

    def test_refresh_signature(self):
        """refresh_signature() returns the locator from X-Keep-Locator."""
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client)
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def check_timeout_options(self, verb, data, exc_class, timeouts,
                              low_speed=True, **service_kwargs):
        """Force a timeout on one request and check the curl options used.

        verb -- KeepClient method name to call ('get', 'head' or 'put').
        data -- the single argument passed to that method.
        exc_class -- exception the call is expected to raise.
        timeouts -- sequence whose item 0 is the connection timeout in
          seconds; items 1 and 2 are the expected LOW_SPEED_TIME and
          LOW_SPEED_LIMIT values.
        low_speed -- when false, both low-speed options are expected to
          be unset (getopt() returns None) and items 1 and 2 are ignored.
        service_kwargs -- extra keyword arguments for mock_keep_services().
        """
        api_client = self.mock_keep_services(count=1, **service_kwargs)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                getattr(keep_client, verb)(data)
            curl = req_mock.responses[0]
            self.assertEqual(
                curl.getopt(pycurl.CONNECTTIMEOUT_MS),
                int(timeouts[0]*1000))
            self.assertEqual(
                curl.getopt(pycurl.LOW_SPEED_TIME),
                int(timeouts[1]) if low_speed else None)
            self.assertEqual(
                curl.getopt(pycurl.LOW_SPEED_LIMIT),
                int(timeouts[2]) if low_speed else None)

    def test_get_timeout(self):
        """get() applies DEFAULT_TIMEOUT."""
        self.check_timeout_options(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        """put() applies DEFAULT_TIMEOUT."""
        self.check_timeout_options(
            'put', b'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        """head() applies only the DEFAULT_TIMEOUT connection timeout."""
        self.check_timeout_options(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT,
            low_speed=False)

    def test_proxy_get_timeout(self):
        """get() through a proxy service applies DEFAULT_PROXY_TIMEOUT."""
        self.check_timeout_options(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_head_timeout(self):
        """head() through a proxy applies only the proxy connection timeout."""
        self.check_timeout_options(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            low_speed=False,
            service_type='proxy')

    def test_proxy_put_timeout(self):
        """put() through a proxy service applies DEFAULT_PROXY_TIMEOUT."""
        self.check_timeout_options(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def check_no_services_error(self, verb, exc_class):
        """Expect exc_class with no request errors when the lookup fails.

        The keep_services().accessible() call on the mocked API client is
        made to raise ApiError.
        """
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Expect the raised error to carry only the last two 502 responses.

        Eight failing responses are queued (six 500s, then two 502s) for
        two mocked services and num_retries=3; the error message must
        also mention "after 4 attempts".
        """
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        """With one 200 and two 500 responses, two errors are reported."""
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        """put() fails without any request when the only proxy is read-only."""
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        """get() works with a service type of 'fancynewblobstore'."""
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        """put() works with a service type of 'fancynewblobstore'."""
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        """One response reporting 3 stored replicas satisfies copies=2."""
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
497
498
@tutil.skip_sleep
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check which KeepClient requests are served without a new service call."""

    def setUp(self):
        """Create a client over two mocked services plus sample data."""
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        """Two get() calls for one locator reach the service only once."""
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.get(self.locator)
        # The second get() must have been answered from the cache.
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        """Two head() calls for one locator reach the service twice."""
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.head(self.locator)
        # HEAD results are not cached, so they cannot be mistaken for
        # GET results later on.
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        """A get() after a head() does not reuse the head() result."""
        get_mock.side_effect = ['first response', 'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual('first response', head_resp)
        # The first response came from a HEAD request, so it was not cached.
        self.assertNotEqual(head_resp, get_resp)
534
@tutil.skip_sleep
class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check storage-class request headers and put() success criteria."""

    def setUp(self):
        """Create a client over two mocked services plus sample data."""
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def test_multiple_default_storage_classes_req_header(self):
        """Every class flagged Default is named in the request header."""
        storage_classes = {
            'foo': { 'Default': True },
            'bar': { 'Default': True },
            'baz': { 'Default': False }
        }
        api_mock = self.api_client_mock()
        api_mock.config.return_value = {'StorageClasses': storage_classes}
        api_client = self.mock_keep_services(api_mock=api_mock, count=2)
        keep_client = arvados.KeepClient(api_client=api_client)
        resp_hdr = {
            'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
            'x-keep-replicas-stored': 1
        }
        with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as req_mock:
            keep_client.put(self.data, copies=1)
            sent_headers = req_mock.responses[0].getopt(pycurl.HTTPHEADER)
            self.assertIn('X-Keep-Storage-Classes: bar, foo', sent_headers)

    def test_storage_classes_req_header(self):
        """Requested classes, or 'default' when none, go into the header."""
        configured = self.api_client.config()['StorageClasses']
        self.assertEqual(configured, {'default': {'Default': True}})
        cases = [
            # requested, expected
            [['foo'], 'X-Keep-Storage-Classes: foo'],
            [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
            [[], 'X-Keep-Storage-Classes: default'],
            [None, 'X-Keep-Storage-Classes: default'],
        ]
        for req_classes, expected_header in cases:
            # The mocked response confirms one replica for each class
            # requested, or for 'default' when none was requested.
            if req_classes:
                confirmed_hdr = ', '.join(
                    "{}=1".format(name) for name in req_classes)
            else:
                confirmed_hdr = 'default=1'
            headers = {
                'x-keep-replicas-stored': 1,
                'x-keep-storage-classes-confirmed': confirmed_hdr,
            }
            with tutil.mock_keep_responses(self.locator, 200, **headers) as req_mock:
                self.keep_client.put(self.data, copies=1, classes=req_classes)
                sent_headers = req_mock.responses[0].getopt(pycurl.HTTPHEADER)
                self.assertIn(expected_header, sent_headers)

    def test_partial_storage_classes_put(self):
        """The second request asks only for the class still unconfirmed."""
        headers = {
            'x-keep-replicas-stored': 1,
            'x-keep-storage-classes-confirmed': 'foo=1'}
        with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
            first_req, second_req = req_mock.responses[0], req_mock.responses[1]
            # 1st request, both classes pending
            self.assertIn('X-Keep-Storage-Classes: bar, foo',
                          first_req.getopt(pycurl.HTTPHEADER))
            # 2nd try, 'foo' class already satisfied
            self.assertIn('X-Keep-Storage-Classes: bar',
                          second_req.getopt(pycurl.HTTPHEADER))

    def test_successful_storage_classes_put_requests(self):
        """put() succeeds using the expected number of requests per case."""
        desc_fmt = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'
        cases = [
            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
            [ 1, ['foo'], 1, 'foo=1', 1],
            [ 1, ['foo'], 2, 'foo=2', 1],
            [ 2, ['foo'], 2, 'foo=2', 1],
            [ 2, ['foo'], 1, 'foo=1', 2],
            [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
            [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
            [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
            [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
            [ 1, ['foo', 'bar'], 1, None, 1],
            [ 1, ['foo'], 1, None, 1],
            [ 2, ['foo'], 2, None, 1],
            [ 2, ['foo'], 1, None, 2],
        ]
        for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
            case_desc = desc_fmt.format(
                w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
            headers = {'x-keep-replicas-stored': c_copies}
            if c_classes is not None:
                headers['x-keep-storage-classes-confirmed'] = c_classes
            with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as req_mock:
                stored = self.keep_client.put(
                    self.data, copies=w_copies, classes=w_classes)
                self.assertEqual(self.locator, stored, case_desc)
                self.assertEqual(e_reqs, req_mock.call_count, case_desc)

    def test_failed_storage_classes_put_requests(self):
        """put() raises KeepWriteError for each unsatisfied case."""
        desc_fmt = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'
        cases = [
            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
            [ 1, ['foo'], 1, 'bar=1', 200],
            [ 1, ['foo'], 1, None, 503],
            [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
            [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
            [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
        ]
        for w_copies, w_classes, c_copies, c_classes, return_code in cases:
            case_desc = desc_fmt.format(
                w_copies, ', '.join(w_classes), c_copies, c_classes)
            headers = {'x-keep-replicas-stored': c_copies}
            if c_classes is not None:
                headers['x-keep-storage-classes-confirmed'] = c_classes
            with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
                with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
                    self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
645
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check the X-Request-Id header KeepClient sends on put, get and head.

    Three sources of the request id are covered: the API client's
    request_id attribute, an explicit request_id argument, and an
    automatically generated id when neither is given.
    """

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def _check_put_get_head(self, check_request, **call_kwargs):
        """Run put, get and head, applying check_request to each request.

        check_request is called with every recorded response object
        (two for put, the first one for get and for head).  call_kwargs
        are forwarded unchanged to each KeepClient call.
        """
        with tutil.mock_keep_responses(self.locator, 200, 200) as put_stub:
            self.keep_client.put(self.data, **call_kwargs)
        self.assertEqual(2, len(put_stub.responses))
        for resp in put_stub.responses:
            check_request(resp)

        with tutil.mock_keep_responses(self.data, 200) as get_stub:
            self.keep_client.get(self.locator, **call_kwargs)
        check_request(get_stub.responses[0])

        with tutil.mock_keep_responses(b'', 200) as head_stub:
            self.keep_client.head(self.locator, **call_kwargs)
        check_request(head_stub.responses[0])

    def test_default_to_api_client_request_id(self):
        self.api_client.request_id = self.test_id
        self._check_put_get_head(self.assertProvidedRequestId)

    def test_explicit_request_id(self):
        self._check_put_get_head(
            self.assertProvidedRequestId, request_id=self.test_id)

    def test_automatic_request_id(self):
        self._check_put_get_head(self.assertAutomaticRequestId)

    def assertAutomaticRequestId(self, resp):
        """Assert resp's request carried a generated id, not self.test_id."""
        prefix = 'X-Request-Id: '
        sent = [hdr for hdr in resp.getopt(pycurl.HTTPHEADER)
                if hdr.startswith(prefix)]
        # Report a missing header as a test failure with a message
        # rather than an IndexError from indexing an empty list.
        self.assertTrue(sent, 'request did not include an X-Request-Id header')
        hdr = sent[0]
        self.assertNotEqual(hdr, prefix+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        """Assert resp's request headers include self.test_id as the id."""
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
715
716
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Compare KeepClient's service probe order with a fixed reference set."""

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) when there are 16 services whose
        # uuids are sprintf("anything-%015x",j) for j in 0..15. For
        # example, the block made of 64 "0" characters is probed first
        # at the service with uuid "zzzzz-bi6l4-000000000000003", hence
        # expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list(probe_seq) for probe_seq in (
                '3eab2d5fc9681074',
                '097dba52e648f1c3',
                'c5b4e023f8a7d691',
                '9d81c02e76a3bf54',
            )]
        self.blocks = [
            "{:064x}".format(n).encode()
            for n, _ in enumerate(self.expected_order)]
        self.hashes = [hashlib.md5(blk).hexdigest() for blk in self.blocks]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def _service_id(self, url):
        """Return the hex digits following "keep0x" in a service URL."""
        return re.search(r'//\[?keep0x([0-9a-f]+)', url).group(1)

    def _probed_service_ids(self, responses):
        """Return the service id of each recorded request, in order."""
        return [
            self._service_id(resp.getopt(pycurl.URL).decode())
            for resp in responses]

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for reference, digest in zip(self.expected_order, self.hashes):
            roots = self.keep_client.weighted_service_roots(
                arvados.KeepLocator(digest))
            got_order = [self._service_id(root) for root in roots]
            self.assertEqual(reference, got_order)

    def test_get_probe_order_against_reference_set(self):
        def do_get(i):
            return self.keep_client.get(self.hashes[i], num_retries=1)
        self._test_probe_order_against_reference_set(do_get)

    def test_head_probe_order_against_reference_set(self):
        def do_head(i):
            return self.keep_client.head(self.hashes[i], num_retries=1)
        self._test_probe_order_against_reference_set(do_head)

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        def do_put(i):
            return self.keep_client.put(
                self.blocks[i], num_retries=1, copies=1)
        self._test_probe_order_against_reference_set(do_put)

    def _test_probe_order_against_reference_set(self, op):
        """Call op(i) for each block and compare the probe order.

        Every mocked response is a 500, so op is expected to raise
        KeepRequestError after the full reference order has been
        requested twice.
        """
        failures = [500] * (self.services*2)
        for idx, reference in enumerate(self.expected_order):
            with tutil.mock_keep_responses('', *failures) as keep_stub, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(idx)
            got_order = self._probed_service_ids(keep_stub.responses)
            self.assertEqual(reference*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        failures = [500] * (self.services*3)
        for copies in range(2, 4):
            for idx, block in enumerate(self.blocks):
                reference = self.expected_order[idx]*3
                with tutil.mock_keep_responses('', *failures) as keep_stub, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(block, num_retries=2, copies=copies)
                got_order = self._probed_service_ids(keep_stub.responses)
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (reference).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(reference):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        pending.remove(got)
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(reference)))

    def test_probe_waste_adding_one_server(self):
        digests = [
            hashlib.md5("{:064x}".format(n).encode()).hexdigest()
            for n in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(digest))
            for digest in digests]
        for added_services in range(1, 12):
            bigger_api = self.mock_keep_services(
                count=initial_services+added_services)
            bigger_client = arvados.KeepClient(api_client=bigger_api)
            total_penalty = 0
            for digest, before in zip(digests, probes_before):
                probe_after = bigger_client.weighted_service_roots(
                    arvados.KeepLocator(digest))
                # Penalty: how many positions the formerly first
                # service has moved down in the new probe order.
                penalty = probe_after.index(before[0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = added_services * len(digests) / initial_services
            max_penalty = expect_penalty * (120 - added_services) / 100
            min_penalty = expect_penalty * 8 / 10
            within_bounds = min_penalty <= total_penalty <= max_penalty
            self.assertTrue(
                within_bounds,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(digests),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Check that a failed verb reports errors in reference probe order.

        verb is the KeepClient method name ('get' or 'put'); exc_class
        is the exception the call is expected to raise when every
        mocked response is a 500.
        """
        zeros = b'0' * 64
        # get takes a locator rather than the data itself.
        payload = tutil.str_keep_locator(zeros) if verb == 'get' else zeros
        # Arbitrary port number:
        aport = random.randint(1024, 65535)
        api_client = self.mock_keep_services(
            service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client)
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(payload)
        parsed = [
            urllib.parse.urlparse(url)
            for url in err_check.exception.request_errors()]
        expected_endpoints = [
            ('keep0x' + c, aport) for c in '3eab2d5fc9681074']
        actual_endpoints = [(url.hostname, url.port) for url in parsed]
        self.assertEqual(expected_endpoints, actual_endpoints)

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order(
            'get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order(
            'put', arvados.errors.KeepWriteError)
870
871
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
    """Timeout and low-bandwidth behavior of KeepClient against a stub server."""

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x' * (2 ** 11)
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        # Context manager asserting that the enclosed block takes at
        # least tmin and at most tmax seconds. It derives from TestCase
        # so that the assert* methods are available.
        def __init__(self, tmin, tmax):
            self.tmin, self.tmax = tmin, tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            elapsed = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(elapsed, self.tmin)
            self.assertLessEqual(elapsed, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        # Context manager asserting that the enclosed block takes at
        # least tmin seconds (no upper bound).
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            elapsed = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(elapsed, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient for self.api_client using the given timeouts."""
        client_args = {'api_client': self.api_client, 'timeout': timeouts}
        return arvados.KeepClient(**client_args)

    def _assert_slow_get_and_put_failures(self, kc, loc):
        """Expect get and put to fail, each after at least TIMEOUT_TIME."""
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepReadError):
            kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepWriteError):
            kc.put(self.DATA, copies=1, num_retries=0)

    def _check_slow_stage(self, stage):
        """Store a block, then delay the named server stage by .2s and 2s.

        stage is the keyword accepted by self.server.setdelays().
        """
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(**{stage: .2})
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(**{stage: 2})
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1, service_host='240.0.0.0')
        with self.assertTakesBetween(0.1, 0.5), \
             self.assertRaises(arvados.errors.KeepWriteError):
            self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        fetched = kc.get(loc, num_retries=0)
        self.assertEqual(self.DATA, fetched)

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        self._assert_slow_get_and_put_failures(kc, loc)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        self._assert_slow_get_and_put_failures(kc, loc)
        # head is only required to be slow here, not to fail.
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        stall = self.TIMEOUT_TIME+1
        self.server.setdelays(mid_write=stall, mid_read=stall)
        self._assert_slow_get_and_put_failures(kc, loc)

    def test_timeout_slow_request(self):
        self._check_slow_stage('request')

    def test_timeout_slow_response(self):
        self._check_slow_stage('response')

    def test_timeout_slow_response_body(self):
        self._check_slow_stage('response_body')

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            fetched = kc.get(loc, num_retries=0)
            self.assertEqual(self.DATA, fetched)

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9), \
             self.assertRaises(arvados.errors.KeepReadError):
            kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9), \
             self.assertRaises(arvados.errors.KeepWriteError):
            kc.put(self.DATA, copies=1, num_retries=0)
1012
1013
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check which service URLs KeepClient requests for +K@ locator hints."""

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Set up a mock API client with disk services plus gateways.

        Stores the gateway service records in self.gateways, their
        root URLs in self.gateway_roots, and a KeepClient built on the
        mock API client in self.keepClient.
        """
        gateway_records = []
        for i in range(gateways):
            gateway_records.append({
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
            })
        self.gateways = gateway_records
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**record)
            for record in gateway_records]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    def _check_gateway_hints_in_order(self, MockCurl, verb, empty_body):
        """Check that verb tries hinted gateways in order, then disks.

        Every mocked curl handle answers 404 with empty_body, so the
        call named by verb ('get' or 'head') must end in NotFoundError
        after trying all services.
        """
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=empty_body)
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        hints = ['K@'+gw['uuid'] for gw in self.gateways]
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] + hints)
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for curl, root in zip(mocks, self.gateway_roots):
            self.assertEqual(root+locator,
                             curl.getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for curl in mocks[gateways:gateways+disks]:
            self.assertRegex(
                curl.getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        fake_curl = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        MockCurl.return_value = fake_curl
        self.mock_disks_and_gateways()
        hint = 'K@' + self.gateways[0]['uuid']
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+' + hint
        self.assertEqual(b'foo', self.keepClient.get(locator))
        # The request must have gone to the hinted gateway.
        requested_url = MockCurl.return_value.getopt(pycurl.URL).decode()
        self.assertEqual(self.gateway_roots[0]+locator, requested_url)
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'get', '')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'head', b'')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        fake_curl = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        MockCurl.return_value = fake_curl
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        # The +K@xyzzy hint must send the request to the xyzzy proxy.
        requested_url = MockCurl.return_value.getopt(pycurl.URL).decode()
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         requested_url)

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        fake_curl = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        MockCurl.return_value = fake_curl
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        # The +K@xyzzy hint must send the request to the xyzzy proxy.
        requested_url = MockCurl.return_value.getopt(pycurl.URL).decode()
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         requested_url)
1109
1110
class KeepClientRetryTestMixin(object):
    # Shared retry tests for KeepClient operations.
    #
    # Testing with a local Keep store won't exercise the retry behavior.
    # Instead, our strategy is:
    # * Create a client with one proxy specified (pointed at a black
    #   hole), so there's no need to instantiate an API client, and
    #   all HTTP requests come from one place.
    # * Patch the client's requests (through TEST_PATCHER) so that they
    #   receive simulated responses.
    # This lets us test the retry logic extensively without relying on any
    # supporting servers, and prevents side effects in case something hiccups.
    #
    # A test class using this mixin must define:
    # * DEFAULT_EXPECT: the value run_method() returns on success,
    # * DEFAULT_EXCEPTION: the exception class raised on failure,
    # * run_method(): performs the operation under test,
    # * TEST_PATCHER: a callable that mocks out the appropriate
    #   methods in the client.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Return a KeepClient built from client_kwargs plus overrides."""
        merged = dict(self.client_kwargs)
        merged.update(caller_kwargs)
        return arvados.KeepClient(**merged)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method() returns expected (default DEFAULT_EXPECT)."""
        want = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(want, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method() raises error_class (default DEFAULT_EXCEPTION).

        Returns the assertRaises context so callers can inspect the
        raised exception.
        """
        want_error = (
            self.DEFAULT_EXCEPTION if error_class is None else error_class)
        with self.assertRaises(want_error) as err:
            self.run_method(*args, **kwargs)
        return err

    def test_immediate_success(self):
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 200)
        with patcher:
            self.check_success()

    def test_retry_then_success(self):
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200)
        with patcher:
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        patcher = self.TEST_PATCHER(
            self.DEFAULT_EXPECT, Exception('mock err'), 200)
        with patcher:
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200)
        with patcher:
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200)
        with patcher:
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200)
        with patcher:
            err = self.check_exception(num_retries=1)
        message = str(err.exception)
        self.assertRegex(message, r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries is given to the client constructor here, not to
        # the individual call.
        self.client_kwargs['num_retries'] = 3
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200)
        with patcher:
            self.check_success()
1182
1183
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Run the shared retry tests, plus get-specific ones, on KeepClient.get."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200)
        with responses:
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # Here two servers disagree 50/50 (one 404, one 500), and the
        # test checks that the error does not become a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.get(self.HINTED_LOCATOR)
            raised = exc_check.exception
            self.assertNotIsInstance(
                raised, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        responses = tutil.mock_keep_responses(
            self.DEFAULT_EXPECT, 404, 200, 500)
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timed_out = (socket.timeout("timed out"), 200)
        succeeded = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, succeeded):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        corrupted = ('baddata', 200)
        succeeded = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(corrupted, succeeded):
            self.check_success(locator=self.HINTED_LOCATOR)
1227
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Run the shared retry tests, plus head-specific ones, on KeepClient.head."""

    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200)
        with responses:
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # Here two servers disagree 50/50 (one 404, one 500), and the
        # test checks that the error does not become a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.head(self.HINTED_LOCATOR)
            raised = exc_check.exception
            self.assertNotIsInstance(
                raised, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        responses = tutil.mock_keep_responses(
            self.DEFAULT_EXPECT, 404, 200, 500)
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timed_out = (socket.timeout("timed out"), 200)
        succeeded = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, succeeded):
            self.check_success(locator=self.HINTED_LOCATOR)
1265
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Retry tests for put() on the client returned by new_client().
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        # Every call gets a fresh client; `copies` is passed to put()
        # positionally, followed by any extra arguments.
        keep = self.new_client()
        return keep.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Only a single 200 response is mocked while two copies are
        # requested, and the call is expected to fail.
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200)
        with responses:
            self.check_exception(copies=2, num_retries=3)
1279
1280
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    # Feeds fake Keep services to a KeepWriterThreadPool and checks the
    # (copies written, ...) tuple that the pool reports through done().

    class FakeKeepService(object):
        # Stand-in for a Keep service.  put() sleeps for `delay` seconds,
        # then raises `will_raise` if one was given, otherwise returns
        # `will_succeed`.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            # Canned result handed back by last_result() on success.
            self._result = {
                'headers': {
                    'x-keep-replicas-stored': str(replicas),
                    'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
                },
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout, headers):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            # None unless this service was configured to succeed.
            return self._result if self.will_succeed else None

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def _add_fake_service(self, **service_kwargs):
        # Build one FakeKeepService and queue it on the pool.
        service = self.FakeKeepService(**service_kwargs)
        self.pool.add_task(service, None)

    def test_only_write_enough_on_success(self):
        for step in range(10):
            self._add_fake_service(delay=step/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Alternate a failing and a succeeding service at each delay.
        for step in range(5):
            self._add_fake_service(delay=step/10.0, will_succeed=False)
            self._add_fake_service(delay=step/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Alternate a raising and a succeeding service at each delay.
        for step in range(5):
            self._add_fake_service(delay=step/10.0, will_raise=Exception())
            self._add_fake_service(delay=step/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # Only copies-1 services can succeed, so done() is expected to
        # report one copy fewer than requested.
        for step in range(self.copies+1):
            self._add_fake_service(delay=step/10.0, will_raise=Exception())
        for step in range(self.copies-1):
            self._add_fake_service(delay=step/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies-1, []))
1350
1351
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def test_success_after_exception(self):
        body = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
        outcomes = (Exception('mock err'), 200, 200)
        with tutil.mock_keep_responses(body, *outcomes) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        body = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
        outcomes = (500, 200, 200)
        with tutil.mock_keep_responses(body, *outcomes) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        body = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
        outcomes = (200, 400, 200)
        with tutil.mock_keep_responses(body, *outcomes) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError):
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1385
class KeepClientAPIErrorTest(unittest.TestCase):
    def test_api_fail(self):
        class ApiMock(object):
            # Answers three attribute names with fixed values and raises
            # KeepReadError for every other attribute lookup.
            def __getattr__(self, r):
                if r == "api_token":
                    return "abc"
                if r == "insecure":
                    return False
                if r == "config":
                    return lambda: {}
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='')

        # The bug this is testing for is that if an API (not
        # keepstore) exception is thrown as part of a get(), the next
        # attempt to get that same block will result in a deadlock.
        # This is why get() is called twice in a row.  Unfortunately,
        # the failure mode for this test is that the test suite
        # deadlocks; there isn't a good way to avoid that without
        # adding a special case that has no use except for this test.
        locator = "acbd18db4cc2f85cedef654fccc4a4d8+3"
        for _attempt in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get(locator)