18842: Turn disk write errors into KeepCacheError
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import shutil
19 import socket
20 import sys
21 import stat
22 import tempfile
23 import time
24 import unittest
25 import urllib.parse
26
27 import parameterized
28
29 import arvados
30 import arvados.retry
31 import arvados.util
32 from . import arvados_testutil as tutil
33 from . import keepstub
34 from . import run_test_server
35
36 from .arvados_testutil import DiskCacheBase
37
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Round-trip put/get/head tests against a real Keep server.

    Parameterized to run once with the disk-backed block cache and once
    with the in-memory cache.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    # Holds the DiskCacheBase instance whose cache backs cls.keep_client,
    # so tearDownClass can clean it up.
    block_cache_test = None

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.block_cache_test = DiskCacheBase()
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='',
                                             block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))

    @classmethod
    def tearDownClass(cls):
        # Bug fix: this previously called super().setUpClass(), which
        # re-ran class setup during teardown and skipped the parent
        # class's cleanup entirely.
        super(KeepTestCase, cls).tearDownClass()
        cls.block_cache_test.tearDown()

    def test_KeepBasicRWTest(self):
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
                         b'foo'),
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            '^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        # Double 23 times: 8 bytes * 2**23 = 64 MiB, one full Keep block.
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            '^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            '^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            '^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            '^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
147
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Exercise blob-signing permission checks against a Keep server."""
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))

        # put() returns a signed locator which reads back correctly.
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        bar_locator = keep_client.put('bar')
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            bar_locator,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        with self.assertRaises(arvados.errors.NotFoundError):
            keep_client.get(unsigned_bar_locator)

        # GET from a different user => NotFound
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(bar_locator)

        # Unauthenticated GET => NotFound, whether the locator is
        # signed or not.
        keep_client.api_token = ''
        with self.assertRaises(arvados.errors.NotFoundError):
            keep_client.get(bar_locator)
        with self.assertRaises(arvados.errors.NotFoundError):
            keep_client.get(unsigned_bar_locator)
195
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Verify KeepClient behavior when routed through keepproxy."""
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        super(KeepProxyTestCase, self).tearDown()
        DiskCacheBase.tearDown(self)

    def test_KeepProxyTest1(self):
        # The ARVADOS_KEEP_SERVICES environment variable set by
        # setUpClass() should make the client use the proxy.
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='', block_cache=self.make_block_cache(self.disk_cache))
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            '^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # ARVADOS_KEEP_SERVICES overrides any existing proxy setting
        # and may list several proxies.
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))
        expected_roots = ['http://10.0.0.1/', 'https://foo.example.org:1234/']
        actual_roots = [svc['_service_root'] for svc in keep_client._keep_services]
        self.assertEqual(actual_roots, expected_roots)

    def test_KeepProxyTestInvalidURI(self):
        # A malformed proxy URI should be rejected at construction time.
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='',
                               block_cache=self.make_block_cache(self.disk_cache))
245
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Unit tests for KeepClient service discovery, timeout settings, and
    error reporting, using mocked API/keepstore responses (no real servers).
    """
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def get_service_roots(self, api_client):
        """Return parsed, sorted service root URLs for the given mock API."""
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
            service_type='proxy', service_host='localhost', service_port=9, count=1),
                                         block_cache=self.make_block_cache(self.disk_cache))
        try:
            # this will fail, but it ensures we get the service
            # discovery response
            keep_client.put('baz2')
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt and
            # SystemExit still propagate.
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        # Removed an unused `force_timeout` local that was copy/paste
        # residue from the timeout tests below.
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_head_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            # HEAD requests don't set the low-speed (read) timeout options.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_get_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            # HEAD requests don't set the low-speed (read) timeout options.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_put_timeout(self):
        # This test intentionally builds the KeepClient without an
        # explicit block cache, so make sure no disk cache dir lingers.
        self.disk_cache_dir = None
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def check_no_services_error(self, verb, exc_class):
        """Assert `verb` raises exc_class with no per-request errors when
        service discovery itself fails."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Assert the raised error reports only the final retry's failures."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        # (indentation normalized to 4 spaces from an inconsistent
        # 2-space block)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.put(data)
        self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
