17755: Merge branch 'main' into 17755-add-singularity-to-compute-image
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import socket
19 import sys
20 import time
21 import unittest
22 import urllib.parse
23
24 import arvados
25 import arvados.retry
26 import arvados.util
27 from . import arvados_testutil as tutil
28 from . import keepstub
29 from . import run_test_server
30
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Round-trip put/get/head tests against the Keep servers started for this class."""
    # Server settings consumed by run_test_server.TestCaseWithServers.
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        # Shared by most tests below.  proxy='' and local_store='' are passed
        # explicitly instead of relying on defaults -- presumably so ambient
        # proxy/local-store settings are ignored (TODO confirm).
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        # Use a fresh client (same arguments as the shared one) so the
        # counter assertions don't depend on which other tests have already
        # used cls.keep_client, i.e. on test execution order.
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        self.assertEqual(0, keep_client.upload_counter.get())
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, keep_client.upload_counter.get())

        self.assertEqual(0, keep_client.download_counter.get())
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # 8-byte pattern repeated 2**23 times = 67108864 bytes (64 MiB),
        # matching the size in the expected locator.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' * (1 << 23)
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
130
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Permission checks against a Keep server configured with blob signing."""
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        client = arvados.KeepClient()

        # put() hands back a signed locator that the same user can read.
        signed_foo = client.put('foo')
        self.assertRegex(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        self.assertEqual(client.get(signed_foo),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        signed_bar = client.put('bar')
        bare_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)
        with self.assertRaises(arvados.errors.NotFoundError):
            client.get(bare_bar)

        # GET from a different user => NotFound
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(signed_bar)

        # With the token cleared, neither the signed nor the unsigned
        # locator can be fetched => NotFound for both.
        client.api_token = ''
        for loc in (signed_bar, bare_bar):
            with self.assertRaises(arvados.errors.NotFoundError):
                client.get(loc)
173
174
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """KeepClient tests run with a Keep proxy server in addition to Keep servers."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepProxyTestCase, self).setUp()
        # Some tests overwrite ARVADOS_KEEP_SERVICES in the shared config.
        # Remember the value in effect now so tearDown() can restore it and
        # the override cannot leak into tests that run later in this class
        # (test_KeepProxyTest1 relies on the value set up by setUpClass()).
        self._saved_keep_services = arvados.config.settings().get(
            'ARVADOS_KEEP_SERVICES')

    def tearDown(self):
        settings = arvados.config.settings()
        settings.pop('ARVADOS_EXTERNAL_CLIENT', None)
        if self._saved_keep_services is None:
            settings.pop('ARVADOS_KEEP_SERVICES', None)
        else:
            settings['ARVADOS_KEEP_SERVICES'] = self._saved_keep_services
        super(KeepProxyTestCase, self).tearDown()

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegex(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
236
237
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient behavior against mocked Keep service lists and responses.

    Service discovery comes from tutil.ApiClientMock.mock_keep_services()
    and requests are answered by tutil.mock_keep_responses(), whose
    recorded responses expose the pycurl options each request set via
    getopt().
    """

    def get_service_roots(self, api_client):
        """Return KeepClient's service roots for a fixed locator as parsed URLs.

        The roots are sorted before parsing, so element 0 is the
        lexicographically first URL, not the first probe choice.
        """
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            with self.subTest(service_ssl_flag=ssl_flag):
                services = self.get_service_roots(self.mock_keep_services(
                    service_ssl_flag=ssl_flag))
                self.assertEqual(
                    ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertIsNone(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER))

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client)
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def _check_timeout_opts(self, verb, arg, exc_class, timeout,
                            low_speed_set=True, **service_kwargs):
        """Force a socket timeout on one request and check its pycurl timeouts.

        :param verb: KeepClient method to call ('get', 'put' or 'head').
        :param arg: the single positional argument passed to that method.
        :param exc_class: exception the failed request must raise.
        :param timeout: 3-tuple of (connect timeout in seconds,
            LOW_SPEED_TIME, LOW_SPEED_LIMIT) the request should have used.
        :param low_speed_set: False when LOW_SPEED_TIME/LOW_SPEED_LIMIT are
            expected to be left unset (as for HEAD requests).
        :param service_kwargs: extra keyword arguments for
            mock_keep_services(), e.g. service_type='proxy'.
        """
        api_client = self.mock_keep_services(count=1, **service_kwargs)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                getattr(keep_client, verb)(arg)
            curl = req_mock.responses[0]
            self.assertEqual(
                curl.getopt(pycurl.CONNECTTIMEOUT_MS),
                int(timeout[0]*1000))
            if low_speed_set:
                self.assertEqual(
                    curl.getopt(pycurl.LOW_SPEED_TIME),
                    int(timeout[1]))
                self.assertEqual(
                    curl.getopt(pycurl.LOW_SPEED_LIMIT),
                    int(timeout[2]))
            else:
                self.assertIsNone(curl.getopt(pycurl.LOW_SPEED_TIME))
                self.assertIsNone(curl.getopt(pycurl.LOW_SPEED_LIMIT))

    def test_get_timeout(self):
        self._check_timeout_opts(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        self._check_timeout_opts(
            'put', b'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        self._check_timeout_opts(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT,
            low_speed_set=False)

    def test_proxy_get_timeout(self):
        self._check_timeout_opts(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_head_timeout(self):
        self._check_timeout_opts(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            low_speed_set=False,
            service_type='proxy')

    def test_proxy_put_timeout(self):
        self._check_timeout_opts(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def check_no_services_error(self, verb, exc_class):
        """Make service discovery fail and check `verb` raises with no request errors."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Check that a failed `verb` reports only the final attempt's errors.

        Two services answer six 500s followed by two 502s; with
        num_retries=3 (4 attempts) the raised exception should carry just
        the two 502s and mention 4 attempts.
        """
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        # Only the two 500 responses count as request errors.
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        # The first response claims 3 stored replicas, enough for copies=2,
        # so only one request should be made.
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
497
498
@tutil.skip_sleep
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check which KeepClient requests are served from its cache.

    Each test patches KeepService.get, so the patched mock's call count
    shows how many requests actually reached a service.
    """
    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.get(self.locator)
            self.keep_client.get(self.locator)
        # Request already cached, don't require more than one request
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.head(self.locator)
            self.keep_client.head(self.locator)
        # Don't cache HEAD requests so that they're not confused with GET reqs
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        get_mock.side_effect = ['first response', 'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual('first response', head_resp)
        # First response was not cached because it was from a HEAD request.
        self.assertNotEqual(head_resp, get_resp)
534
@tutil.skip_sleep
class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Storage-class handling in KeepClient.put() against two mocked services.

    Mocked responses carry x-keep-replicas-stored and, optionally,
    x-keep-storage-classes-confirmed headers.  The tests check the
    X-Keep-Storage-Classes header each request sent, how many requests
    put() made, and when put() gives up with KeepWriteError.  Table-driven
    tests run each case in a subTest so every failing case is reported.
    """
    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def test_storage_classes_req_header(self):
        cases = [
            # requested, expected
            [['foo'], 'X-Keep-Storage-Classes: foo'],
            [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
            [[], None],
        ]
        for req_classes, expected_header in cases:
            with self.subTest(classes=req_classes):
                headers = {'x-keep-replicas-stored': 1}
                if req_classes:
                    # Confirm every requested class so the put succeeds.
                    confirmed_hdr = ', '.join(["{}=1".format(sc) for sc in req_classes])
                    headers['x-keep-storage-classes-confirmed'] = confirmed_hdr
                with tutil.mock_keep_responses(self.locator, 200, **headers) as req_mock:
                    self.keep_client.put(self.data, copies=1, classes=req_classes)
                    resp = req_mock.responses[0]
                    if expected_header is not None:
                        self.assertIn(expected_header, resp.getopt(pycurl.HTTPHEADER))
                    else:
                        # No classes requested: the header must be absent.
                        for hdr in resp.getopt(pycurl.HTTPHEADER):
                            self.assertNotRegex(hdr, r'^X-Keep-Storage-Classes.*')

    def test_partial_storage_classes_put(self):
        headers = {
            'x-keep-replicas-stored': 1,
            'x-keep-storage-classes-confirmed': 'foo=1'}
        with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
            # 1st request, both classes pending
            req1_headers = req_mock.responses[0].getopt(pycurl.HTTPHEADER)
            self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
            # 2nd try, 'foo' class already satisfied
            req2_headers = req_mock.responses[1].getopt(pycurl.HTTPHEADER)
            self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)

    def test_successful_storage_classes_put_requests(self):
        cases = [
            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
            [ 1, ['foo'], 1, 'foo=1', 1],
            [ 1, ['foo'], 2, 'foo=2', 1],
            [ 2, ['foo'], 2, 'foo=2', 1],
            [ 2, ['foo'], 1, 'foo=1', 2],
            [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
            [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
            [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
            [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
            [ 1, ['foo', 'bar'], 1, None, 1],
            [ 1, ['foo'], 1, None, 1],
            [ 2, ['foo'], 2, None, 1],
            [ 2, ['foo'], 1, None, 2],
        ]
        for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
            case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
            with self.subTest(case_desc):
                headers = {'x-keep-replicas-stored': c_copies}
                if c_classes is not None:
                    headers['x-keep-storage-classes-confirmed'] = c_classes
                with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as req_mock:
                    self.assertEqual(self.locator,
                        self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
                        case_desc)
                    self.assertEqual(e_reqs, req_mock.call_count, case_desc)

    def test_failed_storage_classes_put_requests(self):
        cases = [
            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
            [ 1, ['foo'], 1, 'bar=1', 200],
            [ 1, ['foo'], 1, None, 503],
            [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
            [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
            [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
        ]
        for w_copies, w_classes, c_copies, c_classes, return_code in cases:
            case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
            with self.subTest(case_desc):
                headers = {'x-keep-replicas-stored': c_copies}
                if c_classes is not None:
                    headers['x-keep-storage-classes-confirmed'] = c_classes
                with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
                    with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
                        self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
622
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check which X-Request-Id header KeepClient sends on put/get/head."""

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def _check_put_get_head(self, check_request, **op_kwargs):
        """Run put, get and head against mocked Keep responses and pass
        each recorded request to check_request.

        op_kwargs (e.g. request_id=...) are forwarded to every call.
        """
        # The put is expected to reach both mocked services.
        with tutil.mock_keep_responses(self.locator, 200, 200) as keep_mock:
            self.keep_client.put(self.data, **op_kwargs)
        self.assertEqual(2, len(keep_mock.responses))
        for resp in keep_mock.responses:
            check_request(resp)

        with tutil.mock_keep_responses(self.data, 200) as keep_mock:
            self.keep_client.get(self.locator, **op_kwargs)
        check_request(keep_mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as keep_mock:
            self.keep_client.head(self.locator, **op_kwargs)
        check_request(keep_mock.responses[0])

    def test_default_to_api_client_request_id(self):
        # With no explicit request_id, the API client's request_id is used.
        self.api_client.request_id = self.test_id
        self._check_put_get_head(self.assertProvidedRequestId)

    def test_explicit_request_id(self):
        self._check_put_get_head(self.assertProvidedRequestId,
                                 request_id=self.test_id)

    def test_automatic_request_id(self):
        # Neither the caller nor the API client supplies an id, so the
        # client must generate a fresh, well-formed one.
        self._check_put_get_head(self.assertAutomaticRequestId)

    def assertAutomaticRequestId(self, resp):
        """Assert resp carries a generated X-Request-Id, not self.test_id."""
        hdrs = [x for x in resp.getopt(pycurl.HTTPHEADER)
                if x.startswith('X-Request-Id: ')]
        # Fail with a clear message instead of an IndexError below.
        self.assertTrue(hdrs, 'request has no X-Request-Id header')
        hdr = hdrs[0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        """Assert resp carries exactly the X-Request-Id self.test_id."""
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
692
693
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check KeepClient's service probe order against a reference set."""

    # Extracts the hex service id from a keep service URL: the hex digits
    # following "//keep0x" (an optional "[" may precede "keep0x").
    SERVICE_ID_RE = re.compile(r'//\[?keep0x([0-9a-f]+)')

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def _service_id(self, url):
        """Return the hex service id embedded in a keep service URL string."""
        return self.SERVICE_ID_RE.search(url).group(1)

    def _probed_ids(self, keep_mock):
        """Return the service ids of the requests recorded by keep_mock, in order."""
        return [self._service_id(resp.getopt(pycurl.URL).decode())
                for resp in keep_mock.responses]

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, locator_hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(locator_hash))
            got_order = [self._service_id(root) for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Run op(i) for every reference block against all-500 services and
        check the services were probed in reference order on both attempts
        (num_retries=1 in every caller)."""
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as keep_mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            self.assertEqual(self.expected_order[i]*2, self._probed_ids(keep_mock))

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as keep_mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = self._probed_ids(keep_mock)
                reference = self.expected_order[i]*3
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(reference):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        pending.remove(got)
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(reference)))

    def test_probe_waste_adding_one_server(self):
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(locator_hash))
            for locator_hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client)
            total_penalty = 0
            for hash_index, locator_hash in enumerate(hashes):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(locator_hash))
                # Penalty: how many probes now precede the service that
                # used to be probed first.
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Check that the errors raised by `verb` list services in probe order."""
        # b'0' * 64 is self.blocks[0], so its probe order is expected_order[0].
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client)
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in self.expected_order[0]],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
846         self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
847
848
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
    """Timeout and low-bandwidth behavior of KeepClient against a stub server."""
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        """Context manager: fail unless the body takes tmin..tmax seconds.

        Subclasses TestCase only to reuse its assert* methods. Timing is
        checked only when the body exits normally, so an exception from
        the body (e.g. an assertRaises failure) is reported as-is instead
        of being masked by a timing assertion.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()
            return self

        def __exit__(self, exc_type, *args, **kwargs):
            if exc_type is not None:
                # Let the body's own failure propagate unchanged.
                return False
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager: fail unless the body takes at least tmin seconds.

        Like assertTakesBetween, timing is checked only on a normal exit.
        """
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()
            return self

        def __exit__(self, exc_type, *args, **kwargs):
            if exc_type is not None:
                # Let the body's own failure propagate unchanged.
                return False
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient for self.api_client with the given timeouts.

        timeouts is (connect, response[, low-bandwidth limit]), matching
        how the tests below describe the tuple.
        """
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
989
990
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Request routing for locators carrying +K@ service hints."""

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Install a mock API client listing `disks` keep services plus
        `gateways` gateway services, and a KeepClient (self.keepClient)
        that uses it. self.gateway_roots holds each gateway's base URL."""
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    def _check_gateway_hints_in_order(self, MockCurl, verb):
        """Call keepClient.<verb> on a locator hinting several gateways,
        with every service answering 404, and check that the gateways are
        tried first (in hint order) and then the disk services."""
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'get')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'head')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        # A +K@<cluster> hint routes the request to that cluster's keep proxy.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1086
1087
class KeepClientRetryTestMixin(object):
    """Shared retry tests for KeepClient operations.

    Testing with a local Keep store won't exercise the retry behavior.
    Instead, our strategy is:

    * Create a client with one proxy specified (PROXY_ADDR, pointed at a
      black hole), so there's no need to instantiate an API client, and
      all HTTP requests come from one place.
    * Patch the HTTP layer with TEST_PATCHER to provide simulated
      responses.

    This lets us test the retry logic extensively without relying on any
    supporting servers, and prevents side effects in case something
    hiccups.

    Concrete test classes (which also inherit unittest.TestCase) must
    define:

    * DEFAULT_EXPECT: the value a successful run_method() returns; it is
      also passed as the first argument to TEST_PATCHER.
    * DEFAULT_EXCEPTION: the exception class a failing run_method() raises.
    * TEST_PATCHER: a callable taking (DEFAULT_EXPECT, *responses), where
      each response is an HTTP status code or an exception instance, and
      returning a context manager that serves those responses in order.
    * run_method(*args, **kwargs): performs the client operation under test.
    """

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Return a KeepClient built from self.client_kwargs plus overrides."""
        kwargs = self.client_kwargs.copy()
        kwargs.update(caller_kwargs)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) returns expected
        (DEFAULT_EXPECT when omitted)."""
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) raises error_class
        (DEFAULT_EXCEPTION when omitted); return the assertRaises context."""
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
        return err

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        # Without num_retries, the 500 must not be retried into the 200.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        # 403 is not retried even though retries are available.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # num_retries=1 allows 2 attempts, so the 200 is never reached.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries given to the KeepClient constructor applies when
        # the call itself does not pass one.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1159
1160
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry tests for KeepClient.get(), plus get-specific cases."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        kc = self.new_client()
        return kc.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 surfaces as the more specific NotFoundError.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise NotFoundError when no server returns the block
        # and a high threshold of servers report it missing. Two servers
        # that disagree 50/50 (404 vs 500) must therefore produce a
        # generic KeepReadError, not a NotFoundError.
        kc = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                kc.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timed_out = (socket.timeout("timed out"), 200)
        answered = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, answered):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        corrupted = ('baddata', 200)
        intact = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(corrupted, intact):
            self.check_success(locator=self.HINTED_LOCATOR)
1204
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry tests for KeepClient.head(), plus head-specific cases."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        kc = self.new_client()
        return kc.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 surfaces as the more specific NotFoundError.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise NotFoundError when no server returns the block
        # and a high threshold of servers report it missing. Two servers
        # that disagree 50/50 (404 vs 500) must therefore produce a
        # generic KeepReadError, not a NotFoundError.
        kc = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                kc.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timed_out = (socket.timeout("timed out"), 200)
        answered = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, answered):
            self.check_success(locator=self.HINTED_LOCATOR)
1242
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry tests for KeepClient.put(), plus put-specific cases."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        kc = self.new_client()
        return kc.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # The client only knows the single proxy, so two copies cannot be
        # satisfied even though that server accepts the write.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1256
1257
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    # Feeds KeepWriterThreadPool a mix of fake services (succeeding,
    # failing, raising) and checks the copy count reported by done().

    class FakeKeepService(object):
        # Stand-in for a keep service as used by the writer pool: put()
        # sleeps for `delay` seconds, then raises `will_raise` if one was
        # given, otherwise returns `will_succeed`.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            stored = str(replicas)
            self._result = {
                'headers': {
                    'x-keep-replicas-stored': stored,
                    'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
                },
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout, headers):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            # Successful fakes expose their canned response; others give None.
            return self._result if self.will_succeed else None

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def _queue_fake(self, delay, **behavior):
        # Build a FakeKeepService with the given delay/behavior and add it
        # to the pool as a task (second add_task argument is None).
        self.pool.add_task(self.FakeKeepService(delay=delay, **behavior), None)

    def test_only_write_enough_on_success(self):
        # Ten succeeding services with staggered delays.
        for step in range(10):
            self._queue_fake(step / 10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # At each delay, queue one failing service followed by a succeeding one.
        for step in range(5):
            delay = step / 10.0
            self._queue_fake(delay, will_succeed=False)
            self._queue_fake(delay, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # At each delay, queue one service whose put() raises, then a
        # succeeding one.
        for step in range(5):
            delay = step / 10.0
            self._queue_fake(delay, will_raise=Exception())
            self._queue_fake(delay, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # More raising services than requested copies, and one fewer
        # succeeding service than needed: done() reports copies - 1.
        for step in range(self.copies + 1):
            self._queue_fake(step / 10.0, will_raise=Exception())
        for step in range(self.copies - 1):
            self._queue_fake(step / 10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies - 1, []))
1327
1328
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    # put() calls that need two distinct servers to succeed, which may
    # require more than one pass through the retry loop.
    _FOO_LOCATOR = 'acbd18db4cc2f85cedef654fccc4a4d8+3'

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def _put_foo(self):
        # Ask for two copies of 'foo', allowing at most one retry.
        return self.keep_client.put('foo', num_retries=1, copies=2)

    def test_success_after_exception(self):
        responses = (Exception('mock err'), 200, 200)
        with tutil.mock_keep_responses(self._FOO_LOCATOR, *responses) as req_mock:
            self._put_foo()
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        responses = (500, 200, 200)
        with tutil.mock_keep_responses(self._FOO_LOCATOR, *responses) as req_mock:
            self._put_foo()
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # The first pass gets a 200 (storing again on that server cannot
        # add replication) and a 400 (that server must not be retried at
        # all), so no third request should be made.
        responses = (200, 400, 200)
        with tutil.mock_keep_responses(self._FOO_LOCATOR, *responses) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self._put_foo()
        self.assertEqual(2, req_mock.call_count)
1362
class KeepClientAPIErrorTest(unittest.TestCase):
    def test_api_fail(self):
        class ApiMock(object):
            # Returns fixed values for `api_token` and `insecure`; any other
            # attribute lookup raises KeepReadError, simulating an API
            # (not keepstore) failure.
            def __getattr__(self, r):
                known = {"api_token": "abc", "insecure": False}
                if r in known:
                    return known[r]
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='')

        # Regression guard: if an API (not keepstore) exception is raised
        # during a get(), the next get() of the same block used to
        # deadlock. That is why the same block is fetched twice. If the bug
        # returns, the symptom is that the whole test suite hangs; there is
        # no good way to avoid that without adding a special case whose
        # only use would be this test.
        locator = "acbd18db4cc2f85cedef654fccc4a4d8+3"
        for _attempt in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get(locator)