18842: Added tests for specific disk cache behavior
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 import os
15 import pycurl
16 import random
17 import re
18 import shutil
19 import socket
20 import sys
21 import tempfile
22 import time
23 import unittest
24 import urllib.parse
25
26 import parameterized
27
28 import arvados
29 import arvados.retry
30 import arvados.util
31 from . import arvados_testutil as tutil
32 from . import keepstub
33 from . import run_test_server
34
35 from .arvados_testutil import DiskCacheBase
36
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Basic Keep round-trip tests (put/get/head) against live test servers.

    Parameterized to run once with the disk-backed block cache and once
    with the in-memory cache (``disk_cache`` True/False).
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    # DiskCacheBase helper instance, kept at class level so that
    # tearDownClass() can remove the temporary cache directory it creates.
    block_cache_test = None

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.block_cache_test = DiskCacheBase()
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='',
                                             block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))

    @classmethod
    def tearDownClass(cls):
        # Bug fix: this previously called super().setUpClass(), which
        # re-ran class setup instead of tearing the fixture down.
        super(KeepTestCase, cls).tearDownClass()
        cls.block_cache_test.tearDown()

    def test_KeepBasicRWTest(self):
        """put/get a small block and check upload/download byte counters."""
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        # Locator regexes are raw strings so '\+' is an explicit escaped
        # plus rather than an invalid (deprecated) string escape.
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
                         b'foo'),
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        """Round-trip a short non-ASCII byte string."""
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        """Round-trip a 64 MiB block (8 bytes doubled 23 times)."""
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        """Round-trip with copies=1 (skipped pending #8752)."""
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        """put('') returns the well-known empty-block locator."""
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        """head() reports block existence; content is unchanged by it."""
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
146
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Blob-signing permission tests against a live Keep server started
    with blob_signing enabled: signed locators work, unsigned or
    wrong-user reads fail with NotFoundError.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def tearDown(self):
        # Remove the temporary disk cache directory, if one was created.
        DiskCacheBase.tearDown(self)

    def test_KeepBasicRWTest(self):
        """Signed put/get succeeds; unsigned/foreign/unauthenticated reads fail.

        NOTE: assertion order matters — authorization state is mutated
        partway through ('active' -> 'spectator' -> no token).
        """
        run_test_server.authorize_with('active')
        keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
        foo_locator = keep_client.put('foo')
        # With blob_signing on, locators carry a +A<signature>@<expiry> hint.
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        bar_locator = keep_client.put('bar')
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            bar_locator,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        self.assertRaises(arvados.errors.NotFoundError,
                          keep_client.get,
                          unsigned_bar_locator)

        # GET from a different user => NotFound
        run_test_server.authorize_with('spectator')
        self.assertRaises(arvados.errors.NotFoundError,
                          arvados.Keep.get,
                          bar_locator)

        # Unauthenticated GET for a signed locator => NotFound
        # Unauthenticated GET for an unsigned locator => NotFound
        keep_client.api_token = ''
        self.assertRaises(arvados.errors.NotFoundError,
                          keep_client.get,
                          bar_locator)
        self.assertRaises(arvados.errors.NotFoundError,
                          keep_client.get,
                          unsigned_bar_locator)
194
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Keepproxy discovery tests: KeepClient picks up proxy settings from
    the ARVADOS_KEEP_SERVICES configuration.

    NOTE(review): test_KeepProxyTestMultipleURIs/InvalidURI mutate the
    process-wide arvados.config.settings() and do not restore it —
    presumably later tests tolerate or reset this; verify if reordering.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        # Chain to the server-fixture teardown, then remove any
        # temporary disk cache directory.
        super(KeepProxyTestCase, self).tearDown()
        DiskCacheBase.tearDown(self)

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='', block_cache=self.make_block_cache(self.disk_cache))
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            '^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        # The round trip must have gone through the proxy service.
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))
        # Each space-separated URI becomes a service root (trailing '/'
        # normalized on).
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        # A bare hostname (no scheme) must be rejected at construction time.
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            keep_client = arvados.KeepClient(api_client=self.api_client,
                                             local_store='',
                                             block_cache=self.make_block_cache(self.disk_cache))
244
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """KeepClient behavior tests using mocked API and Keep responses: service
    discovery, TLS verification flags, pycurl timeout options, and error
    aggregation across retries. No live servers involved.
    """
    disk_cache = False

    def tearDown(self):
        # Remove the temporary disk cache directory, if one was created.
        DiskCacheBase.tearDown(self)

    def get_service_roots(self, api_client):
        """Return the client's service roots as parsed URLs, sorted for
        deterministic comparison."""
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        """service_ssl_flag selects the https vs http scheme."""
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        """IPv6 hosts must be bracketed correctly so the port parses out."""
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        """A proxy-type service in discovery sets using_proxy."""
        keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
            service_type='proxy', service_host='localhost', service_port=9, count=1),
                                         block_cache=self.make_block_cache(self.disk_cache))
        try:
            # this will fail, but it ensures we get the service
            # discovery response
            keep_client.put('baz2')
        except Exception:
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        """api_client.insecure toggles pycurl's SSL_VERIFYPEER/VERIFYHOST."""
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        """refresh_signature() translates a +R remote-signed locator into a
        +A locally-signed one via a HEAD request with X-Keep-Signature."""
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        """GET uses DEFAULT_TIMEOUT (connect ms, low-speed time, limit)."""
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_put_timeout(self):
        """PUT uses DEFAULT_TIMEOUT like GET."""
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_head_timeout(self):
        """HEAD sets only the connect timeout; no low-speed limits apply
        because there is no body to transfer."""
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_get_timeout(self):
        """GET through a proxy service uses DEFAULT_PROXY_TIMEOUT."""
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def test_proxy_head_timeout(self):
        """HEAD through a proxy sets only the connect timeout."""
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_put_timeout(self):
        """PUT through a proxy uses DEFAULT_PROXY_TIMEOUT."""
        # No block_cache is passed below, so no disk cache dir is created;
        # clearing disk_cache_dir keeps DiskCacheBase.tearDown from
        # touching one. (NOTE(review): presumably intentional — confirm.)
        self.disk_cache_dir = None
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def check_no_services_error(self, verb, exc_class):
        """Helper: when service discovery itself fails, the verb raises
        exc_class with zero per-request errors recorded."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Helper: only the final retry round's errors (the 502s) are
        reported, and the message counts all attempts."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        """Only the two failing services appear in request_errors()."""
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        """A read-only proxy yields 'no Keep services available', not
        per-request errors."""
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.put(data)
        self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        """Unknown service types are still usable for GET."""
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        """Unknown service types are still usable for PUT."""
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        """A single response confirming 3 replicas satisfies copies=2 with
        one request."""
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
529
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Block-cache behavior tests: GET responses are cached, HEAD responses
    are not. KeepService.get is mock-patched to count backend requests.
    """
    disk_cache = False

    def setUp(self):
        # Mocked API with two keep services; data/locator are fixed test
        # values (locator need not match the data's actual md5 here,
        # since responses are mocked).
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def tearDown(self):
        # Remove the temporary disk cache directory, if one was created.
        DiskCacheBase.tearDown(self)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.get(self.locator)
            self.keep_client.get(self.locator)
        # Request already cached, don't require more than one request
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.head(self.locator)
            self.keep_client.head(self.locator)
        # Don't cache HEAD requests so that they're not confused with GET reqs
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        head_resp = None
        get_resp = None
        # Two distinct canned responses: HEAD consumes the first, GET the
        # second — proving HEAD did not populate the cache.
        get_mock.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual(b'first response', head_resp)
        # First response was not cached because it was from a HEAD request.
        self.assertNotEqual(head_resp, get_resp)
