Merge branch '12684-pysdk-auto-retry'
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import errno
16 import pycurl
17 import random
18 import re
19 import shutil
20 import socket
21 import sys
22 import stat
23 import tempfile
24 import time
25 import unittest
26 import urllib.parse
27
28 import parameterized
29
30 import arvados
31 import arvados.retry
32 import arvados.util
33 from . import arvados_testutil as tutil
34 from . import keepstub
35 from . import run_test_server
36
37 from .arvados_testutil import DiskCacheBase
38
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Round-trip put/get/head tests against a real Keep server.

    Parameterized to run twice: once with the disk-backed block cache
    and once with the in-memory block cache.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    # DiskCacheBase instance that owns the block cache shared by every
    # test in this class; cleaned up in tearDownClass().
    block_cache_test = None

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.block_cache_test = DiskCacheBase()
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='',
                                             block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))

    @classmethod
    def tearDownClass(cls):
        # Bug fix: this previously called super().setUpClass(), which
        # re-ran class setup during teardown instead of cleaning up.
        super(KeepTestCase, cls).tearDownClass()
        cls.block_cache_test.tearDown()

    def test_KeepBasicRWTest(self):
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
                         b'foo'),
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # Double the 8-byte seed 23 times to build a 64 MiB block.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
148
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Permission-signature tests against a Keep server with blob signing on.

    Verifies that signed locators round-trip, and that unsigned locators,
    other users' tokens, and missing tokens are all rejected with NotFound.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def tearDown(self):
        # Remove any disk cache directory created by make_block_cache().
        DiskCacheBase.tearDown(self)

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
        # PUT returns a locator carrying a +A permission signature.
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        bar_locator = keep_client.put('bar')
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            bar_locator,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        self.assertRaises(arvados.errors.NotFoundError,
                          keep_client.get,
                          unsigned_bar_locator)

        # GET from a different user => NotFound
        run_test_server.authorize_with('spectator')
        self.assertRaises(arvados.errors.NotFoundError,
                          arvados.Keep.get,
                          bar_locator)

        # Unauthenticated GET for a signed locator => NotFound
        # Unauthenticated GET for an unsigned locator => NotFound
        keep_client.api_token = ''
        self.assertRaises(arvados.errors.NotFoundError,
                          keep_client.get,
                          bar_locator)
        self.assertRaises(arvados.errors.NotFoundError,
                          keep_client.get,
                          unsigned_bar_locator)
196
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Tests for KeepClient routed through a keepproxy service.

    Some tests overwrite the global ARVADOS_KEEP_SERVICES setting; setUp
    saves the value installed by the test-server setup and tearDown
    restores it so test outcomes do not depend on execution order.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def setUp(self):
        super(KeepProxyTestCase, self).setUp()
        # Bug fix: the MultipleURIs/InvalidURI tests mutate this global
        # setting without restoring it, which could break later tests.
        self._saved_keep_services = arvados.config.settings().get('ARVADOS_KEEP_SERVICES')

    def tearDown(self):
        # Restore the ARVADOS_KEEP_SERVICES value seen at setUp time.
        if self._saved_keep_services is None:
            arvados.config.settings().pop('ARVADOS_KEEP_SERVICES', None)
        else:
            arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = self._saved_keep_services
        super(KeepProxyTestCase, self).tearDown()
        DiskCacheBase.tearDown(self)

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='', block_cache=self.make_block_cache(self.disk_cache))
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        # A bare hostname (no scheme) is not a valid proxy URI.
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            keep_client = arvados.KeepClient(api_client=self.api_client,
                                             local_store='',
                                             block_cache=self.make_block_cache(self.disk_cache))