530
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check block-cache behavior for GET versus HEAD requests."""
    disk_cache = False

    def setUp(self):
        # Two mocked services and a small test blob shared by all tests.
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.get(self.locator)
            self.keep_client.get(self.locator)
        # The second get() is served from the cache, so the underlying
        # service must be hit exactly once.
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.head(self.locator)
            self.keep_client.head(self.locator)
        # HEAD requests are deliberately not cached (so they aren't
        # confused with GETs); both calls reach the service.
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        get_mock.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual(b'first response', head_resp)
        # The first response was not cached because it came from a HEAD
        # request, so the subsequent get() sees a fresh response.
        self.assertNotEqual(head_resp, get_resp)
572
573
574
575
576 @tutil.skip_sleep
577 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
578 class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
579     disk_cache = False
580
581     def setUp(self):
582         self.api_client = self.mock_keep_services(count=2)
583         self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
584         self.data = b'xyzzy'
585         self.locator = '1271ed5ef305aadabc605b1609e24c52'
586
587     def tearDown(self):
588         DiskCacheBase.tearDown(self)
589
590     def test_multiple_default_storage_classes_req_header(self):
591         api_mock = self.api_client_mock()
592         api_mock.config.return_value = {
593             'StorageClasses': {
594                 'foo': { 'Default': True },
595                 'bar': { 'Default': True },
596                 'baz': { 'Default': False }
597             }
598         }
599         api_client = self.mock_keep_services(api_mock=api_mock, count=2)
600         keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
601         resp_hdr = {
602             'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
603             'x-keep-replicas-stored': 1
604         }
605         with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
606             keep_client.put(self.data, copies=1)
607             req_hdr = mock.responses[0]
608             self.assertIn(
609                 'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
610
611     def test_storage_classes_req_header(self):
612         self.assertEqual(
613             self.api_client.config()['StorageClasses'],
614             {'default': {'Default': True}})
615         cases = [
616             # requested, expected
617             [['foo'], 'X-Keep-Storage-Classes: foo'],
618             [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
619             [[], 'X-Keep-Storage-Classes: default'],
620             [None, 'X-Keep-Storage-Classes: default'],
621         ]
622         for req_classes, expected_header in cases:
623             headers = {'x-keep-replicas-stored': 1}
624             if req_classes is None or len(req_classes) == 0:
625                 confirmed_hdr = 'default=1'
626             elif len(req_classes) > 0:
627                 confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
628             headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
629             with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
630                 self.keep_client.put(self.data, copies=1, classes=req_classes)
631                 req_hdr = mock.responses[0]
632                 self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
633
634     def test_partial_storage_classes_put(self):
635         headers = {
636             'x-keep-replicas-stored': 1,
637             'x-keep-storage-classes-confirmed': 'foo=1'}
638         with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
639             with self.assertRaises(arvados.errors.KeepWriteError):
640                 self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
641             # 1st request, both classes pending
642             req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
643             self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
644             # 2nd try, 'foo' class already satisfied
645             req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
646             self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
647
648     def test_successful_storage_classes_put_requests(self):
649         cases = [
650             # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
651             [ 1, ['foo'], 1, 'foo=1', 1],
652             [ 1, ['foo'], 2, 'foo=2', 1],
653             [ 2, ['foo'], 2, 'foo=2', 1],
654             [ 2, ['foo'], 1, 'foo=1', 2],
655             [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
656             [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
657             [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
658             [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
659             [ 1, ['foo', 'bar'], 1, None, 1],
660             [ 1, ['foo'], 1, None, 1],
661             [ 2, ['foo'], 2, None, 1],
662             [ 2, ['foo'], 1, None, 2],
663         ]
664         for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
665             headers = {'x-keep-replicas-stored': c_copies}
666             if c_classes is not None:
667                 headers.update({'x-keep-storage-classes-confirmed': c_classes})
668             with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
669                 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
670                 self.assertEqual(self.locator,
671                     self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
672                     case_desc)
673                 self.assertEqual(e_reqs, mock.call_count, case_desc)
674
675     def test_failed_storage_classes_put_requests(self):
676         cases = [
677             # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
678             [ 1, ['foo'], 1, 'bar=1', 200],
679             [ 1, ['foo'], 1, None, 503],
680             [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
681             [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
682             [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
683         ]
684         for w_copies, w_classes, c_copies, c_classes, return_code in cases:
685             headers = {'x-keep-replicas-stored': c_copies}
686             if c_classes is not None:
687                 headers.update({'x-keep-storage-classes-confirmed': c_classes})
688             with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
689                 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
690                 with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
691                     self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
692
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify that every KeepClient request carries an X-Request-Id header.

    The id may be inherited from the API client, passed explicitly per
    call, or generated automatically; it must also appear in the text of
    raised Keep errors. Runs once with the disk cache and once without.
    """
    disk_cache = False

    def setUp(self):
        # Two mocked keep services so put() makes two requests (copies=2
        # is the client default).
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def tearDown(self):
        # Release the disk cache directory created by make_block_cache().
        DiskCacheBase.tearDown(self)

    def test_default_to_api_client_request_id(self):
        # With no per-call id, put/get/head fall back to the API
        # client's request_id.
        self.api_client.request_id = self.test_id
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

    def test_explicit_request_id(self):
        # A request_id keyword argument takes effect on every request
        # made for that call.
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data, request_id=self.test_id)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])

    def test_automatic_request_id(self):
        # With neither an API-client id nor a per-call id, the client
        # generates a fresh req-... id itself.
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertAutomaticRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])

    def test_request_id_in_exception(self):
        # Failed requests report the request id (explicit or generated)
        # in the exception message, to aid cross-referencing server logs.
        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
                self.keep_client.head(self.locator, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
                self.keep_client.get(self.locator)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
                self.keep_client.put(self.data, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
                self.keep_client.put(self.data)

    def assertAutomaticRequestId(self, resp):
        """Assert resp carries a generated X-Request-Id (not self.test_id)."""
        hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
               if x.startswith('X-Request-Id: ')][0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        """Assert resp carries exactly the caller-provided X-Request-Id."""
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
785
786
@tutil.skip_sleep
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify KeepClient's rendezvous-hashing probe order.

    Checks that get/head/put try services in the deterministic order
    given by the reference probe sequences in self.expected_order, and
    that adding services causes only bounded reshuffling ("probe waste").
    """
    disk_cache = False

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        # One test block per reference sequence, plus its md5 locator.
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
            # Extract the hex service id embedded in each root URL's
            # hostname (keep0x<id>).
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Run op(i) against all-500 responses; with num_retries=1, each
        service should be probed twice, in reference order both times."""
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        # With copies writer threads racing, request order may deviate
        # from the reference sequence, but only within bounded limits
        # (see the invariant comments below).
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        """Adding services should displace each block's first-choice
        service by a bounded amount (the rendezvous-hash stability
        property)."""
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            total_penalty = 0
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                # Penalty: how far the old first-choice service slid
                # down the new probe order.
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Run verb (get/put) on the 64-zeros block against all-500
        responses and assert the error's request list matches the
        reference probe order for that block."""
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        # '3eab2d5fc9681074' is expected_order[0]: the probe order for
        # the all-zeros block (see setUp).
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
945
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    """Exercise KeepClient's connect, response, and low-bandwidth timeouts
    against a stub keep server with configurable delays and bandwidth.
    Runs once with the disk cache and once without.
    """
    disk_cache = False

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        """Context manager asserting the wrapped code takes between tmin
        and tmax seconds (inclusive, rounded to milliseconds)."""
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager asserting the wrapped code takes at least tmin
        seconds (rounded to milliseconds)."""
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient with the given (connect, read, low-speed
        limit) timeout tuple, pointed at the stub server."""
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # Double the low-speed limit: transfers stay above the
        # threshold, so put/get succeed.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError) as e:
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        # A small delay passes the connect timeout; a large one must
        # trip the response timeout.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
1092
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify that K@... locator hints route get/head requests to gateway
    services (in hint order) before disk services, and that K@<site>
    hints resolve to the remote site's keep proxy.
    """
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Set up self.api_client/self.keepClient with the given number
        of mocked disk services plus gateway services; records gateway
        URLs in self.gateway_roots for later assertions."""
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        # A single gateway hint: both get() and head() should hit the
        # hinted gateway's URL.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        # All services 404: get() should try each hinted gateway in
        # hint order, then fall back to the disk services.
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body='')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        # Same ordering check as above, but for head().
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        # K@<site-id> (no '-') hints resolve to the remote cluster's
        # keep proxy at keep.<site-id>.arvadosapi.com.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        # Same remote-proxy resolution, but for head().
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1194
class KeepClientRetryTestMixin(object):
    """Shared retry-behavior tests for get/head/put test cases.

    Subclasses must define DEFAULT_EXPECT, DEFAULT_EXCEPTION,
    TEST_PATCHER, and run_method(); each inherited test then checks one
    aspect of num_retries handling through that method.
    """
    disk_cache = False

    # Testing with a local Keep store won't exercise the retry behavior.
    # Instead, our strategy is:
    # * Create a client with one proxy specified (pointed at a black
    #   hole), so there's no need to instantiate an API client, and
    #   all HTTP requests come from one place.
    # * Mock httplib's request method to provide simulated responses.
    # This lets us test the retry logic extensively without relying on any
    # supporting servers, and prevents side effects in case something hiccups.
    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
    # run_method().
    #
    # Test classes must define TEST_PATCHER to a method that mocks
    # out appropriate methods in the client.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Build a KeepClient from self.client_kwargs, with caller
        overrides and a fresh block cache."""
        kwargs = self.client_kwargs.copy()
        kwargs.update(caller_kwargs)
        kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        # Subclasses invoke the client operation under test here.
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) returns expected
        (DEFAULT_EXPECT if not given)."""
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method raises error_class (DEFAULT_EXCEPTION if not
        given); returns the assertRaises context for inspection."""
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
        return err

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        # A transient 500 followed by 200 succeeds when retries allowed.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        # A raised exception counts as retryable, like a 5xx.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        # Without num_retries, the first 500 is fatal.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        # 403 is permanent: no retry even with num_retries budget left.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # num_retries=1 allows 2 attempts total; the message says so.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries set at client construction applies to every call.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1268             self.check_success()
1269
1270
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Exercise retry handling for KeepClient.get() via the retry mixin."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        # Hook for the mixin: the operation under test is get().
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # Persistent 404s must surface as NotFoundError, not a generic error.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise NotFoundError only when a high threshold of
        # servers reports the block missing.  Rig up 50/50 disagreement
        # between two servers and confirm the result stays a plain
        # KeepReadError rather than a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as caught:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                caught.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # The +K hint adds a server that can satisfy the read on the first pass.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on one server should fall through to the next.
        responses = tutil.mock_keep_responses(
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200))
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # Data failing checksum verification is discarded and refetched.
        responses = tutil.mock_keep_responses(
            ('baddata', 200),
            (self.DEFAULT_EXPECT, 200))
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)
1318
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Exercise retry handling for KeepClient.head() via the retry mixin."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        # Hook for the mixin: the operation under test is head().
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # Persistent 404s must surface as NotFoundError, not a generic error.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise NotFoundError only when a high threshold of
        # servers reports the block missing.  Rig up 50/50 disagreement
        # between two servers and confirm the result stays a plain
        # KeepReadError rather than a NotFoundError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as caught:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                caught.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # The +K hint adds a server that can satisfy the request on the first pass.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on one server should fall through to the next.
        responses = tutil.mock_keep_responses(
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200))
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)
1360
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Exercise retry handling for KeepClient.put() via the retry mixin."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        # Hook for the mixin: the operation under test is put().
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # With only one service responding, a request for two copies can
        # never be satisfied, no matter how many retries are allowed.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1378