571
572
573
574
575 @tutil.skip_sleep
576 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
577 class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
578     disk_cache = False
579
580     def setUp(self):
581         self.api_client = self.mock_keep_services(count=2)
582         self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
583         self.data = b'xyzzy'
584         self.locator = '1271ed5ef305aadabc605b1609e24c52'
585
586     def tearDown(self):
587         DiskCacheBase.tearDown(self)
588
589     def test_multiple_default_storage_classes_req_header(self):
590         api_mock = self.api_client_mock()
591         api_mock.config.return_value = {
592             'StorageClasses': {
593                 'foo': { 'Default': True },
594                 'bar': { 'Default': True },
595                 'baz': { 'Default': False }
596             }
597         }
598         api_client = self.mock_keep_services(api_mock=api_mock, count=2)
599         keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
600         resp_hdr = {
601             'x-keep-storage-classes-confirmed': 'foo=1, bar=1',
602             'x-keep-replicas-stored': 1
603         }
604         with tutil.mock_keep_responses(self.locator, 200, **resp_hdr) as mock:
605             keep_client.put(self.data, copies=1)
606             req_hdr = mock.responses[0]
607             self.assertIn(
608                 'X-Keep-Storage-Classes: bar, foo', req_hdr.getopt(pycurl.HTTPHEADER))
609
610     def test_storage_classes_req_header(self):
611         self.assertEqual(
612             self.api_client.config()['StorageClasses'],
613             {'default': {'Default': True}})
614         cases = [
615             # requested, expected
616             [['foo'], 'X-Keep-Storage-Classes: foo'],
617             [['bar', 'foo'], 'X-Keep-Storage-Classes: bar, foo'],
618             [[], 'X-Keep-Storage-Classes: default'],
619             [None, 'X-Keep-Storage-Classes: default'],
620         ]
621         for req_classes, expected_header in cases:
622             headers = {'x-keep-replicas-stored': 1}
623             if req_classes is None or len(req_classes) == 0:
624                 confirmed_hdr = 'default=1'
625             elif len(req_classes) > 0:
626                 confirmed_hdr = ', '.join(["{}=1".format(cls) for cls in req_classes])
627             headers.update({'x-keep-storage-classes-confirmed': confirmed_hdr})
628             with tutil.mock_keep_responses(self.locator, 200, **headers) as mock:
629                 self.keep_client.put(self.data, copies=1, classes=req_classes)
630                 req_hdr = mock.responses[0]
631                 self.assertIn(expected_header, req_hdr.getopt(pycurl.HTTPHEADER))
632
633     def test_partial_storage_classes_put(self):
634         headers = {
635             'x-keep-replicas-stored': 1,
636             'x-keep-storage-classes-confirmed': 'foo=1'}
637         with tutil.mock_keep_responses(self.locator, 200, 503, **headers) as mock:
638             with self.assertRaises(arvados.errors.KeepWriteError):
639                 self.keep_client.put(self.data, copies=1, classes=['foo', 'bar'])
640             # 1st request, both classes pending
641             req1_headers = mock.responses[0].getopt(pycurl.HTTPHEADER)
642             self.assertIn('X-Keep-Storage-Classes: bar, foo', req1_headers)
643             # 2nd try, 'foo' class already satisfied
644             req2_headers = mock.responses[1].getopt(pycurl.HTTPHEADER)
645             self.assertIn('X-Keep-Storage-Classes: bar', req2_headers)
646
647     def test_successful_storage_classes_put_requests(self):
648         cases = [
649             # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, expected_requests
650             [ 1, ['foo'], 1, 'foo=1', 1],
651             [ 1, ['foo'], 2, 'foo=2', 1],
652             [ 2, ['foo'], 2, 'foo=2', 1],
653             [ 2, ['foo'], 1, 'foo=1', 2],
654             [ 1, ['foo', 'bar'], 1, 'foo=1, bar=1', 1],
655             [ 1, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
656             [ 2, ['foo', 'bar'], 2, 'foo=2, bar=2', 1],
657             [ 2, ['foo', 'bar'], 1, 'foo=1, bar=1', 2],
658             [ 1, ['foo', 'bar'], 1, None, 1],
659             [ 1, ['foo'], 1, None, 1],
660             [ 2, ['foo'], 2, None, 1],
661             [ 2, ['foo'], 1, None, 2],
662         ]
663         for w_copies, w_classes, c_copies, c_classes, e_reqs in cases:
664             headers = {'x-keep-replicas-stored': c_copies}
665             if c_classes is not None:
666                 headers.update({'x-keep-storage-classes-confirmed': c_classes})
667             with tutil.mock_keep_responses(self.locator, 200, 200, **headers) as mock:
668                 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}", expected_requests={}'.format(w_copies, ', '.join(w_classes), c_copies, c_classes, e_reqs)
669                 self.assertEqual(self.locator,
670                     self.keep_client.put(self.data, copies=w_copies, classes=w_classes),
671                     case_desc)
672                 self.assertEqual(e_reqs, mock.call_count, case_desc)
673
674     def test_failed_storage_classes_put_requests(self):
675         cases = [
676             # wanted_copies, wanted_classes, confirmed_copies, confirmed_classes, return_code
677             [ 1, ['foo'], 1, 'bar=1', 200],
678             [ 1, ['foo'], 1, None, 503],
679             [ 2, ['foo'], 1, 'bar=1, foo=0', 200],
680             [ 3, ['foo'], 1, 'bar=1, foo=1', 200],
681             [ 3, ['foo', 'bar'], 1, 'bar=2, foo=1', 200],
682         ]
683         for w_copies, w_classes, c_copies, c_classes, return_code in cases:
684             headers = {'x-keep-replicas-stored': c_copies}
685             if c_classes is not None:
686                 headers.update({'x-keep-storage-classes-confirmed': c_classes})
687             with tutil.mock_keep_responses(self.locator, return_code, return_code, **headers):
688                 case_desc = 'wanted_copies={}, wanted_classes="{}", confirmed_copies={}, confirmed_classes="{}"'.format(w_copies, ', '.join(w_classes), c_copies, c_classes)
689                 with self.assertRaises(arvados.errors.KeepWriteError, msg=case_desc):
690                     self.keep_client.put(self.data, copies=w_copies, classes=w_classes)
691
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check that every Keep request carries an X-Request-Id header,
    whether the id comes from the API client, an explicit argument, or
    is generated automatically, and that the id appears in error text.
    """
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def tearDown(self):
        # Explicit call: more than one base class defines tearDown.
        DiskCacheBase.tearDown(self)

    def test_default_to_api_client_request_id(self):
        """put/get/head fall back to the API client's request_id."""
        self.api_client.request_id = self.test_id
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

    def test_explicit_request_id(self):
        """An explicit request_id argument is used for put/get/head."""
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data, request_id=self.test_id)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])

    def test_automatic_request_id(self):
        """With no id supplied anywhere, a fresh req-... id is generated."""
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertAutomaticRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])

    def test_request_id_in_exception(self):
        """Errors raised after all attempts fail mention the request id."""
        # The mock context managers' values are unused here, so they are
        # not bound to a name (the originals bound an unused `mock`).
        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
                self.keep_client.head(self.locator, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
                self.keep_client.get(self.locator)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
                self.keep_client.put(self.data, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
                self.keep_client.put(self.data)

    def assertAutomaticRequestId(self, resp):
        """Assert resp has a well-formed X-Request-Id differing from self.test_id."""
        hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
               if x.startswith('X-Request-Id: ')][0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        """Assert resp carries exactly 'X-Request-Id: <self.test_id>'."""
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
784
785
@tutil.skip_sleep
# NOTE(review): disk-cache parameterization is commented out for this
# class, unlike its neighbors -- confirm whether that is intentional.
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check KeepClient's rendezvous hashing (service probe order).

    Each test compares the order in which Keep services are tried for a
    given block hash against a precomputed reference sequence, and
    checks that the order stays nearly stable as services are added.
    """
    disk_cache = False

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        # One test block per reference sequence: the 64-hex-digit
        # zero-padded representation of its index.
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        # Explicit call: more than one base class defines tearDown.
        DiskCacheBase.tearDown(self)

    def test_weighted_service_roots_against_reference_set(self):
        """weighted_service_roots() yields the reference probe order."""
        # Confirm weighted_service_roots() returns the correct order
        for i, hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        """get() contacts services in reference probe order."""
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        """head() contacts services in reference probe order."""
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        """put() with a single copy contacts services in reference order."""
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Run op(i) with every service returning 500.

        With num_retries=1 in the callers, every service should be
        probed twice, in the reference order both times, before
        KeepRequestError is raised.
        """
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        """With multiple writer threads, a request may lead the reference
        order by at most copies-1 positions.
        """
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        """Adding services perturbs probe order only minimally.

        For each block, the penalty is how far its previous first-choice
        service slips down the new probe list; the average penalty must
        stay close to the ideal N(added)/N(orig) ratio.
        """
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            total_penalty = 0
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                # Position of the old first choice in the new order.
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Run `verb` on the all-zeros block with every request failing,
        then check the errors reported per service follow the reference
        probe order (first row of expected_order).
        """
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        """KeepReadError lists failed services in probe order."""
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        """KeepWriteError lists failed services in probe order."""
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
944
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    """Exercise KeepClient timeouts and low-bandwidth detection against
    a stub Keep server with configurable delays and bandwidth limits.
    """
    disk_cache = False

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    def tearDown(self):
        # Explicit call: more than one base class defines tearDown.
        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        """Context manager asserting its body takes tmin..tmax seconds.

        Subclasses unittest.TestCase only to borrow its assert* methods.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager asserting its body takes at least tmin seconds."""
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Build a KeepClient with the given timeout tuple.

        Judging by the usages below, the tuple is (connect timeout,
        response timeout[, low-speed limit]) -- confirm against the
        KeepClient API before relying on this.
        """
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        """The connect timeout fires within 0.1..0.5s on an unroutable host."""
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        """Transfers succeed when bandwidth is above the low-speed limit."""
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        """Transfers fail, after the timeout, when bandwidth is too low."""
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        """A delayed response at marginal bandwidth fails get/put."""
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            # NOTE(review): head is expected to succeed here (no
            # assertRaises), presumably because no body is transferred
            # so the low-speed limit never triggers -- confirm.
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        """A mid-transfer stall at marginal bandwidth fails get/put."""
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError) as e:
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        """Request-phase delays trip the response timeout, not connect."""
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        """Response-phase delays trip the response timeout, not connect."""
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        """Body-phase delays trip the response timeout, not connect."""
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        """With a 200ms server delay, operations succeed in 0.2..0.3s."""
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        """With a 2s server delay, operations fail after 1..9s."""
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
1091
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check that K@ locator hints route get/head requests to gateway
    services (in hint order) before ordinary disk services, and that a
    K@<site> hint maps to the remote site's keep proxy hostname.
    """
    disk_cache = False

    def tearDown(self):
        # Explicit call: more than one base class defines tearDown.
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Build self.keepClient backed by `disks` disk services plus
        `gateways` gateway services; their roots go in self.gateway_roots.
        """
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        """A single gateway hint sends get/head to that gateway's root URL."""
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        """When every service 404s, get() tries gateways in hint order,
        then falls back to disk services.
        """
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body='')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        """head() follows the same gateway-then-disk probe order as get()."""
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        """A K@<site> hint makes get() use the remote keep proxy URL."""
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        """A K@<site> hint makes head() use the remote keep proxy URL."""
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1193
class KeepClientRetryTestMixin(object):
    """Shared retry-behavior tests for KeepClient operations.

    A local Keep store would never exercise retries, so instead each
    client is pointed at a single proxy address that is a black hole --
    no API client is needed and every HTTP request goes to one place --
    while the HTTP layer is mocked to return scripted responses.  This
    tests the retry logic thoroughly with no supporting servers and no
    side effects if something hiccups.

    Users of this mixin must define DEFAULT_EXPECT, DEFAULT_EXCEPTION,
    run_method(), and TEST_PATCHER (a callable that mocks out the
    appropriate client request machinery).
    """
    disk_cache = False

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Build a KeepClient; caller kwargs override the setUp defaults."""
        merged = dict(self.client_kwargs, **caller_kwargs)
        merged['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**merged)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Run the method under test and assert it returns `expected`
        (DEFAULT_EXPECT when not given)."""
        if expected is None:
            expected = self.DEFAULT_EXPECT
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Run the method under test and assert it raises `error_class`
        (DEFAULT_EXCEPTION when not given); return the catcher."""
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as caught:
            self.run_method(*args, **kwargs)
        return caught

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries set on the client instance applies when the call
        # itself does not pass one.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1268
1269
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.get(), with and without the disk cache."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        # The operation under test: read a block by locator.
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        """A 404 should surface as NotFoundError."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        """Mixed 404/500 responses must not collapse into NotFoundError.

        get should raise NotFoundError only when a high threshold of
        servers report the block is not found; this rigs up 50/50
        disagreement between two servers and checks the error stays a
        generic KeepReadError.
        """
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as caught:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                caught.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        """A hinted locator can be served by the 200 server with no retries."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        """A timeout on one server moves on to the next instead of failing."""
        responses = tutil.mock_keep_responses(
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200))
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        """Data failing checksum verification is re-fetched elsewhere."""
        responses = tutil.mock_keep_responses(
            ('baddata', 200),
            (self.DEFAULT_EXPECT, 200))
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)
1317
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.head(), with and without the disk cache."""

    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        # The operation under test: check block existence by locator.
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        """A 404 should surface as NotFoundError."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        """Mixed 404/500 responses must not collapse into NotFoundError.

        head should raise NotFoundError only when a high threshold of
        servers report the block is not found; this rigs up 50/50
        disagreement between two servers and checks the error stays a
        generic KeepReadError.
        """
        client = self.new_client()
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as caught:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                caught.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        """A hinted locator can be served by the 200 server with no retries."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        """A timeout on one server moves on to the next instead of failing."""
        responses = tutil.mock_keep_responses(
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200))
        with responses:
            self.check_success(locator=self.HINTED_LOCATOR)
1359
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.put(), with and without the disk cache."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        # The operation under test: write a block, asking for `copies` replicas.
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        """With a single mocked server, requesting two copies must fail
        rather than storing both copies on the same server."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1377
