17465: Adds tests for storage class support.
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import socket
19 import sys
20 import time
21 import unittest
22 import urllib.parse
23
24 import arvados
25 import arvados.retry
26 import arvados.util
27 from . import arvados_testutil as tutil
28 from . import keepstub
29 from . import run_test_server
30
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Read/write round trips against test Keep servers, without a proxy."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        # One client is shared by every test in this class, so its
        # upload/download counters accumulate across tests.
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        # NOTE(review): the zero-counter checks below rely on this test
        # running before the others in this class (unittest's default
        # alphabetical method ordering), because keep_client is shared.
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertEqual(self.keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # Doubling an 8-byte seed 23 times yields 8 * 2**23 = 64 MiB.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
130
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Blob signing: reads require a signed locator and a valid token."""
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        client = arvados.KeepClient()

        # A signed locator returned by put() can be read back.
        signed_foo = client.put('foo')
        self.assertRegex(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        self.assertEqual(client.get(signed_foo),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        signed_bar = client.put('bar')
        bare_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)
        with self.assertRaises(arvados.errors.NotFoundError):
            client.get(bare_bar)

        # GET from a different user => NotFound
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(signed_bar)

        # With an empty token, neither the signed nor the unsigned
        # locator resolves => NotFound for both.
        client.api_token = ''
        for locator in (signed_bar, bare_bar):
            with self.assertRaises(arvados.errors.NotFoundError):
                client.get(locator)
173
174
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """KeepClient behavior when Keep is reached through a keepproxy."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepProxyTestCase, self).setUp()
        # Several tests overwrite ARVADOS_KEEP_SERVICES in the shared
        # config settings. Remember the value in effect before each test
        # so tearDown() can restore it and tests don't depend on ordering.
        self._orig_keep_services = arvados.config.settings().get(
            'ARVADOS_KEEP_SERVICES')

    def tearDown(self):
        settings = arvados.config.settings()
        settings.pop('ARVADOS_EXTERNAL_CLIENT', None)
        if self._orig_keep_services is None:
            settings.pop('ARVADOS_KEEP_SERVICES', None)
        else:
            settings['ARVADOS_KEEP_SERVICES'] = self._orig_keep_services
        super(KeepProxyTestCase, self).tearDown()

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegex(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
236
237
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient unit tests using mocked keep_services and mocked pycurl."""

    def get_service_roots(self, api_client):
        """Return the service roots KeepClient would try, parsed and sorted."""
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client)
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def _check_curl_timeouts(self, curl, timeout, low_speed_set=True):
        """Assert the timeout options KeepClient set on a (fake) curl handle.

        timeout is indexed like KeepClient.DEFAULT_TIMEOUT: [0] is the
        connect timeout in seconds (checked via CONNECTTIMEOUT_MS), and
        [1]/[2] are the expected LOW_SPEED_TIME/LOW_SPEED_LIMIT values.
        When low_speed_set is False (HEAD requests), both low-speed
        options must have been left unset.
        """
        self.assertEqual(
            curl.getopt(pycurl.CONNECTTIMEOUT_MS),
            int(timeout[0]*1000))
        if low_speed_set:
            expected_time = int(timeout[1])
            expected_limit = int(timeout[2])
        else:
            expected_time = expected_limit = None
        self.assertEqual(curl.getopt(pycurl.LOW_SPEED_TIME), expected_time)
        self.assertEqual(curl.getopt(pycurl.LOW_SPEED_LIMIT), expected_limit)

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self._check_curl_timeouts(
                req_mock.responses[0], arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self._check_curl_timeouts(
                req_mock.responses[0], arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self._check_curl_timeouts(
                req_mock.responses[0], arvados.KeepClient.DEFAULT_TIMEOUT,
                low_speed_set=False)

    def test_proxy_get_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self._check_curl_timeouts(
                req_mock.responses[0], arvados.KeepClient.DEFAULT_PROXY_TIMEOUT)

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self._check_curl_timeouts(
                req_mock.responses[0], arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
                low_speed_set=False)

    def test_proxy_put_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self._check_curl_timeouts(
                req_mock.responses[0], arvados.KeepClient.DEFAULT_PROXY_TIMEOUT)

    def check_no_services_error(self, verb, exc_class):
        """Calling `verb` when service discovery fails raises exc_class
        with no per-request errors recorded."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """After all retries fail, the reported errors are those of the
        final attempt (the trailing 502s), not earlier ones."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        # One write reporting 3 stored replicas satisfies copies=2.
        self.assertEqual(1, req_mock.call_count)
497
498
@tutil.skip_sleep
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Which KeepClient requests are answered from the client-side cache."""
    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.get(self.locator)
            self.keep_client.get(self.locator)
        # Request already cached, don't require more than one request
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.head(self.locator)
            self.keep_client.head(self.locator)
        # Don't cache HEAD requests so that they're not confused with GET reqs
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        get_mock.side_effect = ['first response', 'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual('first response', head_resp)
        # The first response was not cached because it came from a HEAD request.
        self.assertNotEqual(head_resp, get_resp)
534
@tutil.skip_sleep
class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Storage-class requests (X-Keep-Storage-Classes) on KeepClient.put()."""
    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def test_storage_classes_req_header(self):
        cases = [
            # requested, expected
            [['foo'], 'X-Keep-Storage-Classes: foo'],
            [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
            [[], None],
        ]
        for req_classes, expected_header in cases:
            headers = {'x-keep-replicas-stored': 1}
            if len(req_classes) > 0:
                # Confirm every requested class so the put succeeds.
                confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
                headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
            with tutil.mock_keep_responses(self.locator, 200, **headers) as req_mock:
                self.keep_client.put(self.data, copies=1, classes=req_classes)
                resp = req_mock.responses[0]
                if expected_header is not None:
                    self.assertIn(expected_header, resp.getopt(pycurl.HTTPHEADER))
                else:
                    # No classes requested: the header must be absent.
                    for hdr in resp.getopt(pycurl.HTTPHEADER):
                        self.assertNotRegex(hdr, r'^X-Keep-Storage-Classes.*')

    def test_partial_storage_classes_put(self):
        headers = {
            'x-keep-replicas-stored': 1,
            'x-keep-storage-classes-confirmed': 'foo=1'}
        with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
            # 1st request, both classes pending
            req1_headers = req_mock.responses[0].getopt(pycurl.HTTPHEADER)
            self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
            # 2nd try, 'foo' class already satisfied
            req2_headers = req_mock.responses[1].getopt(pycurl.HTTPHEADER)
            self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)

    def test_successful_storage_classes_put_requests(self):
        cases = [
            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
            [ 1, ['foo'], 1, 'foo=1', 1],
            [ 1, ['foo'], 2, 'foo=2', 1],
            [ 2, ['foo'], 2, 'foo=2', 1],
            [ 2, ['foo'], 1, 'foo=1', 2],
            [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
            [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
            [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
            [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
            [ 1, ['foo', 'bar'], 1, None, 1],
            [ 1, ['foo'], 1, None, 1],
            [ 2, ['foo'], 2, None, 1],
            [ 2, ['foo'], 1, None, 2],
        ]
        for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
            headers = {'x-keep-replicas-stored': c_copies}
            if c_classes is not None:
                headers.update({'x-keep-storage-classes-confirmed': c_classes})
            with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as req_mock:
                case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
                self.assertEqual(self.locator,
                    self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
                    case_desc)
                self.assertEqual(e_reqs, req_mock.call_count, case_desc)

    def test_failed_storage_classes_put_requests(self):
        cases = [
            # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
            # Wrong class confirmed.
            [ 1, ['foo'], 1, 'bar=1', 200],
            # Server error.
            [ 1, ['foo'], 1, None, 503],
            # 'foo' under-replicated.
            [ 2, ['foo'], 1, 'bar=1, foo=1', 200],
            # Two separate classes: 'bar' satisfied, 'foo' under-replicated.
            [ 2, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
        ]
        for w_copies, w_classes, c_copies, c_classes, return_code in cases:
            headers = {'x-keep-replicas-stored': c_copies}
            if c_classes is not None:
                headers.update({'x-keep-storage-classes-confirmed': c_classes})
            with tutil.mock_keep_responses(self.locator, return_code, **headers):
                case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
                with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
                    self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
621
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check the X-Request-Id header sent with Keep put/get/head requests."""

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def _check_put_get_head(self, check_response, **call_kwargs):
        """Run put, get and head, passing each mocked response to check_response.

        call_kwargs (e.g. request_id=...) are forwarded to all three calls.
        """
        with tutil.mock_keep_responses(self.locator, 200, 200) as keep_mock:
            self.keep_client.put(self.data, **call_kwargs)
        # put() is expected to hit both mocked services.
        self.assertEqual(2, len(keep_mock.responses))
        for resp in keep_mock.responses:
            check_response(resp)

        with tutil.mock_keep_responses(self.data, 200) as keep_mock:
            self.keep_client.get(self.locator, **call_kwargs)
        check_response(keep_mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as keep_mock:
            self.keep_client.head(self.locator, **call_kwargs)
        check_response(keep_mock.responses[0])

    def test_default_to_api_client_request_id(self):
        # With no explicit request_id, the API client's request_id is used.
        self.api_client.request_id = self.test_id
        self._check_put_get_head(self.assertProvidedRequestId)

    def test_explicit_request_id(self):
        self._check_put_get_head(self.assertProvidedRequestId,
                                 request_id=self.test_id)

    def test_automatic_request_id(self):
        # Neither the API client nor the caller supplies an id, so a fresh
        # one must be generated.
        self._check_put_get_head(self.assertAutomaticRequestId)

    def assertAutomaticRequestId(self, resp):
        """Assert resp carried a generated id, not self.test_id."""
        headers = resp.getopt(pycurl.HTTPHEADER)
        request_id_headers = [x for x in headers
                              if x.startswith('X-Request-Id: ')]
        # Report a missing header as a test failure rather than an
        # IndexError from indexing an empty list.
        self.assertTrue(request_id_headers,
                        'no X-Request-Id header in {!r}'.format(headers))
        hdr = request_id_headers[0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        """Assert resp carried exactly the X-Request-Id self.test_id."""
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
691
692
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check the order in which KeepClient probes Keep services.

    Probe orders are compared against a fixed reference set, so any change
    to the service-weighting function (weighted_service_roots) shows up
    as a failure here.
    """

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        # blocks[i] is i formatted as 64 zero-padded hex digits (as bytes);
        # hashes[i] is its md5 hex digest.
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
            # Pull the hex id after "keep0x" out of each root URL (the
            # host may be bracketed, hence the optional "\[").
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        # For each block, supply enough 500 responses for two passes over
        # all services, expect op(i) to raise, and check that requests
        # followed the reference order twice: the first attempt plus one
        # retry (the callers above pass num_retries=1).
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        # Like the reference-set tests, but for put() with copies=2..3 and
        # num_retries=2 (three passes, hence the "*3" below). Concurrent
        # writers may reorder requests slightly, so instead of an exact
        # match this checks that no request was made too early.
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        # Despite the name, this adds 1..11 services to an initial 12. For
        # each of 100 blocks, "penalty" is how far the block's original
        # first-choice service moved down the new probe list. Each penalty
        # must be <= added_services, and the total must stay within the
        # band around added_services * len(hashes) / initial_services
        # computed below.
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client)
            total_penalty = 0
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        # Call keep_client.<verb> on the block of 64 '0' characters (or on
        # its locator, for 'get') with every request returning 500. Check
        # that the URLs listed by the exception's request_errors() follow
        # that block's reference probe order (expected_order[0]) and use
        # the randomly chosen port.
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client)
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
846
847
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
    """Exercise KeepClient timeouts against a stub Keep server.

    Individual tests adjust the stub server through
    self.server.setbandwidth() and self.server.setdelays() and then check
    how long put/get/head take and whether they fail.
    """
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        """Context manager: assert the body took between tmin and tmax seconds.

        Subclasses TestCase only to reuse its assert* methods. The timing
        check runs in __exit__ even when the body raised; __exit__ returns
        None, so any exception from the body is not suppressed.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager: assert the body took at least tmin seconds."""
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Rounded to milliseconds, as in assertTakesBetween.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        # Build a KeepClient for self.api_client with the given timeouts.
        # NOTE(review): the tuple appears to be (connect seconds,
        # response/low-speed seconds, minimum bytes per second) -- confirm
        # against KeepClient's `timeout` documentation.
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # Twice the minimum bandwidth: put and get should both succeed.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        # head() is only checked for duration here, not for raising.
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError) as e:
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
988
989
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check that +K@ locator hints route requests to gateways/remote proxies."""

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Mock an API client listing `disks` disk services plus `gateways` gateways.

        Sets self.gateways (gateway service records), self.gateway_roots
        (their URL roots, same order), self.api_client and self.keepClient.
        """
        self.gateways = [{
            'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
            'owner_uuid': 'zzzzz-tpzed-000000000000000',
            'service_host': 'gatewayhost{}'.format(i),
            'service_port': 12345,
            'service_ssl_flag': True,
            'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    def _check_gateway_hints_in_order(self, curl_mock, verb):
        """Call keepClient.<verb> on a locator hinting several gateways.

        Every request gets a 404, so the call must raise NotFoundError.
        The recorded URLs must show the gateways probed first, in hint
        order, followed by the disk services.
        """
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        curl_mock.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        # Bodies are bytes throughout, matching the other tests in this class.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'get')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'head')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        # A +K@<cluster> hint (not a gateway uuid) routes to that cluster's
        # keep.<cluster>.arvadosapi.com proxy.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1085
1086
class KeepClientRetryTestMixin(object):
    """Shared retry tests for KeepClient operations.

    Testing against a local Keep store would not exercise retries, so the
    client is given a single proxy pointing at a black hole (no API client
    needed, and every HTTP request goes to one place), and the HTTP layer
    is patched with TEST_PATCHER to feed simulated responses.

    Concrete test classes must define:
      DEFAULT_EXPECT    -- what a successful run_method() returns
      DEFAULT_EXCEPTION -- the exception class a failed run_method() raises
      TEST_PATCHER      -- callable(body, *codes) returning a context
                           manager that installs the mocked responses
      run_method()      -- performs the Keep operation under test
    """

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = dict(proxy=self.PROXY_ADDR, local_store='')

    def new_client(self, **caller_kwargs):
        """Build a KeepClient from client_kwargs, overridden by caller_kwargs."""
        return arvados.KeepClient(**dict(self.client_kwargs, **caller_kwargs))

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) returns expected (default DEFAULT_EXPECT)."""
        want = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(want, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method raises error_class (default DEFAULT_EXCEPTION).

        Returns the assertRaises context so callers can inspect the error.
        """
        expected_error = self.DEFAULT_EXCEPTION if error_class is None else error_class
        with self.assertRaises(expected_error) as raised:
            self.run_method(*args, **kwargs)
        return raised

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        # Without num_retries the first 500 is final.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        # A 403 must not be retried even when retries are allowed.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            raised = self.check_exception(num_retries=1)
        self.assertRegex(str(raised.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries given to the client constructor applies to calls.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1158
1159
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.get()."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def _check_hinted_locator_success(self, *responses):
        # Mock the given responses and expect get(HINTED_LOCATOR) to succeed.
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise NotFoundError only when no server returns the
        # block and a high threshold of servers say it is not found. Here
        # the two servers disagree 50/50 (404 vs 500), so the error must
        # be a KeepReadError but not a NotFoundError.
        keep_client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500), \
             self.assertRaises(arvados.errors.KeepReadError) as exc_check:
            keep_client.get(self.HINTED_LOCATOR)
        self.assertNotIsInstance(
            exc_check.exception, arvados.errors.NotFoundError,
            "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        self._check_hinted_locator_success(self.DEFAULT_EXPECT, 404, 200, 500)

    def test_try_next_server_after_timeout(self):
        self._check_hinted_locator_success(
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200))

    def test_retry_data_with_wrong_checksum(self):
        # A body that doesn't match the locator's hash is rejected and the
        # next server is tried.
        self._check_hinted_locator_success(
            ('baddata', 200),
            (self.DEFAULT_EXPECT, 200))
1203
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.head()."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def _check_hinted_locator_success(self, *responses):
        # Mock the given responses and expect head(HINTED_LOCATOR) to succeed.
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise NotFoundError only when no server returns the
        # block and a high threshold of servers say it is not found. Here
        # the two servers disagree 50/50 (404 vs 500), so the error must
        # be a KeepReadError but not a NotFoundError.
        keep_client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500), \
             self.assertRaises(arvados.errors.KeepReadError) as exc_check:
            keep_client.head(self.HINTED_LOCATOR)
        self.assertNotIsInstance(
            exc_check.exception, arvados.errors.NotFoundError,
            "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        self._check_hinted_locator_success(self.DEFAULT_EXPECT, 404, 200, 500)

    def test_try_next_server_after_timeout(self):
        self._check_hinted_locator_success(
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200))
1241
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.put()."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # The mock supplies a single 200 response, so asking for two
        # copies must fail even with retries allowed.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1255
1256
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """Checks that KeepWriterThreadPool reports only the copies it needed."""

    class FakeKeepService(object):
        """Fake Keep service.

        Its put() sleeps for ``delay`` seconds, then raises ``will_raise``
        (when set) or returns ``will_succeed``.
        """

        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            # Canned response: headers claim `replicas` copies were stored,
            # all in the "default" storage class.
            headers = {
                'x-keep-replicas-stored': str(replicas),
                'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
            }
            self._result = {'headers': headers, 'body': 'foobar'}

        def put(self, data_hash, data, timeout, headers):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            # Only a service configured to succeed has a result to report.
            return self._result if self.will_succeed else None

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def _add_service(self, **service_kwargs):
        # Queue one FakeKeepService on the pool. The second argument to
        # add_task is None, as in every test here.
        self.pool.add_task(self.FakeKeepService(**service_kwargs), None)

    def _join_and_check(self, expected_copies):
        # Wait for all queued writers, then compare the pool's final tally.
        self.pool.join()
        self.assertEqual(self.pool.done(), (expected_copies, []))

    def test_only_write_enough_on_success(self):
        for step in range(10):
            self._add_service(delay=step / 10.0, will_succeed=True)
        self._join_and_check(self.copies)

    def test_only_write_enough_on_partial_success(self):
        for step in range(5):
            pause = step / 10.0
            self._add_service(delay=pause, will_succeed=False)
            self._add_service(delay=pause, will_succeed=True)
        self._join_and_check(self.copies)

    def test_only_write_enough_when_some_crash(self):
        for step in range(5):
            pause = step / 10.0
            self._add_service(delay=pause, will_raise=Exception())
            self._add_service(delay=pause, will_succeed=True)
        self._join_and_check(self.copies)

    def test_fail_when_too_many_crash(self):
        # copies+1 services raise and only copies-1 succeed, so the pool
        # can reach no more than copies-1.
        for step in range(self.copies + 1):
            self._add_service(delay=step / 10.0, will_raise=Exception())
        for step in range(self.copies - 1):
            self._add_service(delay=step / 10.0, will_succeed=True)
        self._join_and_check(self.copies - 1)
1326
1327
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    """put() calls that need two distinct servers to succeed.

    Success may require more than one pass through the retry loop.
    """

    _FOO_LOCATOR = 'acbd18db4cc2f85cedef654fccc4a4d8+3'

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def _put_foo(self):
        # Store "foo" with two copies and at most one retry.
        self.keep_client.put('foo', num_retries=1, copies=2)

    def test_success_after_exception(self):
        responses = tutil.mock_keep_responses(
            self._FOO_LOCATOR, Exception('mock err'), 200, 200)
        with responses as req_mock:
            self._put_foo()
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        responses = tutil.mock_keep_responses(
            self._FOO_LOCATOR, 500, 200, 200)
        with responses as req_mock:
            self._put_foo()
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # The first pass gets a 200 and a 400. Storing again on the 200
        # server cannot add replication, and the 400 server cannot be
        # retried at all, so no third request should be made.
        responses = tutil.mock_keep_responses(
            self._FOO_LOCATOR, 200, 400, 200)
        with responses as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self._put_foo()
        self.assertEqual(2, req_mock.call_count)
1361
class KeepClientAPIErrorTest(unittest.TestCase):
    def test_api_fail(self):
        class ApiMock(object):
            # Fake API client. It answers api_token and insecure, and any
            # other attribute lookup that reaches __getattr__ raises
            # KeepReadError.
            def __getattr__(self, name):
                canned = {"api_token": "abc", "insecure": False}
                if name in canned:
                    return canned[name]
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='')

        # Regression test. When an API exception (not a keepstore one) was
        # raised during get(), the next get() of the same block used to
        # deadlock, so the same get() is attempted twice. If the bug comes
        # back, this test hangs the suite instead of failing. There is no
        # clean way around that without a special case that would exist
        # only for this test.
        locator = "acbd18db4cc2f85cedef654fccc4a4d8+3"
        for _ in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get(locator)