1379
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """Check that KeepWriterThreadPool stores only the requested number of copies.

    Uses FakeKeepService stand-ins with configurable delay/success/failure,
    so no real Keep servers are involved.  Note: this class is NOT wrapped in
    @tutil.skip_sleep, so the per-service delays really elapse.
    """

    class FakeKeepService(object):
        # Minimal stand-in for a Keep service: put() sleeps for `delay`
        # seconds, then succeeds, fails, or raises as configured.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {}
            self._result['headers'] = {}
            # Headers the writer pool reads to count confirmed replicas.
            self._result['headers']['x-keep-replicas-stored'] = str(replicas)
            self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
            self._result['body'] = 'foobar'

        def put(self, data_hash, data, timeout, headers):
            # Simulate network latency before the configured outcome.
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            # Successful services report the canned result; failures report
            # a 500 so the pool counts them as unsuccessful.
            if self.will_succeed:
                return self._result
            else:
                return {"status_code": 500, "body": "didn't succeed"}

        def finished(self):
            # Never report as already finished.
            return False

    def setUp(self):
        # Pool configured to want `copies` replicas of a single block.
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data = 'foo',
            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas = self.copies,
            copies = self.copies
        )

    def test_only_write_enough_on_success(self):
        # 10 willing services, but the pool should stop at self.copies.
        for i in range(10):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Alternate failing and succeeding services; only successes count.
        for i in range(5):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Exceptions from put() must not prevent reaching the target count.
        for i in range(5):
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # With only copies-1 healthy services, the pool falls short by one.
        for i in range(self.copies+1):
            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(ks, None)
        for i in range(self.copies-1):
            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(ks, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies-1, []))
