17344: Skip test that relies on Nginx not flagging external clients.
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import socket
19 import sys
20 import time
21 import unittest
22 import urllib.parse
23
24 import arvados
25 import arvados.retry
26 import arvados.util
27 from . import arvados_testutil as tutil
28 from . import keepstub
29 from . import run_test_server
30
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Basic put/get/head round trips against the test Keep servers.

    setUpClass authorizes as "admin" and builds one KeepClient
    (proxy='', local_store='') that is shared by the tests in this class.
    """
    # Server configuration consumed by run_test_server.TestCaseWithServers.
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    # NOTE: locator regexes below are raw strings.  "\+" is not a valid
    # escape sequence in a plain string literal and triggers a
    # DeprecationWarning (SyntaxWarning since Python 3.12).

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        # Use a private client: the byte counters checked below must start
        # at zero, which the shared cls.keep_client only guarantees when
        # this test happens to run before every other test in the class.
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        self.assertEqual(0, keep_client.upload_counter.get())
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, keep_client.upload_counter.get())

        self.assertEqual(0, keep_client.download_counter.get())
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # Doubling an 8-byte seed 23 times yields 8 * 2**23 = 64 MiB.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
130
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Keep GETs with blob signing enabled on the test Keep server.

    Verifies that put() returns signed locators, and that reads fail with
    NotFoundError for an unsigned locator, for a different user, and for a
    client whose API token has been cleared.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        client = arvados.KeepClient()

        signed_foo = client.put('foo')
        self.assertRegex(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        self.assertEqual(client.get(signed_foo),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # An unsigned locator for a block we just stored is still refused.
        signed_bar = client.put('bar')
        bare_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)
        with self.assertRaises(arvados.errors.NotFoundError):
            client.get(bare_bar)

        # Another user cannot read it through the module-level Keep client,
        # even with the signed locator.
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(signed_bar)

        # With no token at all, neither the signed nor the unsigned
        # locator is readable.
        client.api_token = ''
        for locator in (signed_bar, bare_bar):
            with self.assertRaises(arvados.errors.NotFoundError):
                client.get(locator)
173
174
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """KeepClient behavior when a Keep proxy is configured or advertised.

    Several tests mutate arvados.config.settings().  setUp/tearDown put the
    touched keys back, so each test sees the same configuration regardless
    of execution order.
    """
    # Server configuration consumed by run_test_server.TestCaseWithServers.
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepProxyTestCase, self).setUp()
        # test_KeepProxyTest1 relies on the ARVADOS_KEEP_SERVICES value in
        # effect when the class was set up, while other tests overwrite it.
        # Remember whether it was set, and to what, so tearDown can restore
        # it.  Otherwise a test that ran earlier (e.g. the invalid-URI
        # test) would leak its value into later tests.
        settings = arvados.config.settings()
        self._had_keep_services = 'ARVADOS_KEEP_SERVICES' in settings
        self._orig_keep_services = settings.get('ARVADOS_KEEP_SERVICES')

    def tearDown(self):
        settings = arvados.config.settings()
        settings.pop('ARVADOS_EXTERNAL_CLIENT', None)
        if self._had_keep_services:
            settings['ARVADOS_KEEP_SERVICES'] = self._orig_keep_services
        else:
            settings.pop('ARVADOS_KEEP_SERVICES', None)
        super(KeepProxyTestCase, self).tearDown()

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    @unittest.skip("https://dev.arvados.org/issues/17344#note-23")
    def test_KeepProxyTest2(self):
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegex(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
237
238
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient behavior against mocked Keep services and HTTP responses.

    No servers are involved.  API clients come from mock_keep_services(),
    and requests are answered by tutil.mock_keep_responses().  The curl
    objects it records (``.responses``) let tests inspect the options
    KeepClient set.
    """
    def get_service_roots(self, api_client):
        """Return the parsed service root URLs for a dummy locator, sorted."""
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client)
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def check_request_timeouts(self, verb, arg, exc_class, timeout,
                               expect_low_speed, **service_kwargs):
        """Force a socket timeout on ``keep_client.<verb>(arg)`` and check
        the pycurl timeout options KeepClient set for that request.

        ``timeout`` is the KeepClient timeout triple the request should
        use.  Element 0 (seconds) maps to CONNECTTIMEOUT_MS; elements 1 and
        2 map to LOW_SPEED_TIME and LOW_SPEED_LIMIT.  When
        ``expect_low_speed`` is false (HEAD requests), both low-speed
        options must be left unset.  ``service_kwargs`` are passed through
        to mock_keep_services().
        """
        api_client = self.mock_keep_services(count=1, **service_kwargs)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                getattr(keep_client, verb)(arg)
            curl = req_mock.responses[0]
            self.assertEqual(
                curl.getopt(pycurl.CONNECTTIMEOUT_MS),
                int(timeout[0]*1000))
            if expect_low_speed:
                low_speed_time = int(timeout[1])
                low_speed_limit = int(timeout[2])
            else:
                low_speed_time = low_speed_limit = None
            self.assertEqual(curl.getopt(pycurl.LOW_SPEED_TIME), low_speed_time)
            self.assertEqual(curl.getopt(pycurl.LOW_SPEED_LIMIT), low_speed_limit)

    def test_get_timeout(self):
        self.check_request_timeouts(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT, expect_low_speed=True)

    def test_put_timeout(self):
        self.check_request_timeouts(
            'put', b'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_TIMEOUT, expect_low_speed=True)

    def test_head_timeout(self):
        self.check_request_timeouts(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT, expect_low_speed=False)

    def test_proxy_get_timeout(self):
        self.check_request_timeouts(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT, expect_low_speed=True,
            service_type='proxy')

    def test_proxy_head_timeout(self):
        self.check_request_timeouts(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT, expect_low_speed=False,
            service_type='proxy')

    def test_proxy_put_timeout(self):
        self.check_request_timeouts(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT, expect_low_speed=True,
            service_type='proxy')

    def check_no_services_error(self, verb, exc_class):
        """Check that ``verb`` raises ``exc_class`` with no per-request
        errors when listing Keep services fails with ApiError."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Check that the raised error reports only the final attempt.

        With two services and num_retries=3, the first six responses
        (500s) are earlier attempts; the exception's request_errors() must
        carry only the last two (502s).
        """
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
504
505
@tutil.skip_sleep
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Which KeepClient reads are answered from its cache.

    KeepService.get is patched in each test, so the patched mock's call
    count shows how often KeepClient actually went to a Keep service.
    """
    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.get(self.locator)
        # The second GET is served from the cache: one service request total.
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.head(self.locator)
        # HEAD results are never cached (so they can't be mistaken for GET
        # results); both calls reach the service.
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        get_mock.side_effect = ['first response', 'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_result = self.keep_client.head(self.locator)
            get_result = self.keep_client.get(self.locator)
        self.assertEqual('first response', head_result)
        # The HEAD response was not cached, so the GET fetched again and
        # received the second response.
        self.assertNotEqual(head_result, get_result)
541
@tutil.skip_sleep
class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Storage-class handling in KeepClient.put().

    Responses are mocked.  The X-Keep-Storage-Classes request header is
    read back from the recorded curl objects.  The x-keep-replicas-stored
    and x-keep-storage-classes-confirmed response headers determine whether
    put() issues another request, succeeds, or raises KeepWriteError.
    """
    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def test_multiple_default_storage_classes_req_header(self):
        api_mock = self.api_client_mock()
        api_mock.config.return_value = {
            'StorageClasses': {
                'foo': { 'Default': True },
                'bar': { 'Default': True },
                'baz': { 'Default': False }
            }
        }
        api_client = self.mock_keep_services(api_mock=api_mock, count=2)
        keep_client = arvados.KeepClient(api_client=api_client)
        resp_hdr = {
            'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
            'x-keep-replicas-stored': 1
        }
        # Named req_mock (not "mock") so the imported mock module is not
        # shadowed inside this method.
        with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as req_mock:
            keep_client.put(self.data, copies=1)
            curl = req_mock.responses[0]
            # Only the classes flagged Default (foo, bar) are requested;
            # the header lists them as "bar, foo".
            self.assertIn(
                'X-Keep-Storage-Classes: bar, foo', curl.getopt(pycurl.HTTPHEADER))

    def test_storage_classes_req_header(self):
        self.assertEqual(
            self.api_client.config()['StorageClasses'],
            {'default': {'Default': True}})
        cases = [
            # requested, expected
            [['foo'], 'X-Keep-Storage-Classes: foo'],
            [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
            [[], 'X-Keep-Storage-Classes: default'],
            [None, 'X-Keep-Storage-Classes: default'],
        ]
        for req_classes, expected_header in cases:
            # None and [] both fall back to the cluster's default class.
            if req_classes:
                confirmed_hdr = ', '.join(["{}=1".format(name) for name in req_classes])
            else:
                confirmed_hdr = 'default=1'
            headers = {
                'x-keep-replicas-stored': 1,
                'x-keep-storage-classes-confirmed': confirmed_hdr,
            }
            with tutil.mock_keep_responses(self.locator, 200, **headers) as req_mock:
                self.keep_client.put(self.data, copies=1, classes=req_classes)
                curl = req_mock.responses[0]
                self.assertIn(expected_header, curl.getopt(pycurl.HTTPHEADER))

    def test_partial_storage_classes_put(self):
        headers = {
            'x-keep-replicas-stored': 1,
            'x-keep-storage-classes-confirmed': 'foo=1'}
        with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
            # 1st request, both classes pending
            req1_headers = req_mock.responses[0].getopt(pycurl.HTTPHEADER)
            self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
            # 2nd try, 'foo' class already satisfied
            req2_headers = req_mock.responses[1].getopt(pycurl.HTTPHEADER)
            self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)

    def test_successful_storage_classes_put_requests(self):
        cases = [
            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
            [ 1, ['foo'], 1, 'foo=1', 1],
            [ 1, ['foo'], 2, 'foo=2', 1],
            [ 2, ['foo'], 2, 'foo=2', 1],
            [ 2, ['foo'], 1, 'foo=1', 2],
            [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
            [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
            [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
            [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
            [ 1, ['foo', 'bar'], 1, None, 1],
            [ 1, ['foo'], 1, None, 1],
            [ 2, ['foo'], 2, None, 1],
            [ 2, ['foo'], 1, None, 2],
        ]
        for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
            headers = {'x-keep-replicas-stored': c_copies}
            if c_classes is not None:
                headers.update({'x-keep-storage-classes-confirmed': c_classes})
            with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as req_mock:
                case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
                self.assertEqual(self.locator,
                    self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
                    case_desc)
                self.assertEqual(e_reqs, req_mock.call_count, case_desc)

    def test_failed_storage_classes_put_requests(self):
        cases = [
            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
            [ 1, ['foo'], 1, 'bar=1', 200],
            [ 1, ['foo'], 1, None, 503],
            [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
            [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
            [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
        ]
        for w_copies, w_classes, c_copies, c_classes, return_code in cases:
            headers = {'x-keep-replicas-stored': c_copies}
            if c_classes is not None:
                headers.update({'x-keep-storage-classes-confirmed': c_classes})
            with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
                case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
                with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
                    self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
652
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check the X-Request-Id header KeepClient sends with put/get/head.

    Covers these cases:

    * an id taken from the API client's ``request_id``;
    * an id passed explicitly via ``request_id=``;
    * an automatically generated id;
    * the id appearing in the message of Keep errors.
    """

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    # NOTE: context-manager results are bound to ``keep_mock`` rather than
    # ``mock`` so they do not shadow the imported ``mock`` module.

    def test_default_to_api_client_request_id(self):
        self.api_client.request_id = self.test_id
        with tutil.mock_keep_responses(self.locator, 200, 200) as keep_mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(keep_mock.responses))
        for resp in keep_mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as keep_mock:
            self.keep_client.get(self.locator)
        self.assertProvidedRequestId(keep_mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as keep_mock:
            self.keep_client.head(self.locator)
        self.assertProvidedRequestId(keep_mock.responses[0])

    def test_explicit_request_id(self):
        with tutil.mock_keep_responses(self.locator, 200, 200) as keep_mock:
            self.keep_client.put(self.data, request_id=self.test_id)
        self.assertEqual(2, len(keep_mock.responses))
        for resp in keep_mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as keep_mock:
            self.keep_client.get(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(keep_mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as keep_mock:
            self.keep_client.head(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(keep_mock.responses[0])

    def test_automatic_request_id(self):
        with tutil.mock_keep_responses(self.locator, 200, 200) as keep_mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(keep_mock.responses))
        for resp in keep_mock.responses:
            self.assertAutomaticRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as keep_mock:
            self.keep_client.get(self.locator)
        self.assertAutomaticRequestId(keep_mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as keep_mock:
            self.keep_client.head(self.locator)
        self.assertAutomaticRequestId(keep_mock.responses[0])

    def test_request_id_in_exception(self):
        # The mocked responses are not inspected here, so the context
        # manager's value is not bound.
        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
                self.keep_client.head(self.locator, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
                self.keep_client.get(self.locator)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
                self.keep_client.put(self.data, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
                self.keep_client.put(self.data)

    def assertAutomaticRequestId(self, resp):
        """Assert *resp* sent a generated X-Request-Id rather than self.test_id.

        Fails with a descriptive message (instead of an IndexError) when the
        request carried no X-Request-Id header at all.
        """
        headers = resp.getopt(pycurl.HTTPHEADER)
        request_id_headers = [x for x in headers
                              if x.startswith('X-Request-Id: ')]
        self.assertTrue(request_id_headers,
                        'no X-Request-Id header in {!r}'.format(headers))
        hdr = request_id_headers[0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        """Assert *resp* sent exactly self.test_id as its X-Request-Id."""
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
739
740
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check KeepClient's service probe order against a reference set."""

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(block).hexdigest()
            for block in self.blocks]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    # NOTE: loop variables are named ``block_hash`` to avoid shadowing the
    # ``hash`` builtin, and mocked responses are bound to ``keep_mock`` so
    # they do not shadow the ``mock`` module used by
    # check_64_zeros_error_order.

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, block_hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Run op(i) for each block against all-500 responses and check that
        the services were probed in the reference order, twice over (one
        initial attempt plus one retry)."""
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *([500] * (self.services*2))) as keep_mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in keep_mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *([500] * (self.services*3))) as keep_mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in keep_mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            for block_hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client)
            total_penalty = 0
            for block_hash, probe_before in zip(hashes, probes_before):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(block_hash))
                # The penalty is how far the block's original first-choice
                # service has been pushed down the new probe sequence.
                penalty = probe_after.index(probe_before[0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # The total penalty should be within 80% .. (120-added)% of
            # the ideal N(added)*N(blocks)/N(orig). The upper tolerance
            # shrinks by one percentage point per added service, so the
            # bound gets tighter as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Check that the error raised by keep_client.<verb> lists the
        failed services in the reference probe order for 64 "0" bytes."""
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client)
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
894
895
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
    """Timeout and low-bandwidth behavior of KeepClient against a stub server.

    The stub server is throttled with ``setbandwidth()`` and made to stall
    at various points of a request with ``setdelays()``.
    """
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        """Context manager: assert the body takes between tmin and tmax seconds.

        If the body raises, the timing check is skipped so the original
        exception propagates instead of being masked by a timing failure.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, exc_type, exc_value, exc_tb):
            if exc_type is not None:
                # Let the body's own exception propagate unchanged.
                return
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager: assert the body takes at least tmin seconds.

        If the body raises, the timing check is skipped so the original
        exception propagates instead of being masked by a timing failure.
        """
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, exc_type, exc_value, exc_tb):
            if exc_type is not None:
                # Let the body's own exception propagate unchanged.
                return
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient for self.api_client with the given timeouts.

        Tuple order as used in this class: (connect timeout, response
        timeout[, low-bandwidth limit]).
        """
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
1036
1037
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check where KeepClient sends requests for locators with +K@ hints."""

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Create a mock API client with *disks* disk services plus
        *gateways* gateway services, and a KeepClient using it.

        Sets self.gateways (the gateway service records), self.gateway_roots
        (their root URLs), self.api_client and self.keepClient.
        """
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    def _check_gateway_hints_in_order(self, MockCurl, verb):
        """Run keepClient.<verb> on a locator hinting 4 gateways (3 disks),
        with every service answering 404, and check the probe order:
        hinted gateways first in hint order, then the disk services."""
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'get')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'head')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1133
1134
class KeepClientRetryTestMixin(object):
    """Shared retry-behavior tests for a KeepClient operation.

    Testing with a local Keep store won't exercise the retry behavior.
    Instead, the strategy is:

    * Create a client with one proxy specified (PROXY_ADDR, pointed at a
      black hole), so there's no need to instantiate an API client, and
      all HTTP requests go to one place.
    * Use TEST_PATCHER to supply simulated HTTP responses.

    This lets us test the retry logic extensively without relying on any
    supporting servers, and prevents side effects in case something hiccups.

    Concrete test classes must define:

    * DEFAULT_EXPECT: the value a successful run_method() returns.
    * DEFAULT_EXCEPTION: the exception class an unsuccessful run_method()
      raises.
    * TEST_PATCHER: a callable taking a response body followed by
      per-request status codes or exceptions, returning a context manager
      that mocks the client's HTTP requests (the subclasses in this module
      use tutil.mock_keep_responses).
    * run_method(): performs the operation under test using a client
      from new_client().
    """

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Return a KeepClient built from self.client_kwargs, with
        *caller_kwargs* overriding them."""
        kwargs = self.client_kwargs.copy()
        kwargs.update(caller_kwargs)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) returns *expected*
        (DEFAULT_EXPECT when None)."""
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) raises *error_class*
        (DEFAULT_EXCEPTION when None) and return the assertRaises context."""
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
        return err

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1206
1207
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry tests for KeepClient.get()."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        """Fetch *locator* with a fresh client, forwarding extra arguments."""
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200)
        with responses:
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # A NotFoundError is only appropriate when no server returns the
        # block and a high threshold of servers say it's not found. Here
        # two servers disagree 50/50 (404 vs 500), so the failure must be
        # a plain KeepReadError rather than a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.get(self.HINTED_LOCATOR)
            raised = exc_check.exception
            self.assertNotIsInstance(raised, arvados.errors.NotFoundError,
                                     "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500)
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timed_out = (socket.timeout("timed out"), 200)
        succeeded = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, succeeded):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        corrupted = ('baddata', 200)
        succeeded = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(corrupted, succeeded):
            self.check_success(locator=self.HINTED_LOCATOR)
1251
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry tests for KeepClient.head()."""

    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        """HEAD *locator* with a fresh client, forwarding extra arguments."""
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200)
        with responses:
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # A NotFoundError is only appropriate when no server returns the
        # block and a high threshold of servers say it's not found. Here
        # two servers disagree 50/50 (404 vs 500), so the failure must be
        # a plain KeepReadError rather than a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.head(self.HINTED_LOCATOR)
            raised = exc_check.exception
            self.assertNotIsInstance(raised, arvados.errors.NotFoundError,
                                     "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500)
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        timed_out = (socket.timeout("timed out"), 200)
        succeeded = (self.DEFAULT_EXPECT, 200)
        with tutil.mock_keep_responses(timed_out, succeeded):
            self.check_success(locator=self.HINTED_LOCATOR)
1289
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """KeepClientRetryTestMixin tests specialised for KeepClient.put()."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        """Store ``data`` on a freshly built client, requesting ``copies``.

        Any additional positional/keyword arguments go to put() unchanged.
        """
        keep_client = self.new_client()
        return keep_client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Per the test name, copies=2 must not be reached by writing twice
        # to the same server, so check_exception() expects the class's
        # DEFAULT_EXCEPTION even though retries are allowed.
        responses = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200)
        with responses:
            self.check_exception(copies=2, num_retries=3)
1303
1304
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """KeepWriterThreadPool must stop counting writes at ``copies``."""

    class FakeKeepService(object):
        """Stand-in service object queued on KeepWriterThreadPool.

        ``put()`` sleeps ``delay`` seconds, then raises ``will_raise`` if it
        was given, otherwise returns ``will_succeed``.  ``last_result()``
        reports ``replicas`` stored copies on success and a 500 otherwise.
        """

        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {
                    'x-keep-replicas-stored': str(replicas),
                    'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
                },
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout, headers):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            if not self.will_succeed:
                return {"status_code": 500, "body": "didn't succeed"}
            return self._result

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def _queue_fake_service(self, **service_kwargs):
        """Build a FakeKeepService from ``service_kwargs`` and queue it."""
        self.pool.add_task(self.FakeKeepService(**service_kwargs), None)

    def test_only_write_enough_on_success(self):
        # Ten services would all succeed (staggered by 0.1s); done() must
        # still report exactly self.copies.
        for n in range(10):
            self._queue_fake_service(delay=n / 10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Each delay step gets one failing and then one succeeding service.
        for n in range(5):
            self._queue_fake_service(delay=n / 10.0, will_succeed=False)
            self._queue_fake_service(delay=n / 10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Each delay step gets one raising and then one succeeding service.
        for n in range(5):
            self._queue_fake_service(delay=n / 10.0, will_raise=Exception())
            self._queue_fake_service(delay=n / 10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # copies+1 services raise and only copies-1 can succeed, so the
        # pool can report no more than copies-1 writes.
        for n in range(self.copies + 1):
            self._queue_fake_service(delay=n / 10.0, will_raise=Exception())
        for n in range(self.copies - 1):
            self._queue_fake_service(delay=n / 10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies - 1, []))
1376
1377
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    """put() calls that need two distinct servers to reach copies=2.

    Reaching that replication level may take more than one pass through
    the client's retry loop.
    """

    FOO_LOCATOR = 'acbd18db4cc2f85cedef654fccc4a4d8+3'

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def _mock_responses(self, *replies):
        """Return tutil.mock_keep_responses() primed with FOO_LOCATOR."""
        return tutil.mock_keep_responses(self.FOO_LOCATOR, *replies)

    def test_success_after_exception(self):
        with self._mock_responses(Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        with self._mock_responses(500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # The first pass gets a 200 (storing again on that server cannot
        # raise replication) and a 400 (that server cannot be retried at
        # all), so no third request should be made.
        with self._mock_responses(200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1411
class KeepClientAPIErrorTest(unittest.TestCase):
    def test_api_fail(self):
        class ApiMock(object):
            # Minimal API client stand-in: only api_token, insecure and
            # config resolve; any other attribute lookup raises
            # KeepReadError, simulating an API (not keepstore) failure.
            def __getattr__(self, name):
                known = {
                    "api_token": "abc",
                    "insecure": False,
                    "config": lambda: {},
                }
                if name in known:
                    return known[name]
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='')

        # Regression guard: if an API (not keepstore) exception is raised
        # during get(), the next get() of the same block used to deadlock,
        # which is why the same get() is attempted twice in a row.
        # Unfortunately, if that bug returns, the test suite itself
        # deadlocks; there is no good way to avoid that without adding a
        # special case that would exist only for this test.
        for _ in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")