246
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Unit tests for KeepClient service discovery, timeouts, and error
    reporting, using mocked API clients and mocked HTTP responses (no
    real servers).
    """
    disk_cache = False

    def tearDown(self):
        # Remove any disk cache directory created by make_block_cache().
        DiskCacheBase.tearDown(self)

    def get_service_roots(self, api_client):
        """Return the parsed, sorted service-root URLs a client would use."""
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
            service_type='proxy', service_host='localhost', service_port=9, count=1),
                                         block_cache=self.make_block_cache(self.disk_cache))
        try:
            # this will fail, but it ensures we get the service
            # discovery response
            keep_client.put('baz2', num_retries=0)
        except Exception:
            # Bug fix: was a bare `except:`, which would also swallow
            # KeyboardInterrupt/SystemExit.
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)
        # (removed an unused `force_timeout` local that was never passed
        # to the mocked responses)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_head_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            # HEAD requests do not set low-speed limits (no body transfer).
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_get_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            # HEAD requests do not set low-speed limits (no body transfer).
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_put_timeout(self):
        self.disk_cache_dir = None
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def check_no_services_error(self, verb, exc_class):
        """Assert that `verb` raises `exc_class` with no per-service
        errors when service discovery itself fails."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=self.make_block_cache(self.disk_cache),
            num_retries=0,
        )
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Assert that the raised error reports only the status codes from
        the final retry attempt (the 502s), not earlier 500s."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        # Only the two failed services should appear in the error.
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        # One response claiming 3 replicas satisfies copies=2 in one request.
        self.assertEqual(1, req_mock.call_count)
