Merge branch '14323-container-collection-mounts'
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import socket
19 import sys
20 import time
21 import unittest
22 import urllib.parse
23
24 import arvados
25 import arvados.retry
26 import arvados.util
27 from . import arvados_testutil as tutil
28 from . import keepstub
29 from . import run_test_server
30
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Basic read/write round trips against a real Keep server.

    setUpClass creates one admin-authorized KeepClient (no proxy, no local
    store) that most tests share.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        # Use a private client: this test asserts absolute byte counters,
        # which would depend on test execution order if the class-shared
        # client were used.
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        self.assertEqual(0, keep_client.upload_counter.get())
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, keep_client.upload_counter.get())

        self.assertEqual(0, keep_client.download_counter.get())
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        # Doubling 23 times turns 8 bytes into 8 * 2**23 = 64 MiB.
        for _ in range(23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
130
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Read/write checks when the Keep server enforces permissions.

    Keep runs with a blob signing key and ``enforce_permissions`` enabled
    (see KEEP_SERVER), so reads are expected to fail without a valid
    signature and credentials.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': True}

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        client = arvados.KeepClient()

        # A PUT returns a signed locator which can then be read back.
        signed_foo = client.put('foo')
        self.assertRegex(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        self.assertEqual(client.get(signed_foo),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        signed_bar = client.put('bar')
        bare_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)

        # GET with the signature stripped off => NotFound.
        with self.assertRaises(arvados.errors.NotFoundError):
            client.get(bare_bar)

        # GET as a different user => NotFound.
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(signed_bar)

        # Unauthenticated GET => NotFound, for signed and unsigned locators.
        client.api_token = ''
        for loc in (signed_bar, bare_bar):
            with self.assertRaises(arvados.errors.NotFoundError):
                client.get(loc)
174
175
class KeepOptionalPermission(run_test_server.TestCaseWithServers):
    """Keep signs locators on PUT but does not require signatures on GET.

    The Keep server is started with a blob signing key but with
    ``enforce_permissions`` turned off (see KEEP_SERVER), so all of these
    requests should succeed:

    * authenticated request, signed locator
    * authenticated request, unsigned locator
    * unauthenticated request, signed locator
    * unauthenticated request, unsigned locator
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': False}

    @classmethod
    def setUpClass(cls):
        super(KeepOptionalPermission, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepOptionalPermission, self).setUp()
        # Fresh client per test: some tests clear its api_token.
        self.keep_client = arvados.KeepClient(api_client=self.api_client,
                                              proxy='', local_store='')

    def _put_foo_and_check(self):
        """PUT "foo", assert the returned locator is signed, and return it."""
        signed_locator = self.keep_client.put('foo')
        self.assertRegex(
            signed_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_locator)
        return signed_locator

    def test_KeepAuthenticatedSignedTest(self):
        signed_locator = self._put_foo_and_check()
        self.assertEqual(self.keep_client.get(signed_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepAuthenticatedUnsignedTest(self):
        # Only the side effect (storing "foo") is needed; the signed
        # locator itself is deliberately not used for the GET.
        self._put_foo_and_check()
        self.assertEqual(self.keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8"),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepUnauthenticatedSignedTest(self):
        # Check that signed GET requests work even when permissions
        # enforcement is off.
        signed_locator = self._put_foo_and_check()
        self.keep_client.api_token = ''
        self.assertEqual(self.keep_client.get(signed_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepUnauthenticatedUnsignedTest(self):
        # Since permissions are not enforced, GET requests need not be
        # authenticated or signed.
        self._put_foo_and_check()
        self.keep_client.api_token = ''
        self.assertEqual(self.keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8"),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
238
239
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """KeepClient behavior when talking to Keep through a keepproxy server."""
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepProxyTestCase, self).setUp()
        # Some tests overwrite ARVADOS_KEEP_SERVICES in the shared config;
        # remember the value so tearDown can put it back and later tests
        # (which rely on the proxy URI set up for this class) are not
        # affected by test execution order.
        self._saved_keep_services = arvados.config.settings().get(
            'ARVADOS_KEEP_SERVICES')

    def tearDown(self):
        settings = arvados.config.settings()
        settings.pop('ARVADOS_EXTERNAL_CLIENT', None)
        if self._saved_keep_services is None:
            settings.pop('ARVADOS_KEEP_SERVICES', None)
        else:
            settings['ARVADOS_KEEP_SERVICES'] = self._saved_keep_services
        super(KeepProxyTestCase, self).tearDown()

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegex(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
301
302
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient behavior against mocked Keep services (no real servers).

    Service discovery is mocked with ``self.mock_keep_services`` and HTTP
    traffic with ``tutil.mock_keep_responses``.
    """
    def get_service_roots(self, api_client):
        """Return the parsed service root URLs for an all-zero locator, sorted."""
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client)
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def _check_timeout_options(self, verb, arg, exc_class, timeout,
                               expect_low_speed=True, **service_args):
        """Force a socket timeout on one request and check its pycurl options.

        verb/arg: the KeepClient method name and its single argument.
        exc_class: the exception the call must raise.
        timeout: expected (connect seconds, low-speed time, low-speed limit);
          the connect value is checked in milliseconds.
        expect_low_speed: when false, the low-speed options must be unset
          (as for HEAD requests).
        service_args: extra keyword arguments for mock_keep_services.
        """
        api_client = self.mock_keep_services(count=1, **service_args)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                getattr(keep_client, verb)(arg)
            resp = req_mock.responses[0]
            self.assertEqual(
                resp.getopt(pycurl.CONNECTTIMEOUT_MS),
                int(timeout[0]*1000))
            if expect_low_speed:
                self.assertEqual(
                    resp.getopt(pycurl.LOW_SPEED_TIME),
                    int(timeout[1]))
                self.assertEqual(
                    resp.getopt(pycurl.LOW_SPEED_LIMIT),
                    int(timeout[2]))
            else:
                self.assertIsNone(resp.getopt(pycurl.LOW_SPEED_TIME))
                self.assertIsNone(resp.getopt(pycurl.LOW_SPEED_LIMIT))

    def test_get_timeout(self):
        self._check_timeout_options(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        self._check_timeout_options(
            'put', b'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        self._check_timeout_options(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT,
            expect_low_speed=False)

    def test_proxy_get_timeout(self):
        self._check_timeout_options(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_head_timeout(self):
        self._check_timeout_options(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            expect_low_speed=False,
            service_type='proxy')

    def test_proxy_put_timeout(self):
        self._check_timeout_options(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def check_no_services_error(self, verb, exc_class):
        """Service discovery fails => `verb` raises exc_class with no request errors."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """The raised error must report the status codes of the final attempt."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 403, 403)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([403, 403], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        # The first service reports 3 stored replicas, which already
        # satisfies copies=2, so only one request should be made.
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
561
562
@tutil.skip_sleep
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check which KeepClient reads are answered from the block cache.

    KeepService.get is patched in each test so the number of backend
    requests can be counted directly.
    """

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.get(self.locator)
        # The repeated GET is served from the cache: one backend request only.
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.head(self.locator)
        # HEAD results are not cached, so they can't be mistaken for GET data.
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        get_mock.side_effect = ['first response', 'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual('first response', head_resp)
        # The HEAD response must not have been cached and replayed for the GET.
        self.assertNotEqual(head_resp, get_resp)
598
599
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check the X-Request-Id header sent with Keep put/get/head requests."""

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def _check_put_get_head(self, assert_request_id, **kwargs):
        """Run put, get and head, applying assert_request_id to every request.

        kwargs are passed through to each KeepClient call (e.g. request_id).
        put is expected to issue two requests (one per mocked service).
        """
        with tutil.mock_keep_responses(self.locator, 200, 200) as req_mock:
            self.keep_client.put(self.data, **kwargs)
        self.assertEqual(2, len(req_mock.responses))
        for resp in req_mock.responses:
            assert_request_id(resp)

        with tutil.mock_keep_responses(self.data, 200) as req_mock:
            self.keep_client.get(self.locator, **kwargs)
        assert_request_id(req_mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as req_mock:
            self.keep_client.head(self.locator, **kwargs)
        assert_request_id(req_mock.responses[0])

    def test_default_to_api_client_request_id(self):
        self.api_client.request_id = self.test_id
        self._check_put_get_head(self.assertProvidedRequestId)

    def test_explicit_request_id(self):
        self._check_put_get_head(self.assertProvidedRequestId,
                                 request_id=self.test_id)

    def test_automatic_request_id(self):
        self._check_put_get_head(self.assertAutomaticRequestId)

    def assertAutomaticRequestId(self, resp):
        """Assert the request carries a generated id, not self.test_id."""
        headers = resp.getopt(pycurl.HTTPHEADER)
        hdr = next((x for x in headers if x.startswith('X-Request-Id: ')), None)
        # Fail with a readable message instead of an IndexError when the
        # header is missing entirely.
        self.assertIsNotNone(
            hdr, 'no X-Request-Id header in {!r}'.format(headers))
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        """Assert the request carries exactly self.test_id."""
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
669
670
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Compare KeepClient's service probe order with a fixed reference table."""

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("zzzzz-bi6l4-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(block).hexdigest()
            for block in self.blocks]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    @staticmethod
    def _service_id(url):
        """Return the hex suffix of the keep0x<hex> host named in *url*."""
        return re.search(r'//\[?keep0x([0-9a-f]+)', url).group(1)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for expected, block_hash in zip(self.expected_order, self.hashes):
            roots = self.keep_client.weighted_service_roots(
                arvados.KeepLocator(block_hash))
            got_order = [self._service_id(root) for root in roots]
            self.assertEqual(expected, got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Run op(i) for each reference block against all-500 responses.

        Every request fails, so the URLs requested must follow the
        reference order; the callers pass num_retries=1, hence two full
        passes over the services.
        """
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as req_mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                self._service_id(resp.getopt(pycurl.URL).decode())
                for resp in req_mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                expected_seq = self.expected_order[i]*3
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as req_mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    self._service_id(resp.getopt(pycurl.URL).decode())
                    for resp in req_mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_seq).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(expected_seq):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        pending.remove(got)
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(expected_seq)))

    def test_probe_waste_adding_one_server(self):
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            for block_hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client)
            total_penalty = 0
            for block_hash, probe_before in zip(hashes, probes_before):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(block_hash))
                # How far the previously-first service moved down the
                # probe list once services were added.
                penalty = probe_after.index(probe_before[0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Assert that errors from *verb* on the 64-zero block list the
        services in reference probe order.

        The 64-zero block is self.blocks[0], so its probe order is
        self.expected_order[0].
        """
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client)
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in self.expected_order[0]],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
824
825
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
    """Check KeepClient's connect, response and low-bandwidth timeouts.

    Tests adjust the stub Keep server's delays and bandwidth through
    ``self.server`` and then time the client's requests.
    """
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        """Context manager asserting its body takes tmin..tmax seconds.

        Subclasses TestCase only to borrow its assertion methods. The
        timing is checked only when the body finishes without raising,
        so a failure inside the body is reported as-is instead of being
        masked by a timing assertion.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()
            return self

        def __exit__(self, exc_type, exc_value, exc_tb):
            if exc_type is not None:
                # Let the body's own exception propagate unmasked.
                return False
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)
            return False

    class assertTakesGreater(unittest.TestCase):
        """Context manager asserting its body takes at least tmin seconds.

        Like assertTakesBetween, the timing is only checked when the
        body finishes without raising.
        """
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()
            return self

        def __exit__(self, exc_type, exc_value, exc_tb):
            if exc_type is not None:
                # Let the body's own exception propagate unmasked.
                return False
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            return False

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        # timeouts is passed straight through as KeepClient's timeout=.
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(response=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
962
963
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Check how +K@ locator hints steer get/head requests to gateway
    services and to remote Keep proxies."""

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        # Register `gateways` fake gateway services alongside `disks`
        # ordinary keep services, and build a client that sees them all.
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    def _check_gateway_hints_in_order(self, MockCurl, verb):
        """Call keepClient.<verb> on a locator hinting four gateways.

        Every service answers 404. Each mocked Curl handle records the
        URL it was given: the hinted gateways must come first, in hint
        order, followed by the disk services.
        """
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'get')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'head')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1059
1060
class KeepClientRetryTestMixin(object):
    """Shared retry-behavior tests for KeepClient operations.

    A local Keep store would never exercise the retry paths. Instead,
    each client is configured with a single proxy address (a black
    hole), so no API client is needed and every HTTP request goes to
    one place. TEST_PATCHER then feeds the client canned responses.
    The retry logic is tested without any supporting servers, and a
    hiccup cannot cause side effects anywhere real.

    Concrete test classes must define:

    * DEFAULT_EXPECT -- what a successful run_method() returns;
    * DEFAULT_EXCEPTION -- the exception class a failing run_method()
      raises;
    * TEST_PATCHER -- a callable taking a response body followed by
      status codes and/or exceptions, returning a context manager that
      mocks the client's HTTP layer;
    * run_method() -- performs the operation under test.
    """

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = dict(proxy=self.PROXY_ADDR, local_store='')

    def new_client(self, **caller_kwargs):
        # Per-call keyword arguments override the per-test defaults.
        merged_kwargs = dict(self.client_kwargs, **caller_kwargs)
        return arvados.KeepClient(**merged_kwargs)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        wanted = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(wanted, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        exc_type = self.DEFAULT_EXCEPTION if error_class is None else error_class
        with self.assertRaises(exc_type):
            self.run_method(*args, **kwargs)

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        # A transient 500 followed by success, with retries allowed.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        # A transport-level exception is retried like a 500.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        # Without num_retries, the first 500 is final.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        # 403 must not be retried even when retries are allowed.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            self.check_exception(num_retries=1)

    def test_num_retries_instance_fallback(self):
        # num_retries given to the client constructor applies when the
        # call itself does not pass one.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1129
1130
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.get(), plus get-specific cases."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    # The test locator with a +K@xyzzy hint appended.
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 must surface as NotFoundError, even with a 200 queued
        # behind it and retries allowed.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # The 200 from the second service is enough; no retry needed.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on the first service falls through to the next.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # Data that does not match the locator is discarded and fetched
        # again from the next service.
        with tutil.mock_keep_responses(
                ('baddata', 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1174
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.head(), plus head-specific cases."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    # The test locator with a +K@xyzzy hint appended.
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 must surface as NotFoundError, even with a 200 queued
        # behind it and retries allowed.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # The 200 from the second service is enough; no retry needed.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on the first service falls through to the next.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1212
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    """Retry behavior of KeepClient.put()."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Only one (proxy) service exists, so asking for two copies must
        # fail rather than storing both copies on that one service.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1226
1227
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """Check that KeepWriterThreadPool stores the requested number of
    copies and no more, even when extra services are queued."""

    class FakeKeepService(object):
        """Stand-in service handle: sleeps, then succeeds, fails or raises."""
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {'x-keep-replicas-stored': str(replicas)},
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout):
            # Simulate request latency before reporting the outcome.
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            # Only a successful put has a result to report.
            return self._result if self.will_succeed else None

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data = 'foo',
            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas = self.copies,
            copies = self.copies
        )

    def _queue_service(self, delay, **service_kwargs):
        # Queue one fake service on the pool (second add_task arg is None).
        service = self.FakeKeepService(delay=delay, **service_kwargs)
        self.pool.add_task(service, None)

    def test_only_write_enough_on_success(self):
        for i in range(10):
            self._queue_service(i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_on_partial_success(self):
        for i in range(5):
            self._queue_service(i/10.0, will_succeed=False)
            self._queue_service(i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_when_some_crash(self):
        for i in range(5):
            self._queue_service(i/10.0, will_raise=Exception())
            self._queue_service(i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_fail_when_too_many_crash(self):
        for i in range(self.copies+1):
            self._queue_service(i/10.0, will_raise=Exception())
        for i in range(self.copies-1):
            self._queue_service(i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies-1)
1296
1297
@tutil.skip_sleep
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
    """put() calls that need two distinct services to succeed, possibly
    requiring multiple passes through the retry loop."""

    FOO_LOCATOR = 'acbd18db4cc2f85cedef654fccc4a4d8+3'

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def test_success_after_exception(self):
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR, Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR, 500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR, 200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1331
class KeepClientAPIErrorTest(unittest.TestCase):
    def test_api_fail(self):
        # The only attributes the stub API client answers; every other
        # attribute lookup raises KeepReadError.
        canned_attrs = {"api_token": "abc", "insecure": False}

        class ApiMock(object):
            def __getattr__(self, r):
                if r in canned_attrs:
                    return canned_attrs[r]
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='')

        # Regression test: an API (not keepstore) exception raised during a
        # get() used to make the next get() of the same block deadlock, which
        # is why the same block is fetched twice below.  If that bug comes
        # back, the symptom is a hung test suite rather than a failure; there
        # is no good way to detect it without a special case that would exist
        # only for this test.
        locator = "acbd18db4cc2f85cedef654fccc4a4d8+3"
        for _attempt in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get(locator)