1451
1452
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Test put()s that need two distinct servers to succeed, possibly
    requiring multiple passes through the retry loop.
    """
    # Fixed: this default was previously misnamed ``block_cache``.  The
    # parameterized/DiskCacheBase machinery keys off ``disk_cache`` (see the
    # other test classes in this file), so the old attribute was dead.
    disk_cache = False

    def setUp(self):
        # Two mocked services so replication across distinct servers is possible.
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        # One request raises, but with num_retries=1 the client still
        # reaches both servers: 3 requests total.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        # A 500 is retryable, so the client retries and achieves copies=2.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1492
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    """Check that API-client-level failures don't wedge the KeepClient."""
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        class ApiMock(object):
            # Provide just enough attributes for KeepClient construction;
            # any other attribute access simulates a broken API client.
            def __getattr__(self, r):
                if r == "api_token":
                    return "abc"
                if r == "insecure":
                    return False
                if r == "config":
                    return lambda: {}
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for is that if an API (not
        # keepstore) exception is thrown as part of a get(), the next
        # attempt to get that same block will result in a deadlock.
        # This is why the same get() is issued twice.  Unfortunately,
        # the failure mode for this test is that the test suite
        # deadlocks; there isn't a good way to avoid that without
        # adding a special case that has no use except for this test.
        for _ in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1527
1528
class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Tests for the disk-backed KeepBlockCache.

    Each test gets its own cache directory; cached blocks live at
    ``<first-3-hex-digits>/<md5>.keepcacheblock`` inside it.
    """

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        # Fresh cache directory per test; removed in tearDown.
        self.disk_cache_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.disk_cache_dir)


    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_read(self, get_mock):
        # confirm it finds an existing cache block when the cache is
        # initialized.

        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
            f.write(self.data)

        # block cache should have found the existing block
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        # The block was served from disk; no Keep server was contacted.
        get_mock.assert_not_called()


    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_late_write(self, get_mock):
        # Renamed from test_disk_cache_share: that name collided with the
        # two-cache sharing test below, which silently shadowed this one
        # so it never ran.
        # confirm it finds a cache block written after the disk cache
        # was initialized.

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
            f.write(self.data)

        # when we try to get the block, it'll check the disk and find it.
        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        get_mock.assert_not_called()


    def test_disk_cache_write(self):
        # confirm the cache block was created

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        # (the context manager's value was unused and its old name `mock`
        # shadowed the imported mock module, so it is no longer bound)
        with tutil.mock_keep_responses(self.data, 200):
            self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        self.assertIsNotNone(keep_client.get_from_cache(self.locator))

        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))


    def test_disk_cache_clean(self):
        # confirm that an abandoned tmp file in the cache is cleaned up,
        # but only once it is more than 60 seconds old, and only if its
        # name matches the tmp*.keepcacheblock pattern.

        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
            f.write(b"abc1")

        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
            f.write(b"abc2")

        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
            f.write(b"abc3")

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        # The tmp still hasn't been deleted because it was created in the last 60 seconds
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))

        # Set the mtime to 61s in the past
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
        os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))

        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir)

        # Tmp should be gone but the other ones are safe.
        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))


    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_cap(self, get_mock):
        # confirm that the cache is kept to the desired limit

        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
            f.write(self.data)

        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
            f.write(b"foo")

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        # With max_slots=1, cache initialization must evict down to one block.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir,
                                                   max_slots=1)

        self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))


    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # confirm that a second cache doesn't delete files that belong to the first cache.

        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
            f.write(self.data)

        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
            f.write(b"foo")

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir,
                                                   max_slots=2)

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        # A second, smaller cache sharing the directory must not evict
        # blocks held by the first cache.
        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir,
                                                   max_slots=1)

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))


    def test_disk_cache_error(self):
        # Make the cache directory unwritable, restoring permissions
        # afterwards so tearDown's rmtree cannot be tripped up.
        os.chmod(self.disk_cache_dir, stat.S_IRUSR)
        try:
            # Fail during cache initialization.
            with self.assertRaises(OSError):
                block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                          disk_cache_dir=self.disk_cache_dir)
        finally:
            os.chmod(self.disk_cache_dir, stat.S_IRWXU)


    def test_disk_cache_write_error(self):
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        # Make the cache dir read-only, restoring permissions afterwards
        # so tearDown's rmtree cannot be tripped up.
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
        try:
            # Cache fails: the disk write error must surface as KeepCacheError.
            with self.assertRaises(arvados.errors.KeepCacheError):
                with tutil.mock_keep_responses(self.data, 200):
                    keep_client.get(self.locator)
        finally:
            os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRWXU)