18842: Add a couple of error handling tests
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import errno
16 import pycurl
17 import random
18 import re
19 import shutil
20 import socket
21 import sys
22 import stat
23 import tempfile
24 import time
25 import unittest
26 import urllib.parse
27
28 import parameterized
29
30 import arvados
31 import arvados.retry
32 import arvados.util
33 from . import arvados_testutil as tutil
34 from . import keepstub
35 from . import run_test_server
36
37 from .arvados_testutil import DiskCacheBase
38
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Round-trip put/get/head tests against a live Keep server.

    Parameterized to run once with the disk-backed block cache and once
    with the in-memory block cache.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    # DiskCacheBase instance whose block cache is shared by every test in
    # this class; cleaned up in tearDownClass().
    block_cache_test = None

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.block_cache_test = DiskCacheBase()
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='',
                                             block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))

    @classmethod
    def tearDownClass(cls):
        # Bug fix: this previously called super(...).setUpClass() again,
        # re-running class setup instead of tearing it down.
        super(KeepTestCase, cls).tearDownClass()
        cls.block_cache_test.tearDown()

    def test_KeepBasicRWTest(self):
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
                         b'foo'),
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # Double 8 bytes 23 times -> a 64 MiB block (the Keep block size limit).
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
148
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Exercise blob-signing permissions: signed locators work, unsigned or
    foreign-user reads are rejected with NotFoundError."""
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))

        # A put by an authorized client yields a signed locator that reads back.
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        bar_locator = keep_client.put('bar')
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            bar_locator,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        with self.assertRaises(arvados.errors.NotFoundError):
            keep_client.get(unsigned_bar_locator)

        # GET from a different user => NotFound
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(bar_locator)

        # Unauthenticated GET => NotFound, whether or not the locator is signed.
        keep_client.api_token = ''
        with self.assertRaises(arvados.errors.NotFoundError):
            keep_client.get(bar_locator)
        with self.assertRaises(arvados.errors.NotFoundError):
            keep_client.get(unsigned_bar_locator)
196
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Verify KeepClient honors keepproxy configuration, including the
    ARVADOS_KEEP_SERVICES override."""
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        super(KeepProxyTestCase, self).tearDown()
        DiskCacheBase.tearDown(self)

    def test_KeepProxyTest1(self):
        # Relies on the ARVADOS_KEEP_SERVICES environment variable that
        # setUpClass() arranges for us.
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='', block_cache=self.make_block_cache(self.disk_cache))
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            '^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # ARVADOS_KEEP_SERVICES may list several proxies; they should all
        # appear as service roots, in order.
        # NOTE(review): this mutates the shared config dict — presumably the
        # server-test harness resets it between cases; verify if tests bleed.
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        # A value that is not a URI at all must be rejected at construction.
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='',
                               block_cache=self.make_block_cache(self.disk_cache))