582
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify which requests are served from the block cache: repeated
    GETs hit the cache, while HEAD responses are never cached."""
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        # A repeated GET for the same locator is answered from the cache,
        # so the service-level get() runs exactly once.
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.get(self.locator)
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        # HEAD responses must not be cached (they would be confused with
        # GET responses), so each HEAD reaches the service.
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.head(self.locator)
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        get_mock.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual(b'first response', head_resp)
        # The first response came from a HEAD request, so it was not
        # cached and the GET received a fresh (different) response.
        self.assertNotEqual(head_resp, get_resp)
624
625
626
627
628 @tutil.skip_sleep
629 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
630 class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
631     disk_cache = False
632
    def setUp(self):
        # Two mocked Keep services and a client wired to the block-cache
        # flavor (disk vs. memory) selected by the class parameterization.
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        # Fixed payload and its md5 locator, shared by the tests below.
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
638
    def tearDown(self):
        # Remove any disk cache directory created by make_block_cache().
        DiskCacheBase.tearDown(self)
641
    def test_multiple_default_storage_classes_req_header(self):
        # When the cluster config declares several default storage classes,
        # a put() with no explicit classes should request all defaults.
        api_mock = self.api_client_mock()
        api_mock.config.return_value = {
            'StorageClasses': {
                'foo': { 'Default': True },
                'bar': { 'Default': True },
                'baz': { 'Default': False }
            }
        }
        api_client = self.mock_keep_services(api_mock=api_mock, count=2)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        resp_hdr = {
            'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
            'x-keep-replicas-stored': 1
        }
        with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
            keep_client.put(self.data, copies=1)
            req_hdr = mock.responses[0]
            # Both defaults requested, sorted alphabetically; 'baz' omitted.
            self.assertIn(
                'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
662
663     def test_storage_classes_req_header(self):
664         self.assertEqual(
665             self.api_client.config()['StorageClasses'],
666             {'default': {'Default': True}})
667         cases = [
668             # requested, expected
669             [['foo'], 'X-Keep-Storage-Classes: foo'],
670             [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
671             [[], 'X-Keep-Storage-Classes: default'],
672             [None, 'X-Keep-Storage-Classes: default'],
673         ]
674         for req_classes, expected_header in cases:
675             headers = {'x-keep-replicas-stored': 1}
676             if req_classes is None or len(req_classes) == 0:
677                 confirmed_hdr = 'default=1'
678             elif len(req_classes) > 0:
679                 confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
680             headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
681             with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
682                 self.keep_client.put(self.data, copies=1, classes=req_classes)
683                 req_hdr = mock.responses[0]
684                 self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
685
686     def test_partial_storage_classes_put(self):
687         headers = {
688             'x-keep-replicas-stored': 1,
689             'x-keep-storage-classes-confirmed': 'foo=1'}
690         with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
691             with self.assertRaises(arvados.errors.KeepWriteError):
692                 self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'], num_retries=0)
693             # 1st request, both classes pending
694             req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
695             self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
696             # 2nd try, 'foo' class already satisfied
697             req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
698             self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
699
700     def test_successful_storage_classes_put_requests(self):
701         cases = [
702             # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
703             [ 1, ['foo'], 1, 'foo=1', 1],
704             [ 1, ['foo'], 2, 'foo=2', 1],
705             [ 2, ['foo'], 2, 'foo=2', 1],
706             [ 2, ['foo'], 1, 'foo=1', 2],
707             [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
708             [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
709             [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
710             [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
711             [ 1, ['foo', 'bar'], 1, None, 1],
712             [ 1, ['foo'], 1, None, 1],
713             [ 2, ['foo'], 2, None, 1],
714             [ 2, ['foo'], 1, None, 2],
715         ]
716         for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
717             headers = {'x-keep-replicas-stored': c_copies}
718             if c_classes is not None:
719                 headers.update({'x-keep-storage-classes-confirmed': c_classes})
720             with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
721                 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
722                 self.assertEqual(self.locator,
723                     self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
724                     case_desc)
725                 self.assertEqual(e_reqs, mock.call_count, case_desc)
726
727     def test_failed_storage_classes_put_requests(self):
728         cases = [
729             # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
730             [ 1, ['foo'], 1, 'bar=1', 200],
731             [ 1, ['foo'], 1, None, 503],
732             [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
733             [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
734             [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
735         ]
736         for w_copies, w_classes, c_copies, c_classes, return_code in cases:
737             headers = {'x-keep-replicas-stored': c_copies}
738             if c_classes is not None:
739                 headers.update({'x-keep-storage-classes-confirmed': c_classes})
740             with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
741                 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
742                 with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
743                     self.keep_client.put(self.data, copies=w_copies, classes=w_classes, num_retries=0)
744
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify the X-Request-Id header sent with every Keep request.

    The client may take the id from the API client, from an explicit
    request_id argument, or generate one automatically; the id must
    also appear in error messages.
    """
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def _exercise_all_verbs(self, check, **kwargs):
        # Run put (2 copies), get, and head, applying `check` to each
        # resulting mock response's request headers.
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data, **kwargs)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            check(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator, **kwargs)
        check(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator, **kwargs)
        check(mock.responses[0])

    def test_default_to_api_client_request_id(self):
        self.api_client.request_id = self.test_id
        self._exercise_all_verbs(self.assertProvidedRequestId)

    def test_explicit_request_id(self):
        self._exercise_all_verbs(
            self.assertProvidedRequestId, request_id=self.test_id)

    def test_automatic_request_id(self):
        self._exercise_all_verbs(self.assertAutomaticRequestId)

    def test_request_id_in_exception(self):
        # The request id (explicit or generated) must be included in the
        # text of the raised error.
        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
                self.keep_client.head(self.locator, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
                self.keep_client.get(self.locator)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
                self.keep_client.put(self.data, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
                self.keep_client.put(self.data)

    def assertAutomaticRequestId(self, resp):
        # The generated id must have the standard shape and must not be
        # the one this test created for itself.
        sent = [h for h in resp.getopt(pycurl.HTTPHEADER)
                if h.startswith('X-Request-Id: ')][0]
        self.assertNotEqual(sent, 'X-Request-Id: '+self.test_id)
        self.assertRegex(sent, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        self.assertIn(
            'X-Request-Id: '+self.test_id,
            resp.getopt(pycurl.HTTPHEADER))
837
838
@tutil.skip_sleep
# NOTE: disk_cache parameterization is disabled for this class; it only
# runs with the default (in-memory) block cache.
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify rendezvous-hashing probe order.

    For each block, the sequence of Keep services tried (the "probe
    order") must match a precomputed reference set, and must remain
    mostly stable when new services are added to the cluster.
    """
    disk_cache = False

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        # One test block (and its md5 locator) per reference probe order.
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        # NOTE: loop variable 'hash' shadows the builtin (pre-existing).
        for i, hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
            # Extract the per-service hex id from each root URL.
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        # Every service answers 500 twice (num_retries=1 gives two full
        # passes), so the request log should show the reference probe
        # sequence repeated twice before the operation fails.
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        # With copies>1 several writer threads race, so the observed
        # request order may deviate slightly from the reference order;
        # the check below bounds how far it may deviate.
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        # Adding services should only modestly perturb existing probe
        # orders ("waste" = how far each old first-choice service moves).
        # NOTE: 'hash' in the comprehensions shadows the builtin (pre-existing).
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            total_penalty = 0
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                # Penalty: how far the old first-choice service slipped.
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        # Perform `verb` on the all-zeros block against servers that all
        # fail, and check that the per-request errors list the services
        # in the reference probe order for that block.
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
997
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    """Timeout behavior tests against a real (stub) HTTP Keep server.

    The stub server can throttle bandwidth and insert delays at various
    points in the request/response cycle; these tests check that
    KeepClient enforces its connect/read/bandwidth timeouts.
    """
    disk_cache = False

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        # Context manager: assert the wrapped block takes between tmin
        # and tmax seconds (inclusive). Subclasses TestCase only to
        # borrow its assert* methods; deliberately skips
        # TestCase.__init__ since no test fixture is needed.
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        # Context manager: assert the wrapped block takes at least tmin
        # seconds (same TestCase-borrowing trick as above).
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        # Build a client with the given timeout tuple — appears to be
        # (connect timeout, read timeout, low-speed limit in bytes/s);
        # confirm against KeepClient's timeout documentation.
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # Bandwidth above the low-speed limit: put and get both succeed.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        # head has no body transfer, so it is slow but does not fail.
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError) as e:
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        # Delay before the stub even reads the request.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        # Delay before the stub sends response headers.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        # Delay before the stub sends the response body.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
1144
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check that K@ locator hints route requests to gateway services
    (and remote cluster proxies) before ordinary disk services.
    """
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        # Build a mocked service list with `disks` disk services plus
        # `gateways` gateway services, and a client that uses it.
        self.gateways = []
        for i in range(gateways):
            self.gateways.append({
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
            })
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        MockCurl.return_value = fake = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        # The single request must have gone to the hinted gateway.
        self.assertEqual(self.gateway_roots[0]+locator,
                         fake.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    def _check_gateway_hints_tried_in_order(self, verb, body, MockCurl):
        # Shared driver for the get/head hint-order tests: all services
        # 404, so every gateway and disk service gets one request.
        gateways = 4
        disks = 3
        mocks = [tutil.FakeCurl.make(code=404, body=body)
                 for _ in range(gateways+disks)]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_tried_in_order('get', '', MockCurl)

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_tried_in_order('head', b'', MockCurl)

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = fake = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        # A +K@xyzzy hint routes to the remote cluster's keep proxy.
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         fake.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = fake = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         fake.getopt(pycurl.URL).decode())
1246
class KeepClientRetryTestMixin(object):
    disk_cache = False

    # A local Keep store can't exercise retry behavior, so the strategy is:
    # * Point the client at a single proxy address that leads nowhere (a
    #   black hole), so no API client is needed and every HTTP request
    #   originates from one place.
    # * Mock the HTTP request layer to serve simulated responses.
    # This allows exhaustive retry testing with no supporting servers and
    # no side effects if something hiccups.
    # Subclasses must define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
    # run_method(), plus TEST_PATCHER: a context manager factory that
    # mocks out the appropriate client methods.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        # Per-call keyword arguments override the defaults from setUp().
        kwargs = dict(self.client_kwargs, **caller_kwargs)
        kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        # Assert that run_method() returns `expected`
        # (DEFAULT_EXPECT when None).
        expected = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        # Assert that run_method() raises `error_class`
        # (DEFAULT_EXCEPTION when None); return the assertRaises context
        # so callers can inspect the exception.
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
        return err

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        # 403 is not retryable, so the queued 200 must never be reached.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # num_retries=1 allows two attempts in total; the queued 200 is
        # one response too far down the list to be reached.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries passed to the client constructor is used when the
        # method call itself doesn't specify one.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1317
1318
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    # Exercise the retry mixin against KeepClient.get().
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        return self.new_client().get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 response must surface as NotFoundError, not a generic
        # read error.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise NotFoundError only when a high threshold of
        # servers reports the block missing.  With a 50/50 split between
        # "not found" and a server error, the threshold isn't met, so the
        # result must stay a generic KeepReadError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # The +K@ hint adds a server that can serve the block even when
        # retries are not requested.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on one server should fall through to the next.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # A body that fails checksum verification must be discarded and
        # the request retried elsewhere.
        with tutil.mock_keep_responses(
                ('baddata', 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1366
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    # Exercise the retry mixin against KeepClient.head().
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        return self.new_client().head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 response must surface as NotFoundError, not a generic
        # read error.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise NotFoundError only when a high threshold of
        # servers reports the block missing.  With a 50/50 split between
        # "not found" and a server error, the threshold isn't met, so the
        # result must stay a generic KeepReadError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # The +K@ hint adds a server that can answer the HEAD even when
        # retries are not requested.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on one server should fall through to the next.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1408
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    # Exercise the retry mixin against KeepClient.put().
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        return self.new_client().put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Only one server is mocked, so requesting two copies must fail
        # rather than writing the same block twice to that server.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1426
1427
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    # Verify that KeepWriterThreadPool stops writing once the requested
    # number of copies has been confirmed.

    class FakeKeepService(object):
        # Minimal stand-in for a Keep service: put() sleeps for `delay`
        # seconds, then succeeds, fails, or raises as configured.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {
                    'x-keep-replicas-stored': str(replicas),
                    'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
                },
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout, headers):
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            if not self.will_succeed:
                return {"status_code": 500, "body": "didn't succeed"}
            return self._result

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def test_only_write_enough_on_success(self):
        # Ten willing services, but only `copies` writes should happen.
        for i in range(10):
            svc = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(svc, None)
        self.pool.join()
        self.assertEqual((self.copies, []), self.pool.done())

    def test_only_write_enough_on_partial_success(self):
        # Interleave failing and succeeding services; the pool must still
        # end with exactly `copies` successful writes.
        for i in range(5):
            delay = i / 10.0
            self.pool.add_task(self.FakeKeepService(delay=delay, will_succeed=False), None)
            self.pool.add_task(self.FakeKeepService(delay=delay, will_succeed=True), None)
        self.pool.join()
        self.assertEqual((self.copies, []), self.pool.done())

    def test_only_write_enough_when_some_crash(self):
        # Services that raise should count as failures, not stop the pool.
        for i in range(5):
            delay = i / 10.0
            self.pool.add_task(self.FakeKeepService(delay=delay, will_raise=Exception()), None)
            self.pool.add_task(self.FakeKeepService(delay=delay, will_succeed=True), None)
        self.pool.join()
        self.assertEqual((self.copies, []), self.pool.done())

    def test_fail_when_too_many_crash(self):
        # With copies+1 crashing services and only copies-1 good ones,
        # the pool can only report copies-1 confirmed replicas.
        for i in range(self.copies+1):
            svc = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(svc, None)
        for i in range(self.copies-1):
            svc = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(svc, None)
        self.pool.join()
        self.assertEqual((self.copies-1, []), self.pool.done())
1499
1500
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # Default for the parameterized attribute read by setUp() via
    # self.disk_cache.  This was previously misspelled `block_cache`,
    # which left disk_cache undeclared at the class level -- sibling test
    # classes all declare `disk_cache = False`.
    disk_cache = False

    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        # One service raises on the first pass; a retry pass writes the
        # second copy, for 3 requests in total.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        # A 500 is retryable; the retry pass succeeds on another server.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1540
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        class ApiMock(object):
            # Supplies just enough attributes for KeepClient
            # construction; any other attribute access raises, simulating
            # an API-side failure.
            def __getattr__(self, r):
                stubbed = {
                    "api_token": "abc",
                    "insecure": False,
                    "config": lambda: {},
                }
                if r in stubbed:
                    return stubbed[r]
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for: if an API (not keepstore)
        # exception is thrown as part of a get(), the next attempt to get
        # that same block used to deadlock.  This is why there are two
        # get()s in a row.  Unfortunately, the failure mode for this test
        # is that the test suite deadlocks; there isn't a good way to
        # avoid that without adding a special case that has no use except
        # for this test.

        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1575
1576
1577 class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
1578     def setUp(self):
1579         self.api_client = self.mock_keep_services(count=2)
1580         self.data = b'xyzzy'
1581         self.locator = '1271ed5ef305aadabc605b1609e24c52'
1582         self.disk_cache_dir = tempfile.mkdtemp()
1583
    def tearDown(self):
        # Remove the per-test disk cache directory and its contents.
        shutil.rmtree(self.disk_cache_dir)
1586
1587
1588     @mock.patch('arvados.KeepClient.KeepService.get')
1589     def test_disk_cache_read(self, get_mock):
1590         # confirm it finds an existing cache block when the cache is
1591         # initialized.
1592
1593         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1594         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1595             f.write(self.data)
1596
1597         # block cache should have found the existing block
1598         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1599                                                   disk_cache_dir=self.disk_cache_dir)
1600         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1601
1602         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1603
1604         get_mock.assert_not_called()
1605
1606
1607     @mock.patch('arvados.KeepClient.KeepService.get')
1608     def test_disk_cache_share(self, get_mock):
1609         # confirm it finds a cache block written after the disk cache
1610         # was initialized.
1611
1612         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1613                                                   disk_cache_dir=self.disk_cache_dir)
1614         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1615
1616         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1617         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1618             f.write(self.data)
1619
1620         # when we try to get the block, it'll check the disk and find it.
1621         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1622
1623         get_mock.assert_not_called()
1624
1625
1626     def test_disk_cache_write(self):
1627         # confirm the cache block was created
1628
1629         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1630                                                   disk_cache_dir=self.disk_cache_dir)
1631         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1632
1633         with tutil.mock_keep_responses(self.data, 200) as mock:
1634             self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1635
1636         self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1637
1638         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1639             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1640
1641
1642     def test_disk_cache_clean(self):
1643         # confirm that a tmp file in the cache is cleaned up
1644
1645         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1646         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
1647             f.write(b"abc1")
1648
1649         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
1650             f.write(b"abc2")
1651
1652         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
1653             f.write(b"abc3")
1654
1655         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1656         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1657         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1658
1659         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1660                                                   disk_cache_dir=self.disk_cache_dir)
1661
1662         # The tmp still hasn't been deleted because it was created in the last 60 seconds
1663         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1664         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1665         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1666
1667         # Set the mtime to 61s in the past
1668         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
1669         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
1670         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))
1671
1672         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1673                                                    disk_cache_dir=self.disk_cache_dir)
1674
1675         # Tmp should be gone but the other ones are safe.
1676         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1677         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1678         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1679
1680
1681     @mock.patch('arvados.KeepClient.KeepService.get')
1682     def test_disk_cache_cap(self, get_mock):
1683         # confirm that the cache is kept to the desired limit
1684
1685         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1686         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1687             f.write(self.data)
1688
1689         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1690         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1691             f.write(b"foo")
1692
1693         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1694         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1695
1696         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1697                                                    disk_cache_dir=self.disk_cache_dir,
1698                                                    max_slots=1)
1699
1700         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1701         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1702
1703
1704     @mock.patch('arvados.KeepClient.KeepService.get')
1705     def test_disk_cache_share(self, get_mock):
1706         # confirm that a second cache doesn't delete files that belong to the first cache.
1707
1708         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1709         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1710             f.write(self.data)
1711
1712         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1713         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1714             f.write(b"foo")
1715
1716         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1717         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1718
1719         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1720                                                    disk_cache_dir=self.disk_cache_dir,
1721                                                    max_slots=2)
1722
1723         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1724         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1725
1726         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1727                                                    disk_cache_dir=self.disk_cache_dir,
1728                                                    max_slots=1)
1729
1730         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1731         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1732
1733
1734
1735     def test_disk_cache_error(self):
1736         os.chmod(self.disk_cache_dir, stat.S_IRUSR)
1737
1738         # Fail during cache initialization.
1739         with self.assertRaises(OSError):
1740             block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1741                                                       disk_cache_dir=self.disk_cache_dir)
1742
1743
1744     def test_disk_cache_write_error(self):
1745         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1746                                                   disk_cache_dir=self.disk_cache_dir)
1747
1748         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1749
1750         # Make the cache dir read-only
1751         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1752         os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
1753
1754         # Cache fails
1755         with self.assertRaises(arvados.errors.KeepCacheError):
1756             with tutil.mock_keep_responses(self.data, 200) as mock:
1757                 keep_client.get(self.locator)
1758
1759
1760     @mock.patch('mmap.mmap')
1761     def test_disk_cache_retry_write_error(self, mockmmap):
1762         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1763                                                   disk_cache_dir=self.disk_cache_dir)
1764
1765         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1766
1767         mockmmap.side_effect = (OSError(errno.ENOSPC, "no space"), self.data)
1768
1769         cache_max_before = block_cache.cache_max
1770
1771         with tutil.mock_keep_responses(self.data, 200) as mock:
1772             self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1773
1774         self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1775
1776         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1777             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1778
1779         # shrank the cache in response to ENOSPC
1780         self.assertTrue(cache_max_before > block_cache.cache_max)
1781
1782
1783     @mock.patch('mmap.mmap')
1784     def test_disk_cache_retry_write_error2(self, mockmmap):
1785         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1786                                                   disk_cache_dir=self.disk_cache_dir)
1787
1788         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1789
1790         mockmmap.side_effect = (OSError(errno.ENOMEM, "no memory"), self.data)
1791
1792         slots_before = block_cache._max_slots
1793
1794         with tutil.mock_keep_responses(self.data, 200) as mock:
1795             self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1796
1797         self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1798
1799         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1800             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1801
1802         # shrank the cache in response to ENOMEM
1803         self.assertTrue(slots_before > block_cache._max_slots)