1378
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """KeepWriterThreadPool should stop issuing writes once the requested
    number of copies has been reached, even with more tasks queued."""

    class FakeKeepService(object):
        """Minimal Keep service stub: put() sleeps `delay` seconds, then
        succeeds, fails, or raises according to the constructor flags."""

        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            # Canned success response advertising `replicas` stored copies.
            self._result = {
                'headers': {
                    'x-keep-replicas-stored': str(replicas),
                    'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
                },
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout, headers):
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            if not self.will_succeed:
                return {"status_code": 500, "body": "didn't succeed"}
            return self._result

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def test_only_write_enough_on_success(self):
        """Ten fast-succeeding services: exactly `copies` writes count."""
        for i in range(10):
            service = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(service, None)
        self.pool.join()
        self.assertEqual((self.copies, []), self.pool.done())

    def test_only_write_enough_on_partial_success(self):
        """Alternating failing/succeeding services still reach `copies`."""
        for i in range(5):
            self.pool.add_task(self.FakeKeepService(delay=i/10.0, will_succeed=False), None)
            self.pool.add_task(self.FakeKeepService(delay=i/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual((self.copies, []), self.pool.done())

    def test_only_write_enough_when_some_crash(self):
        """Services that raise are tolerated as long as enough succeed."""
        for i in range(5):
            self.pool.add_task(self.FakeKeepService(delay=i/10.0, will_raise=Exception()), None)
            self.pool.add_task(self.FakeKeepService(delay=i/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual((self.copies, []), self.pool.done())

    def test_fail_when_too_many_crash(self):
        """With too many crashing services, the pool falls short of `copies`."""
        for i in range(self.copies+1):
            self.pool.add_task(self.FakeKeepService(delay=i/10.0, will_raise=Exception()), None)
        for i in range(self.copies-1):
            self.pool.add_task(self.FakeKeepService(delay=i/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual((self.copies-1, []), self.pool.done())
1450
1451
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """put()s that need two distinct servers to succeed, possibly
    requiring multiple passes through the retry loop."""

    # NOTE(review): sibling test cases declare `disk_cache = False`; confirm
    # whether `block_cache` here is intentional or a typo for `disk_cache`.
    block_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        """One exception, then two 200s: put succeeds via three requests."""
        responses = tutil.mock_keep_responses(
            'acbd18db4cc2f85cedef654fccc4a4d8+3',
            Exception('mock err'), 200, 200)
        with responses as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        """A retryable 500, then two 200s: put succeeds via three requests."""
        responses = tutil.mock_keep_responses(
            'acbd18db4cc2f85cedef654fccc4a4d8+3',
            500, 200, 200)
        with responses as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        """The first pass gets a 200 (that server can't add replication by
        storing again) and a 400 (not retryable at all), so no third
        request should be attempted."""
        responses = tutil.mock_keep_responses(
            'acbd18db4cc2f85cedef654fccc4a4d8+3',
            200, 400, 200)
        with responses as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1491
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    """Regression test: an API-client error during get() must not wedge a
    later get() of the same block."""

    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        class ApiMock(object):
            # Supplies the few attributes KeepClient reads up front; any
            # other attribute access raises, simulating an API failure.
            def __getattr__(self, r):
                if r == "api_token":
                    return "abc"
                if r == "insecure":
                    return False
                if r == "config":
                    return lambda: {}
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug being guarded against: if an API (not keepstore)
        # exception was thrown during a get(), the next get() of the same
        # block would deadlock -- hence the two get()s in a row.  The
        # failure mode here is that the test suite itself deadlocks; there
        # is no good way around that without adding a special case whose
        # only use is this test.
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1526
1527
1528 class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
1529     def setUp(self):
1530         self.api_client = self.mock_keep_services(count=2)
1531         self.data = b'xyzzy'
1532         self.locator = '1271ed5ef305aadabc605b1609e24c52'
1533         self.disk_cache_dir = tempfile.mkdtemp()
1534
    def tearDown(self):
        # Remove the scratch cache directory and everything in it.
        shutil.rmtree(self.disk_cache_dir)
1537
1538
1539     @mock.patch('arvados.KeepClient.KeepService.get')
1540     def test_disk_cache_read(self, get_mock):
1541         # confirm it finds an existing cache block when the cache is
1542         # initialized.
1543
1544         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1545         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1546             f.write(self.data)
1547
1548         # block cache should have found the existing block
1549         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1550                                                   disk_cache_dir=self.disk_cache_dir)
1551         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1552
1553         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1554
1555         get_mock.assert_not_called()
1556
1557
1558     @mock.patch('arvados.KeepClient.KeepService.get')
1559     def test_disk_cache_share(self, get_mock):
1560         # confirm it finds a cache block written after the disk cache
1561         # was initialized.
1562
1563         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1564                                                   disk_cache_dir=self.disk_cache_dir)
1565         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1566
1567         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1568         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1569             f.write(self.data)
1570
1571         # when we try to get the block, it'll check the disk and find it.
1572         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1573
1574         get_mock.assert_not_called()
1575
1576
1577     def test_disk_cache_write(self):
1578         # confirm the cache block was created
1579
1580         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1581                                                   disk_cache_dir=self.disk_cache_dir)
1582         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1583
1584         with tutil.mock_keep_responses(self.data, 200) as mock:
1585             self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1586
1587         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1588             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1589
1590
1591     def test_disk_cache_clean(self):
1592         # confirm that a tmp file in the cache is cleaned up
1593
1594         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1595         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
1596             f.write(b"abc1")
1597
1598         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
1599             f.write(b"abc2")
1600
1601         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
1602             f.write(b"abc3")
1603
1604         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1605         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1606         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1607
1608         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1609                                                   disk_cache_dir=self.disk_cache_dir)
1610
1611         # The tmp still hasn't been deleted because it was created in the last 60 seconds
1612         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1613         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1614         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1615
1616         # Set the mtime to 61s in the past
1617         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
1618         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
1619         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))
1620
1621         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1622                                                    disk_cache_dir=self.disk_cache_dir)
1623
1624         # Tmp should be gone but the other ones are safe.
1625         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1626         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1627         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1628
1629
1630     @mock.patch('arvados.KeepClient.KeepService.get')
1631     def test_disk_cache_cap(self, get_mock):
1632         # confirm that the cache is kept to the desired limit
1633
1634         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1635         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1636             f.write(self.data)
1637
1638         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1639         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1640             f.write(b"foo")
1641
1642         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1643         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1644
1645         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1646                                                    disk_cache_dir=self.disk_cache_dir,
1647                                                    max_slots=1)
1648
1649         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1650         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1651
1652
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # confirm that a second cache doesn't delete files that belong to the first cache.

        # Seed the cache directory with two blocks before any cache exists.
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
            f.write(self.data)

        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
            f.write(b"foo")

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        # A cache with room for both blocks leaves both files in place.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir,
                                                   max_slots=2)

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))


        # A second, smaller cache over the same directory must not delete
        # blocks belonging to the still-live first cache.
        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir,
                                                   max_slots=1)

        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))