246
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """KeepClient behavior against mocked Keep services: service discovery,
    TLS settings, timeout selection, and error aggregation/reporting.

    These tests never contact a real server; responses come from
    tutil.mock_keep_responses().
    """
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def get_service_roots(self, api_client):
        """Return parsed, sorted service-root URLs for a probe locator."""
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        # The advertised ssl flag must decide the URL scheme.
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        # IPv6 hosts must be bracketed so the port parses correctly.
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
            service_type='proxy', service_host='localhost', service_port=9, count=1),
                                         block_cache=self.make_block_cache(self.disk_cache))
        try:
            # this will fail, but it ensures we get the service
            # discovery response
            keep_client.put('baz2')
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # still propagate; the put is expected to fail here.
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        # insecure=True must turn off both peer and host verification.
        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_head_timeout(self):
        # HEAD requests should not set the low-speed (read) timeout options.
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_get_timeout(self):
        # Proxy services use the (longer) DEFAULT_PROXY_TIMEOUT tuple.
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_put_timeout(self):
        # NOTE(review): unlike the other tests here, this one intentionally
        # builds a KeepClient with its default block cache (no make_block_cache),
        # so there is no disk cache directory for tearDown to remove.
        self.disk_cache_dir = None
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def check_no_services_error(self, verb, exc_class):
        """Assert `verb` raises exc_class with no per-request errors when
        service discovery itself fails."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Assert the raised error reports only the final retry's failures
        (the trailing 502s), not the earlier 500s."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        # One service succeeds, two fail: only the two failures are reported.
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        # Indentation fixed: this body previously used 2-space indents.
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        # Unknown service types should still be usable for reads.
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        # A single response claiming 3 replicas should satisfy copies=2
        # without contacting more services.
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify which requests hit the block cache: GETs are cached, HEADs
    are not."""
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.get(self.locator)
        # The second get() is served from cache, so only one backend
        # request goes out.
        self.assertEqual(1, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.head(self.locator)
        # HEAD responses are never cached (so they can't be mistaken for
        # GET results); both calls reach the backend.
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        get_mock.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual(b'first response', head_resp)
        # The HEAD result was not cached, so the subsequent get() issues
        # its own request and sees the second canned response.
        self.assertNotEqual(head_resp, get_resp)
573
574
575
576
577 @tutil.skip_sleep
578 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
579 class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
580     disk_cache = False
581
582     def setUp(self):
583         self.api_client = self.mock_keep_services(count=2)
584         self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
585         self.data = b'xyzzy'
586         self.locator = '1271ed5ef305aadabc605b1609e24c52'
587
588     def tearDown(self):
589         DiskCacheBase.tearDown(self)
590
591     def test_multiple_default_storage_classes_req_header(self):
592         api_mock = self.api_client_mock()
593         api_mock.config.return_value = {
594             'StorageClasses': {
595                 'foo': { 'Default': True },
596                 'bar': { 'Default': True },
597                 'baz': { 'Default': False }
598             }
599         }
600         api_client = self.mock_keep_services(api_mock=api_mock, count=2)
601         keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
602         resp_hdr = {
603             'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
604             'x-keep-replicas-stored': 1
605         }
606         with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
607             keep_client.put(self.data, copies=1)
608             req_hdr = mock.responses[0]
609             self.assertIn(
610                 'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
611
612     def test_storage_classes_req_header(self):
613         self.assertEqual(
614             self.api_client.config()['StorageClasses'],
615             {'default': {'Default': True}})
616         cases = [
617             # requested, expected
618             [['foo'], 'X-Keep-Storage-Classes: foo'],
619             [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
620             [[], 'X-Keep-Storage-Classes: default'],
621             [None, 'X-Keep-Storage-Classes: default'],
622         ]
623         for req_classes, expected_header in cases:
624             headers = {'x-keep-replicas-stored': 1}
625             if req_classes is None or len(req_classes) == 0:
626                 confirmed_hdr = 'default=1'
627             elif len(req_classes) > 0:
628                 confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
629             headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
630             with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
631                 self.keep_client.put(self.data, copies=1, classes=req_classes)
632                 req_hdr = mock.responses[0]
633                 self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
634
635     def test_partial_storage_classes_put(self):
636         headers = {
637             'x-keep-replicas-stored': 1,
638             'x-keep-storage-classes-confirmed': 'foo=1'}
639         with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
640             with self.assertRaises(arvados.errors.KeepWriteError):
641                 self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
642             # 1st request, both classes pending
643             req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
644             self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
645             # 2nd try, 'foo' class already satisfied
646             req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
647             self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
648
649     def test_successful_storage_classes_put_requests(self):
650         cases = [
651             # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
652             [ 1, ['foo'], 1, 'foo=1', 1],
653             [ 1, ['foo'], 2, 'foo=2', 1],
654             [ 2, ['foo'], 2, 'foo=2', 1],
655             [ 2, ['foo'], 1, 'foo=1', 2],
656             [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
657             [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
658             [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
659             [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
660             [ 1, ['foo', 'bar'], 1, None, 1],
661             [ 1, ['foo'], 1, None, 1],
662             [ 2, ['foo'], 2, None, 1],
663             [ 2, ['foo'], 1, None, 2],
664         ]
665         for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
666             headers = {'x-keep-replicas-stored': c_copies}
667             if c_classes is not None:
668                 headers.update({'x-keep-storage-classes-confirmed': c_classes})
669             with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
670                 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
671                 self.assertEqual(self.locator,
672                     self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
673                     case_desc)
674                 self.assertEqual(e_reqs, mock.call_count, case_desc)
675
676     def test_failed_storage_classes_put_requests(self):
677         cases = [
678             # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
679             [ 1, ['foo'], 1, 'bar=1', 200],
680             [ 1, ['foo'], 1, None, 503],
681             [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
682             [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
683             [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
684         ]
685         for w_copies, w_classes, c_copies, c_classes, return_code in cases:
686             headers = {'x-keep-replicas-stored': c_copies}
687             if c_classes is not None:
688                 headers.update({'x-keep-storage-classes-confirmed': c_classes})
689             with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
690                 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
691                 with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
692                     self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
693
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify that Keep requests carry an X-Request-Id header.

    The id may come from the API client, be passed explicitly per call,
    or be generated automatically; error messages must include it so
    failures can be correlated with server logs.
    """
    disk_cache = False

    def setUp(self):
        # Two mocked keep services so put() makes two requests.
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def tearDown(self):
        # Clean up state held by the DiskCacheBase mixin (disk cache variant).
        DiskCacheBase.tearDown(self)

    def test_default_to_api_client_request_id(self):
        """With no per-call id, requests use the API client's request_id."""
        self.api_client.request_id = self.test_id
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

    def test_explicit_request_id(self):
        """A request_id passed to put/get/head is used verbatim."""
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data, request_id=self.test_id)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])

    def test_automatic_request_id(self):
        """With no id from caller or API client, a fresh req-... id is generated."""
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertAutomaticRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])

    def test_request_id_in_exception(self):
        """Failed requests surface the request id in the raised error's text."""
        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
                self.keep_client.head(self.locator, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
                self.keep_client.get(self.locator)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
                self.keep_client.put(self.data, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
                self.keep_client.put(self.data)

    def assertAutomaticRequestId(self, resp):
        """Assert resp sent an auto-generated id (and not our fixed test id)."""
        hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
               if x.startswith('X-Request-Id: ')][0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        """Assert resp sent exactly the id we provided."""
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
786
787
@tutil.skip_sleep
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check the rendezvous-hashing probe order against a reference set.

    The expected probe orders below were computed once for a fixed set
    of 16 mocked services; the client must keep producing the same
    orders, and adding services must disturb existing orders minimally.
    """
    disk_cache = False

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        # One test block per reference ordering, plus its md5 locator.
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        # Clean up state held by the DiskCacheBase mixin.
        DiskCacheBase.tearDown(self)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Run op(i) against all-500 responses; with num_retries=1 every
        service should be probed twice, in the reference order both times."""
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        """Adding services should only mildly perturb existing probe orders."""
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            total_penalty = 0
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                # penalty = how far the old first choice slipped down.
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """All-500 responses: the error must list failed services in probe order."""
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
946
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    """Timeout and low-bandwidth behavior against the stub Keep server.

    The stub server can throttle bandwidth and inject delays at various
    points in the request cycle; these tests assert both that the
    client errors out and roughly how long that takes.
    """
    disk_cache = False

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    def tearDown(self):
        # Clean up state held by the DiskCacheBase mixin.
        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        """Context manager asserting its body takes tmin..tmax seconds.

        NOTE(review): subclasses unittest.TestCase only to borrow its
        assertion methods; it deliberately does not call
        TestCase.__init__.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager asserting its body takes at least tmin seconds."""
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Build a KeepClient with (connect, read, low-speed-limit) timeouts."""
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # Bandwidth above the low-speed limit: everything should succeed.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            # head() is expected to succeed despite the delay (no body
            # transfer) -- only the elapsed time is asserted here.
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError) as e:
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
1093
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Tests for K@... locator hints routing requests through gateways.

    A K@<uuid> hint names a specific gateway service; a K@<site> hint
    (e.g. K@xyzzy) points at a remote cluster's keep proxy.
    """
    disk_cache = False

    def tearDown(self):
        # Clean up state held by the DiskCacheBase mixin.
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Mock an API with `disks` disk services plus `gateways` gateway services."""
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        # Root URL of each mocked gateway, used to verify request targets.
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        """A single gateway hint makes get()/head() go to that gateway."""
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        """With multiple hints, get() tries gateways in hint order, then disks."""
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body='')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        """Same ordering check as above, for head()."""
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        """A K@<site> hint routes get() to that site's keep proxy URL."""
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        """A K@<site> hint routes head() to that site's keep proxy URL."""
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1195
1196 class KeepClientRetryTestMixin(object):
1197     disk_cache = False
1198
1199     # Testing with a local Keep store won't exercise the retry behavior.
1200     # Instead, our strategy is:
1201     # * Create a client with one proxy specified (pointed at a black
1202     #   hole), so there's no need to instantiate an API client, and
1203     #   all HTTP requests come from one place.
1204     # * Mock httplib's request method to provide simulated responses.
1205     # This lets us test the retry logic extensively without relying on any
1206     # supporting servers, and prevents side effects in case something hiccups.
1207     # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
1208     # run_method().
1209     #
1210     # Test classes must define TEST_PATCHER to a method that mocks
1211     # out appropriate methods in the client.
1212
1213     PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
1214     TEST_DATA = b'testdata'
1215     TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
1216
1217     def setUp(self):
1218         self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}
1219
1220     def new_client(self, **caller_kwargs):
1221         kwargs = self.client_kwargs.copy()
1222         kwargs.update(caller_kwargs)
1223         kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
1224         return arvados.KeepClient(**kwargs)
1225
1226     def run_method(self, *args, **kwargs):
1227         raise NotImplementedError("test subclasses must define run_method")
1228
1229     def check_success(self, expected=None, *args, **kwargs):
1230         if expected is None:
1231             expected = self.DEFAULT_EXPECT
1232         self.assertEqual(expected, self.run_method(*args, **kwargs))
1233
1234     def check_exception(self, error_class=None, *args, **kwargs):
1235         if error_class is None:
1236             error_class = self.DEFAULT_EXCEPTION
1237         with self.assertRaises(error_class) as err:
1238             self.run_method(*args, **kwargs)
1239         return err
1240
1241     def test_immediate_success(self):
1242         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
1243             self.check_success()
1244
1245     def test_retry_then_success(self):
1246         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
1247             self.check_success(num_retries=3)
1248
    def test_exception_then_success(self):
        # A transport-level exception is treated as retryable, like a 5xx.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)
1252
    def test_no_default_retry(self):
        # Without num_retries, the first 500 is fatal even though a
        # second attempt would have succeeded.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()
1256
    def test_no_retry_after_permanent_error(self):
        # 403 is a permanent error: no retry even with num_retries set.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)
1260
    def test_error_after_retries_exhausted(self):
        # Two 500s against num_retries=1 (two attempts total) must fail,
        # and the error message reports the attempt count.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')
1265
    def test_num_retries_instance_fallback(self):
        # num_retries given at client construction time applies when the
        # method call itself does not specify one.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1270
1271
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.get(), with and without a disk cache."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        # The operation under test: a get() on a freshly built client.
        return self.new_client().get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 surfaces as the more specific NotFoundError subclass.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise NotFoundError only when a high threshold of
        # servers affirmatively report the block missing.  A 50/50 split
        # between 404 and 500 must stay a generic KeepReadError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # With the hinted locator, a 404 followed by a 200 succeeds even
        # though retries are disabled.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on one server falls through to the next one.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # Data failing checksum verification is discarded and refetched
        # from the next server.
        with tutil.mock_keep_responses(
                ('baddata', 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1319
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.head(), with and without a disk cache."""

    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        # The operation under test: a head() on a freshly built client.
        return self.new_client().head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 surfaces as the more specific NotFoundError subclass.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise NotFoundError only when a high threshold of
        # servers affirmatively report the block missing.  A 50/50 split
        # between 404 and 500 must stay a generic KeepReadError.
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # With the hinted locator, a 404 followed by a 200 succeeds even
        # though retries are disabled.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on one server falls through to the next one.
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1361
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.put(), with and without a disk cache."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        # The operation under test: a put() on a freshly built client.
        return self.new_client().put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # With a single mocked server, requesting two copies must fail
        # rather than storing the block twice on the same service.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1379
1380
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """Check that the writer thread pool stops once enough copies are stored."""

    class FakeKeepService(object):
        """Stand-in Keep service with a configurable delay and outcome."""

        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {
                    'x-keep-replicas-stored': str(replicas),
                    'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
                },
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout, headers):
            # Simulate a slow server, then raise or report the configured outcome.
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            if not self.will_succeed:
                return {"status_code": 500, "body": "didn't succeed"}
            return self._result

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def test_only_write_enough_on_success(self):
        # Ten willing servers, but only `copies` replicas should be written.
        for idx in range(10):
            svc = self.FakeKeepService(delay=idx/10.0, will_succeed=True)
            self.pool.add_task(svc, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Alternate failing and succeeding servers; the pool keeps going
        # until it reaches the requested replica count.
        for idx in range(5):
            self.pool.add_task(self.FakeKeepService(delay=idx/10.0, will_succeed=False), None)
            self.pool.add_task(self.FakeKeepService(delay=idx/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Exceptions from some services must not prevent completion.
        for idx in range(5):
            self.pool.add_task(self.FakeKeepService(delay=idx/10.0, will_raise=Exception()), None)
            self.pool.add_task(self.FakeKeepService(delay=idx/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # With only copies-1 healthy services, the pool falls one short.
        for idx in range(self.copies + 1):
            self.pool.add_task(self.FakeKeepService(delay=idx/10.0, will_raise=Exception()), None)
        for idx in range(self.copies - 1):
            self.pool.add_task(self.FakeKeepService(delay=idx/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies - 1, []))
1452
1453
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.
    # NOTE(review): sibling parameterized classes declare `disk_cache = False`
    # as the class-level default; confirm `block_cache` is intended here.
    block_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        # One attempt dies with an exception; the two 200s that follow
        # satisfy copies=2, for three requests in total.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        # Same as above, with a retryable 500 instead of an exception.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1493
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        class ApiMock(object):
            # Provide just enough attributes for KeepClient construction;
            # any other attribute access simulates an API failure.
            def __getattr__(self, r):
                canned = {
                    "api_token": "abc",
                    "insecure": False,
                    "config": lambda: {},
                }
                if r in canned:
                    return canned[r]
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(
            api_client=ApiMock(), proxy='', local_store='',
            block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for is that if an API (not
        # keepstore) exception is thrown as part of a get(), the next
        # attempt to get that same block will result in a deadlock.
        # This is why the same get() runs twice.  Unfortunately, the
        # failure mode for this test is that the test suite deadlocks;
        # there isn't a good way to avoid that without adding a special
        # case that has no use except for this test.
        for _ in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1528
1529
class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Tests for the on-disk block cache backing KeepClient."""

    def setUp(self):
        # Mocked API with two Keep services; no real servers are contacted.
        self.api_client = self.mock_keep_services(count=2)
        self.data = b'xyzzy'
        # Hex MD5 of self.data, used as the block locator throughout.
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        # Fresh cache directory per test; removed in tearDown.
        self.disk_cache_dir = tempfile.mkdtemp()
1536
    def tearDown(self):
        # Remove the per-test cache directory and everything in it.
        shutil.rmtree(self.disk_cache_dir)
1539
1540
1541     @mock.patch('arvados.KeepClient.KeepService.get')
1542     def test_disk_cache_read(self, get_mock):
1543         # confirm it finds an existing cache block when the cache is
1544         # initialized.
1545
1546         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1547         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1548             f.write(self.data)
1549
1550         # block cache should have found the existing block
1551         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1552                                                   disk_cache_dir=self.disk_cache_dir)
1553         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1554
1555         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1556
1557         get_mock.assert_not_called()
1558
1559
1560     @mock.patch('arvados.KeepClient.KeepService.get')
1561     def test_disk_cache_share(self, get_mock):
1562         # confirm it finds a cache block written after the disk cache
1563         # was initialized.
1564
1565         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1566                                                   disk_cache_dir=self.disk_cache_dir)
1567         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1568
1569         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1570         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1571             f.write(self.data)
1572
1573         # when we try to get the block, it'll check the disk and find it.
1574         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1575
1576         get_mock.assert_not_called()
1577
1578
1579     def test_disk_cache_write(self):
1580         # confirm the cache block was created
1581
1582         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1583                                                   disk_cache_dir=self.disk_cache_dir)
1584         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1585
1586         with tutil.mock_keep_responses(self.data, 200) as mock:
1587             self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1588
1589         self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1590
1591         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1592             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1593
1594
1595     def test_disk_cache_clean(self):
1596         # confirm that a tmp file in the cache is cleaned up
1597
1598         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1599         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
1600             f.write(b"abc1")
1601
1602         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
1603             f.write(b"abc2")
1604
1605         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
1606             f.write(b"abc3")
1607
1608         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1609         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1610         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1611
1612         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1613                                                   disk_cache_dir=self.disk_cache_dir)
1614
1615         # The tmp still hasn't been deleted because it was created in the last 60 seconds
1616         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1617         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1618         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1619
1620         # Set the mtime to 61s in the past
1621         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
1622         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
1623         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))
1624
1625         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1626                                                    disk_cache_dir=self.disk_cache_dir)
1627
1628         # Tmp should be gone but the other ones are safe.
1629         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1630         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1631         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1632
1633
1634     @mock.patch('arvados.KeepClient.KeepService.get')
1635     def test_disk_cache_cap(self, get_mock):
1636         # confirm that the cache is kept to the desired limit
1637
1638         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1639         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1640             f.write(self.data)
1641
1642         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1643         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1644             f.write(b"foo")
1645
1646         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1647         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1648
1649         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1650                                                    disk_cache_dir=self.disk_cache_dir,
1651                                                    max_slots=1)
1652
1653         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1654         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1655
1656
1657     @mock.patch('arvados.KeepClient.KeepService.get')
1658     def test_disk_cache_share(self, get_mock):
1659         # confirm that a second cache doesn't delete files that belong to the first cache.
1660
1661         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1662         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1663             f.write(self.data)
1664
1665         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1666         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1667             f.write(b"foo")
1668
1669         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1670         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1671
1672         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1673                                                    disk_cache_dir=self.disk_cache_dir,
1674                                                    max_slots=2)
1675
1676         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1677         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1678
1679         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1680                                                    disk_cache_dir=self.disk_cache_dir,
1681                                                    max_slots=1)
1682
1683         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1684         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1685
1686
1687
1688     def test_disk_cache_error(self):
1689         os.chmod(self.disk_cache_dir, stat.S_IRUSR)
1690
1691         # Fail during cache initialization.
1692         with self.assertRaises(OSError):
1693             block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1694                                                       disk_cache_dir=self.disk_cache_dir)
1695
1696
1697     def test_disk_cache_write_error(self):
1698         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1699                                                   disk_cache_dir=self.disk_cache_dir)
1700
1701         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1702
1703         # Make the cache dir read-only
1704         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1705         os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
1706
1707         # Cache fails
1708         with self.assertRaises(arvados.errors.KeepCacheError):
1709             with tutil.mock_keep_responses(self.data, 200) as mock:
1710                 keep_client.get(self.locator)
1711
1712
1713     @mock.patch('mmap.mmap')
1714     def test_disk_cache_retry_write_error(self, mockmmap):
1715         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1716                                                   disk_cache_dir=self.disk_cache_dir)
1717
1718         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1719
1720         mockmmap.side_effect = (OSError(errno.ENOSPC, "no space"), self.data)
1721
1722         cache_max_before = block_cache.cache_max
1723
1724         with tutil.mock_keep_responses(self.data, 200) as mock:
1725             self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1726
1727         self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1728
1729         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1730             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1731
1732         # shrank the cache in response to ENOSPC
1733         self.assertTrue(cache_max_before > block_cache.cache_max)
1734
1735
1736     @mock.patch('mmap.mmap')
1737     def test_disk_cache_retry_write_error2(self, mockmmap):
1738         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1739                                                   disk_cache_dir=self.disk_cache_dir)
1740
1741         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1742
1743         mockmmap.side_effect = (OSError(errno.ENOMEM, "no memory"), self.data)
1744
1745         slots_before = block_cache._max_slots
1746
1747         with tutil.mock_keep_responses(self.data, 200) as mock:
1748             self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1749
1750         self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1751
1752         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1753             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1754
1755         # shrank the cache in response to ENOMEM
1756         self.assertTrue(slots_before > block_cache._max_slots)