17344: Fix proxy detection test.
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import socket
19 import sys
20 import time
21 import unittest
22 import urllib.parse
23
24 import arvados
25 import arvados.retry
26 import arvados.util
27 from . import arvados_testutil as tutil
28 from . import keepstub
29 from . import run_test_server
30
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Put/get/head round trips through a shared KeepClient.

    The client is created once per class with proxy='' and
    local_store='', after authorizing as "admin".
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        """Authorize as admin and build the API and Keep clients shared
        by every test in this class."""
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        """put() then get() of 'foo', checking the transfer counters."""
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertEqual(self.keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        """Round-trip a short blob containing non-ASCII and NUL bytes."""
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        """Round-trip a 67108864-byte blob."""
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        # Doubling the 8-byte seed 23 times yields 8 * 2**23 bytes,
        # which is the size the locator below must report.
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        """Round-trip a blob written with copies=1 (currently skipped)."""
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        """put() of the empty string returns the empty-block locator."""
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        """put() accepts ASCII text and rejects objects without encode()."""
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # The non-ASCII check is only made when running under Python 2.
        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        """head() returns True for a block that was just written."""
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
130
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Signed-locator behaviour with KEEP_SERVER blob_signing set to True."""
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def test_KeepBasicRWTest(self):
        """Signed locators are readable by the writer; unsigned, foreign
        and unauthenticated reads raise NotFoundError."""
        not_found = arvados.errors.NotFoundError
        run_test_server.authorize_with('active')
        client = arvados.KeepClient()

        signed_foo = client.put('foo')
        self.assertRegex(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        self.assertEqual(
            client.get(signed_foo),
            b'foo',
            'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        signed_bar = client.put('bar')
        unsigned_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)
        with self.assertRaises(not_found):
            client.get(unsigned_bar)

        # GET from a different user => NotFound
        run_test_server.authorize_with('spectator')
        with self.assertRaises(not_found):
            arvados.Keep.get(signed_bar)

        # With the token blanked, neither the signed nor the unsigned
        # locator can be fetched.
        client.api_token = ''
        for locator in (signed_bar, unsigned_bar):
            with self.assertRaises(not_found):
                client.get(locator)
173
174
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """KeepClient behaviour driven by the ARVADOS_KEEP_SERVICES setting."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        """Authorize as 'active' and create the shared API client."""
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        """Drop ARVADOS_EXTERNAL_CLIENT from the settings, if present."""
        arvados.config.settings().pop('ARVADOS_EXTERNAL_CLIENT', None)
        super(KeepProxyTestCase, self).tearDown()

    def _override_keep_services(self, value):
        """Set ARVADOS_KEEP_SERVICES to `value` for the current test only.

        The previous state (a value, or absence of the key) is put back
        by a cleanup callback, so the override cannot leak into tests
        that run afterwards regardless of execution order.
        """
        key = 'ARVADOS_KEEP_SERVICES'
        settings = arvados.config.settings()
        was_set = key in settings
        previous = settings.get(key)

        def restore():
            # Look the settings up again rather than closing over the
            # dict captured above, in case it was replaced meanwhile.
            current = arvados.config.settings()
            if was_set:
                current[key] = previous
            else:
                current.pop(key, None)

        self.addCleanup(restore)
        settings[key] = value

    def test_KeepProxyTest1(self):
        """put/get through the configured service marks using_proxy."""
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        """Several space-separated URIs each become a service root."""
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        self._override_keep_services(
            'http://10.0.0.1 https://foo.example.org:1234/')
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        """A value that is not a URI makes the constructor raise
        ArgumentError."""
        self._override_keep_services('bad.uri.org')
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
220
221
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient tests that run against mocked API and Keep responses."""

    def get_service_roots(self, api_client):
        """Return the parsed, sorted service root URLs that a KeepClient
        built on `api_client` computes for an all-zero locator."""
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
            service_type='proxy', service_host='localhost', service_port=9, count=1))
        try:
            # this will fail, but it ensures we get the service
            # discovery response
            keep_client.put('baz2')
        except Exception:
            # Only ordinary errors are expected from the failed put.
            # KeyboardInterrupt and SystemExit must still propagate,
            # which a bare "except:" would prevent.
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client)
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def check_timeout_opts(self, verb, arg, exc_class, timeouts,
                           low_speed=True, **service_kwargs):
        """Shared body of the test_*_timeout tests.

        Calls `verb` ('get', 'head' or 'put') with `arg` on a client
        whose single mocked service times out, expects `exc_class`,
        then inspects the options recorded on the first request.

        `timeouts` is the (connect, low-speed time, low-speed limit)
        tuple to compare against. When `low_speed` is false the two
        low-speed options are expected to be unset (None) instead.
        Extra keyword arguments go to mock_keep_services().
        """
        api_client = self.mock_keep_services(count=1, **service_kwargs)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                getattr(keep_client, verb)(arg)
            request = req_mock.responses[0]
            self.assertEqual(
                request.getopt(pycurl.CONNECTTIMEOUT_MS),
                int(timeouts[0]*1000))
            self.assertEqual(
                request.getopt(pycurl.LOW_SPEED_TIME),
                int(timeouts[1]) if low_speed else None)
            self.assertEqual(
                request.getopt(pycurl.LOW_SPEED_LIMIT),
                int(timeouts[2]) if low_speed else None)

    def test_get_timeout(self):
        self.check_timeout_opts(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        self.check_timeout_opts(
            'put', b'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        self.check_timeout_opts(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT,
            low_speed=False)

    def test_proxy_get_timeout(self):
        self.check_timeout_opts(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_head_timeout(self):
        self.check_timeout_opts(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            low_speed=False,
            service_type='proxy')

    def test_proxy_put_timeout(self):
        self.check_timeout_opts(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def check_no_services_error(self, verb, exc_class):
        """Expect `exc_class` with no per-request errors from `verb`
        when the keep_services lookup raises ApiError."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Expect the raised error to report only the 502 responses
        queued last, and a message counting 4 attempts."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        # assertIn shows the actual message when the check fails.
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
498
499
@tutil.skip_sleep
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Checks which KeepClient calls reach KeepService.get a second
    time for the same locator."""

    def setUp(self):
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.get(self.locator)
        # The second get() is served from cache, so only one request
        # reaches the service.
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.head(self.locator)
        # HEAD requests are not cached, so that they're not confused
        # with GET requests: both calls reach the service.
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        get_mock.side_effect = ['first response', 'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            from_head = self.keep_client.head(self.locator)
            from_get = self.keep_client.get(self.locator)
        self.assertEqual('first response', from_head)
        # The first response was not cached because it came from a
        # HEAD request.
        self.assertNotEqual(from_head, from_get)
535
@tutil.skip_sleep
class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Tests of the X-Keep-Storage-Classes request header and of how
    put() reacts to the confirmation headers in mocked responses."""

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def test_multiple_default_storage_classes_req_header(self):
        """Every class marked Default in the config is requested when
        put() is called without classes."""
        api_mock = self.api_client_mock()
        api_mock.config.return_value = {
            'StorageClasses': {
                'foo': { 'Default': True },
                'bar': { 'Default': True },
                'baz': { 'Default': False }
            }
        }
        api_client = self.mock_keep_services(api_mock=api_mock, count=2)
        keep_client = arvados.KeepClient(api_client=api_client)
        resp_hdr = {
            'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
            'x-keep-replicas-stored': 1
        }
        with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as req_mock:
            keep_client.put(self.data, copies=1)
            req_hdr = req_mock.responses[0]
            self.assertIn(
                'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))

    def test_storage_classes_req_header(self):
        """Requested classes appear in the request header; an empty
        list or None falls back to 'default'."""
        self.assertEqual(
            self.api_client.config()['StorageClasses'],
            {'default': {'Default': True}})
        cases = [
            # requested, expected
            [['foo'], 'X-Keep-Storage-Classes: foo'],
            [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
            [[], 'X-Keep-Storage-Classes: default'],
            [None, 'X-Keep-Storage-Classes: default'],
        ]
        for req_classes, expected_header in cases:
            # The mocked server confirms one replica of every requested
            # class, or of 'default' when nothing was requested.
            if req_classes:
                confirmed_hdr = ', '.join(
                    "{}=1".format(name) for name in req_classes)
            else:
                confirmed_hdr = 'default=1'
            headers = {
                'x-keep-replicas-stored': 1,
                'x-keep-storage-classes-confirmed': confirmed_hdr,
            }
            with tutil.mock_keep_responses(self.locator, 200, **headers) as req_mock:
                self.keep_client.put(self.data, copies=1, classes=req_classes)
                req_hdr = req_mock.responses[0]
                self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))

    def test_partial_storage_classes_put(self):
        """After one class is confirmed, the retry asks only for the
        class that is still missing."""
        headers = {
            'x-keep-replicas-stored': 1,
            'x-keep-storage-classes-confirmed': 'foo=1'}
        with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
            # 1st request, both classes pending
            req1_headers = req_mock.responses[0].getopt(pycurl.HTTPHEADER)
            self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
            # 2nd try, 'foo' class already satisfied
            req2_headers = req_mock.responses[1].getopt(pycurl.HTTPHEADER)
            self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)

    def test_successful_storage_classes_put_requests(self):
        """put() succeeds and issues the expected number of requests
        for each combination of wanted and confirmed copies/classes."""
        cases = [
            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
            [ 1, ['foo'], 1, 'foo=1', 1],
            [ 1, ['foo'], 2, 'foo=2', 1],
            [ 2, ['foo'], 2, 'foo=2', 1],
            [ 2, ['foo'], 1, 'foo=1', 2],
            [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
            [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
            [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
            [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
            [ 1, ['foo', 'bar'], 1, None, 1],
            [ 1, ['foo'], 1, None, 1],
            [ 2, ['foo'], 2, None, 1],
            [ 2, ['foo'], 1, None, 2],
        ]
        for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
            headers = {'x-keep-replicas-stored': c_copies}
            # A None entry means the response carries no
            # storage-classes-confirmed header at all.
            if c_classes is not None:
                headers['x-keep-storage-classes-confirmed'] = c_classes
            case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
            with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as req_mock:
                self.assertEqual(self.locator,
                    self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
                    case_desc)
                self.assertEqual(e_reqs, req_mock.call_count, case_desc)

    def test_failed_storage_classes_put_requests(self):
        """put() raises KeepWriteError when the responses never confirm
        the wanted copies of the wanted classes."""
        cases = [
            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
            [ 1, ['foo'], 1, 'bar=1', 200],
            [ 1, ['foo'], 1, None, 503],
            [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
            [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
            [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
        ]
        for w_copies, w_classes, c_copies, c_classes, return_code in cases:
            headers = {'x-keep-replicas-stored': c_copies}
            if c_classes is not None:
                headers['x-keep-storage-classes-confirmed'] = c_classes
            case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
            with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
                with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
                    self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
646
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Verify which X-Request-Id header KeepClient sends with each request.

    Three sources are covered: an explicit request_id argument, the API
    client's request_id attribute, and an automatically generated
    "req-..." ID when neither of those is provided.
    """

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def _check_put_get_head(self, check_header, **request_kwargs):
        """Run put, get and head against stubbed services and apply
        check_header to every stubbed request that was recorded.

        check_header -- callable taking one recorded response object
        request_kwargs -- extra keyword arguments forwarded to each
                          KeepClient call (e.g. request_id)
        """
        with tutil.mock_keep_responses(self.locator, 200, 200) as stub:
            self.keep_client.put(self.data, **request_kwargs)
        # put() with default arguments is expected to hit both services.
        self.assertEqual(2, len(stub.responses))
        for resp in stub.responses:
            check_header(resp)

        with tutil.mock_keep_responses(self.data, 200) as stub:
            self.keep_client.get(self.locator, **request_kwargs)
        check_header(stub.responses[0])

        with tutil.mock_keep_responses(b'', 200) as stub:
            self.keep_client.head(self.locator, **request_kwargs)
        check_header(stub.responses[0])

    def test_default_to_api_client_request_id(self):
        # With no request_id argument, the API client's ID is used.
        self.api_client.request_id = self.test_id
        self._check_put_get_head(self.assertProvidedRequestId)

    def test_explicit_request_id(self):
        # A request_id argument is sent verbatim.
        self._check_put_get_head(
            self.assertProvidedRequestId, request_id=self.test_id)

    def test_automatic_request_id(self):
        # Neither the caller nor the API client supplies an ID, so a
        # new one must be generated.
        self._check_put_get_head(self.assertAutomaticRequestId)

    def test_request_id_in_exception(self):
        # The request ID, explicit or generated, must appear in the
        # error message when every service answers 400.
        generated_id = r'req-[a-z0-9]{20}'

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
                self.keep_client.head(self.locator, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepReadError, generated_id):
                self.keep_client.get(self.locator)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
                self.keep_client.put(self.data, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, generated_id):
                self.keep_client.put(self.data)

    def assertAutomaticRequestId(self, resp):
        """Assert resp carries a generated request ID that is well formed
        and different from self.test_id."""
        hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
               if x.startswith('X-Request-Id: ')][0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        """Assert resp carries exactly self.test_id as its request ID."""
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
733
734
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check the order in which KeepClient probes Keep services.

    The probe order for a handful of reference blocks is compared with
    a hard-coded reference table, both through weighted_service_roots()
    and through the sequence of stubbed HTTP requests actually made.
    """

    # Captures the hex service number that follows "keep0x" in a stub
    # service URL; the optional "[" allows for a bracketed host.
    SERVICE_ID_RE = re.compile(r'//\[?keep0x([0-9a-f]+)')

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(block).hexdigest() for block in self.blocks]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def _service_ids(self, urls):
        """Return the hex service number found in each URL, in order."""
        return [self.SERVICE_ID_RE.search(url).group(1) for url in urls]

    def _requested_service_ids(self, responses):
        """Return the service numbers of the recorded stub requests, in
        the order the requests were made."""
        return self._service_ids(
            resp.getopt(pycurl.URL).decode() for resp in responses)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for want_order, block_hash in zip(self.expected_order, self.hashes):
            roots = self.keep_client.weighted_service_roots(
                arvados.KeepLocator(block_hash))
            self.assertEqual(want_order, self._service_ids(roots))

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Call op(i) for each reference block with every service
        answering 500, and check the requests follow the reference
        order twice over (the callers pass num_retries=1)."""
        for i, want_order in enumerate(self.expected_order):
            failures = [500] * (self.services*2)
            with tutil.mock_keep_responses('', *failures) as stub, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = self._requested_service_ids(stub.responses)
            self.assertEqual(want_order*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i, block in enumerate(self.blocks):
                failures = [500] * (self.services*3)
                with tutil.mock_keep_responses('', *failures) as stub, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(block, num_retries=2, copies=copies)
                got_order = self._requested_service_ids(stub.responses)
                expected_probes = self.expected_order[i]*3
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(expected_probes):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        pending.remove(got)
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(expected_probes)))

    def test_probe_waste_adding_one_server(self):
        block_hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            for block_hash in block_hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client)
            total_penalty = 0
            for block_hash, probe_before in zip(block_hashes, probes_before):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(block_hash))
                # Penalty: how many probes now precede the service that
                # used to be probed first for this block.
                penalty = probe_after.index(probe_before[0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(block_hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(block_hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Make `verb` ('get' or 'put') fail on every service for the
        block of 64 "0" characters, and check that the raised
        exception's request_errors() lists the services in reference
        probe order.

        verb -- name of the KeepClient method to call
        exc_class -- exception class the call is expected to raise
        """
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client)
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
888
889
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
    """Exercise KeepClient's connect, response and low-bandwidth timeouts.

    The tests adjust delays and bandwidth on self.server (presumably the
    stub server set up by keepstub.StubKeepServers -- confirm there).
    """
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x' * 2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        """Context manager asserting that its body takes between tmin
        and tmax seconds (inclusive) of wall-clock time."""

        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            elapsed = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(elapsed, self.tmin)
            self.assertLessEqual(elapsed, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager asserting that its body takes at least tmin
        seconds of wall-clock time."""

        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Same millisecond rounding as assertTakesBetween.
            elapsed = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(elapsed, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient on self.api_client using `timeouts` as
        its timeout setting."""
        return arvados.KeepClient(api_client=self.api_client, timeout=timeouts)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5), \
             self.assertRaises(arvados.errors.KeepWriteError):
            self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # Twice the low-speed limit is fast enough for a full round trip.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        client = self.keepClient()
        locator = client.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, client.get(locator, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        client = self.keepClient()
        locator = client.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepReadError):
            client.get(locator, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepWriteError):
            client.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        client = self.keepClient()
        locator = client.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepReadError):
            client.get(locator, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepWriteError):
            client.put(self.DATA, copies=1, num_retries=0)
        # head() is only timed here; no exception is expected.
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            client.head(locator, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        client = self.keepClient()
        locator = client.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        stall = self.TIMEOUT_TIME+1
        self.server.setdelays(mid_write=stall, mid_read=stall)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepReadError):
            client.get(locator, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME), \
             self.assertRaises(arvados.errors.KeepWriteError):
            client.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        locator = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(locator)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(locator)

    def test_timeout_slow_response(self):
        locator = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(locator)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(locator)

    def test_timeout_slow_response_body(self):
        locator = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(locator)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(locator)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        client = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            client.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            fetched = client.get(loc, num_retries=0)
            self.assertEqual(self.DATA, fetched)

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        client = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9), \
             self.assertRaises(arvados.errors.KeepReadError):
            client.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9), \
             self.assertRaises(arvados.errors.KeepWriteError):
            client.put(self.DATA, copies=1, num_retries=0)
1030
1031
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check how KeepClient uses "+K@..." hints in a locator when
    choosing which service to ask for a block."""

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Set up a mocked API client listing `disks` regular services
        plus `gateways` gateway services, and a KeepClient using it.

        Sets self.gateways (service records), self.gateway_roots (their
        base URLs), self.api_client and self.keepClient.
        """
        self.gateways = []
        for i in range(gateways):
            self.gateways.append({
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
            })
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gateway)
            for gateway in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        fake_curl = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        MockCurl.return_value = fake_curl
        self.mock_disks_and_gateways()
        hinted_uuid = self.gateways[0]['uuid']
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + hinted_uuid
        self.assertEqual(b'foo', self.keepClient.get(locator))
        # The request must have gone to the hinted gateway.
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        gateways = 4
        disks = 3
        # One 404 response per service, consumed in request order.
        mocks = [
            tutil.FakeCurl.make(code=404, body='')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        hints = ['K@'+gateway['uuid'] for gateway in self.gateways]
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] + hints)
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for root, curl in zip(self.gateway_roots, mocks):
            self.assertEqual(root+locator,
                             curl.getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for curl in mocks[gateways:gateways+disks]:
            self.assertRegex(
                curl.getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        gateways = 4
        disks = 3
        # One 404 response per service, consumed in request order.
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        hints = ['K@'+gateway['uuid'] for gateway in self.gateways]
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] + hints)
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for root, curl in zip(self.gateway_roots, mocks):
            self.assertEqual(root+locator,
                             curl.getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for curl in mocks[gateways:gateways+disks]:
            self.assertRegex(
                curl.getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        fake_curl = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        MockCurl.return_value = fake_curl
        self.mock_disks_and_gateways()
        # "xyzzy" matches no mocked gateway uuid; the request is
        # expected at the remote proxy URL asserted below.
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        fake_curl = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        MockCurl.return_value = fake_curl
        self.mock_disks_and_gateways()
        # "xyzzy" matches no mocked gateway uuid; the request is
        # expected at the remote proxy URL asserted below.
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1127
1128
class KeepClientRetryTestMixin(object):
    # Shared retry scenarios for KeepClient operations.
    #
    # A local Keep store would never exercise the retry code, so:
    # * Each client is created with a single proxy address (pointed at
    #   a black hole). No API client is needed, and all HTTP requests
    #   come from one place.
    # * TEST_PATCHER replaces the client's HTTP layer with simulated
    #   responses.
    # That lets the retry logic be tested thoroughly without any
    # supporting servers, and without side effects if something
    # hiccups.
    #
    # A concrete test class must define:
    # * DEFAULT_EXPECT: the value run_method() returns on success
    # * DEFAULT_EXCEPTION: the exception class raised on failure
    # * run_method(): performs the operation under test
    # * TEST_PATCHER: a callable that mocks out the appropriate
    #   methods in the client

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Return a KeepClient built from self.client_kwargs, with
        caller_kwargs taking precedence."""
        return arvados.KeepClient(**dict(self.client_kwargs, **caller_kwargs))

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) returns `expected`
        (DEFAULT_EXPECT when `expected` is None)."""
        wanted = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(wanted, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) raises `error_class`
        (DEFAULT_EXCEPTION when None); return the assertRaises
        context so callers can inspect the exception."""
        wanted_error = (
            self.DEFAULT_EXCEPTION if error_class is None else error_class)
        with self.assertRaises(wanted_error) as raised:
            self.run_method(*args, **kwargs)
        return raised

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        # A 500 is retried and the following 200 is accepted.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        # An exception from the HTTP layer is retried like a 500.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        # Without num_retries, the first 500 is final.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        # A 403 is not retried even though retries are allowed.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            raised = self.check_exception(num_retries=1)
        self.assertRegex(str(raised.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries given to the constructor applies when the call
        # itself does not pass one.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1200
1201
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Run the shared retry scenarios, plus get-specific ones, against
    KeepClient.get()."""
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 must surface as NotFoundError, not the generic error.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # The second response (200) must be reached with no retries.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        responses = [
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # The first response carries the wrong content for the locator.
        responses = [
            ('baddata', 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)
1245
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Run the shared retry scenarios, plus head-specific ones, against
    KeepClient.head()."""
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    DEFAULT_EXPECT = True
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 must surface as NotFoundError, not the generic error.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # The second response (200) must be reached with no retries.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        responses = [
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)
1283
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Retry tests for put(), layered on KeepClientRetryTestMixin:
    # run_method() calls put() on a client obtained from new_client().
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        # Forward everything to put() and hand back its result.
        keep = self.new_client()
        return keep.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Only a single 200 response is queued, yet two copies are
        # requested; check_exception is called with copies=2 and
        # num_retries=3.
        single_success = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200)
        with single_success:
            self.check_exception(copies=2, num_retries=3)
1297
1298
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    # Feeds a KeepWriterThreadPool fake services with varying delays and
    # outcomes, then checks the tuple returned by the pool's done().

    class FakeKeepService(object):
        # Stand-in for a Keep service: put() sleeps `delay` seconds, then
        # raises `will_raise` if one was given, else returns `will_succeed`.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            # Canned result that last_result() reports when will_succeed
            # is true.
            self._result = {
                'headers': {
                    'x-keep-replicas-stored': str(replicas),
                    'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
                },
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout, headers):
            # The arguments are ignored; only the configured delay and
            # outcome matter.
            time.sleep(self.delay)
            failure = self.will_raise
            if failure is None:
                return self.will_succeed
            raise failure

        def last_result(self):
            if not self.will_succeed:
                return {"status_code": 500, "body": "didn't succeed"}
            return self._result

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def _add_fake_service(self, tenths, **behaviour):
        # Queue one fake service whose put() sleeps tenths/10 seconds;
        # `behaviour` is passed through to FakeKeepService.
        service = self.FakeKeepService(delay=tenths/10.0, **behaviour)
        self.pool.add_task(service, None)

    def test_only_write_enough_on_success(self):
        # Ten services that would all succeed are queued.
        for tenths in range(10):
            self._add_fake_service(tenths, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Failing and succeeding services alternate, pairwise sharing the
        # same delay.
        for tenths in range(5):
            for outcome in (False, True):
                self._add_fake_service(tenths, will_succeed=outcome)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Raising and succeeding services alternate, pairwise sharing the
        # same delay.
        for tenths in range(5):
            self._add_fake_service(tenths, will_raise=Exception())
            self._add_fake_service(tenths, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # copies+1 raising services, then only copies-1 succeeding ones.
        crashing = self.copies + 1
        succeeding = self.copies - 1
        for tenths in range(crashing):
            self._add_fake_service(tenths, will_raise=Exception())
        for tenths in range(succeeding):
            self._add_fake_service(tenths, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (succeeding, []))
1370
1371
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    # put() calls that ask for two copies and therefore need two distinct
    # servers, possibly over more than one pass of the retry loop.

    def setUp(self):
        api = self.mock_keep_services(count=2)
        self.api_client = api
        self.keep_client = arvados.KeepClient(api_client=api)

    def test_success_after_exception(self):
        # Scripted outcomes: an exception, then two 200s.  Three requests
        # are expected in total.
        scripted = tutil.mock_keep_responses(
            'acbd18db4cc2f85cedef654fccc4a4d8+3',
            Exception('mock err'), 200, 200)
        with scripted as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        request_count = req_mock.call_count
        self.assertEqual(3, request_count)

    def test_success_after_retryable_error(self):
        # Scripted outcomes: a 500, then two 200s.  Three requests are
        # expected in total.
        scripted = tutil.mock_keep_responses(
            'acbd18db4cc2f85cedef654fccc4a4d8+3',
            500, 200, 200)
        with scripted as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        request_count = req_mock.call_count
        self.assertEqual(3, request_count)

    def test_fail_after_final_error(self):
        # The first retry loop sees a 200 (storing again on that server
        # cannot add replication) and a 400 (that server cannot be
        # retried at all), so no third request should be attempted.
        scripted = tutil.mock_keep_responses(
            'acbd18db4cc2f85cedef654fccc4a4d8+3',
            200, 400, 200)
        expected_error = arvados.errors.KeepWriteError
        with scripted as req_mock, self.assertRaises(expected_error):
            self.keep_client.put('foo', num_retries=1, copies=2)
        request_count = req_mock.call_count
        self.assertEqual(2, request_count)
1405
class KeepClientAPIErrorTest(unittest.TestCase):
    def test_api_fail(self):
        # Attributes the fake API client answers normally.  Any other
        # attribute access raises KeepReadError, standing in for an API
        # (not keepstore) failure.
        canned_attrs = {
            "api_token": "abc",
            "insecure": False,
            "config": lambda: {},
        }

        class ApiMock(object):
            def __getattr__(self, r):
                if r in canned_attrs:
                    return canned_attrs[r]
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(
            api_client=ApiMock(), proxy='', local_store='')

        # Regression being guarded against: when an API (not keepstore)
        # exception is thrown as part of a get(), the next attempt to get
        # that same block used to deadlock.  That is why the same get()
        # is issued twice in a row.  Unfortunately, the failure mode of
        # this test is that the whole test suite deadlocks; there isn't a
        # good way to avoid that without adding a special case that has
        # no use except for this test.
        locator = "acbd18db4cc2f85cedef654fccc4a4d8+3"
        for _attempt in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get(locator)