15397: Remove deprecated arvados.Keep class.
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import hashlib
6 import os
7 import errno
8 import pycurl
9 import random
10 import re
11 import shutil
12 import socket
13 import sys
14 import stat
15 import tempfile
16 import time
17 import unittest
18 import urllib.parse
19 import mmap
20
21 from unittest import mock
22 from unittest.mock import patch
23
24 import parameterized
25
26 import arvados
27 import arvados.retry
28 import arvados.util
29 from . import arvados_testutil as tutil
30 from . import keepstub
31 from . import run_test_server
32
33 from .arvados_testutil import DiskCacheBase
34
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Round-trip put/get/head tests against a live test Keep server.

    Runs once with the disk block cache and once with the in-memory
    cache (via the parameterized ``disk_cache`` class attribute).
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    # Holds the DiskCacheBase instance whose cache backs cls.keep_client.
    block_cache_test = None

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.block_cache_test = DiskCacheBase()
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='',
                                             block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))

    @classmethod
    def tearDownClass(cls):
        # Bug fix: this previously called super(...).setUpClass(), which
        # re-ran class setup during teardown instead of cleaning up.
        super(KeepTestCase, cls).tearDownClass()
        cls.block_cache_test.tearDown()

    def test_KeepBasicRWTest(self):
        """A put followed by a get round-trips data and updates counters."""
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
                         b'foo'),
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        """Binary (non-UTF-8) payloads round-trip unchanged."""
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        """A maximum-size (64 MiB) block round-trips unchanged."""
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        # Doubling 23 times turns 8 bytes into 8 * 2**23 = 64 MiB.
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        """put(copies=1) stores and retrieves a single replica."""
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        """Putting an empty string yields the well-known empty-block locator."""
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_KeepPutDataType(self):
        """put() rejects objects that are not bytes-like or encodable."""
        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        """head() reports existence without disturbing the stored content."""
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
132
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Verify blob-signing permission checks on Keep reads and writes."""
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))

        # An authorized put returns a signed locator that reads back.
        foo_locator = client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        self.assertEqual(client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # Reading with an unsigned locator must be rejected.
        bar_locator = client.put('bar')
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            bar_locator,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        with self.assertRaises(arvados.errors.KeepReadError):
            client.get(unsigned_bar_locator)

        # A different user's token cannot use this user's signature.
        run_test_server.authorize_with('spectator')
        spectator_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
        with self.assertRaises(arvados.errors.KeepReadError):
            spectator_client.get(bar_locator)

        # With no token at all, both signed and unsigned reads fail.
        client.api_token = ''
        with self.assertRaises(arvados.errors.KeepReadError):
            client.get(bar_locator)
        with self.assertRaises(arvados.errors.KeepReadError):
            client.get(unsigned_bar_locator)
181
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Tests of KeepClient behavior when talking through a keepproxy."""
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        super(KeepProxyTestCase, self).tearDown()
        DiskCacheBase.tearDown(self)

    def test_KeepProxyTest1(self):
        # Relies on the ARVADOS_KEEP_SERVICES environment variable that
        # setUpClass() arranged for us.
        client = arvados.KeepClient(
            api_client=self.api_client,
            local_store='',
            block_cache=self.make_block_cache(self.disk_cache))
        baz_locator = client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # ARVADOS_KEEP_SERVICES overrides any existing proxy setting and
        # may name several proxies at once.
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        client = arvados.KeepClient(
            api_client=self.api_client,
            local_store='',
            block_cache=self.make_block_cache(self.disk_cache))
        roots = [service['_service_root'] for service in client._keep_services]
        self.assertEqual(
            roots,
            ['http://10.0.0.1/', 'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        # A bare hostname is not an acceptable service URI.
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(
                api_client=self.api_client,
                local_store='',
                block_cache=self.make_block_cache(self.disk_cache))
231
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Exercise KeepClient service discovery, timeout configuration, and
    error reporting against fully mocked API and Keep responses.

    No real servers are involved; all HTTP traffic goes through the
    tutil mocks.
    """
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def get_service_roots(self, api_client):
        """Return the parsed, sorted service-root URLs discovered via api_client."""
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
            service_type='proxy', service_host='localhost', service_port=9, count=1),
                                         block_cache=self.make_block_cache(self.disk_cache))
        try:
            # this will fail, but it ensures we get the service
            # discovery response
            keep_client.put('baz2', num_retries=0)
        except Exception:
            # Narrowed from a bare `except:` so that KeyboardInterrupt
            # and SystemExit still propagate.
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)
        # (An unused `force_timeout` local was removed here; the mocked
        # responses below always succeed.)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_head_timeout(self):
        # HEAD requests set the connect timeout but no low-speed limits.
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_get_timeout(self):
        # Proxy services use the (longer) DEFAULT_PROXY_TIMEOUT values.
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_put_timeout(self):
        self.disk_cache_dir = None
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def check_no_services_error(self, verb, exc_class):
        """Assert that `verb` raises exc_class with no request errors when
        service discovery itself fails."""
        api_client = mock.MagicMock(name='api_client')
        # Passing the exception class as side_effect makes the mock raise it.
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=self.make_block_cache(self.disk_cache),
            num_retries=0,
        )
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """Assert that the raised exception reports only the final retry's
        errors (the 502s), not the earlier 500s."""
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        # One service accepts the block, two fail; only the two failures
        # should appear in the exception's request errors.
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        # Unknown service types should still be usable for reads.
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        # A single response claiming 3 stored replicas satisfies copies=2,
        # so only one request should be made.
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
567
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify which requests the block cache serves and which hit the service."""
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, mocked_get):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.get(self.locator)
        # The second get() is served from the cache, so the underlying
        # service is hit exactly once.
        mocked_get.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, mocked_get):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.head(self.locator)
        # HEAD results are deliberately not cached (so they can't be
        # confused with GETs); both calls reach the service.
        self.assertEqual(2, mocked_get.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, mocked_get):
        mocked_get.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual(b'first response', head_resp)
        # The first response was not cached because it came from a HEAD
        # request, so the subsequent GET sees a different response.
        self.assertNotEqual(head_resp, get_resp)
609
610
611
612
613
614 @tutil.skip_sleep
615 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
616 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
617     disk_cache = False
618
    def setUp(self):
        """Build a mocked KeepClient and a known request ID for each test."""
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        # A freshly generated ID the tests can pass explicitly or via the
        # API client; sanity-check its expected format up front.
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None
630
    def tearDown(self):
        # Delegate cleanup (e.g. of any disk cache created by
        # make_block_cache) to DiskCacheBase.
        DiskCacheBase.tearDown(self)
633
634     def test_default_to_api_client_request_id(self):
635         self.api_client.request_id = self.test_id
636         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
637             self.keep_client.put(self.data)
638         self.assertEqual(2, len(mock.responses))
639         for resp in mock.responses:
640             self.assertProvidedRequestId(resp)
641
642         with tutil.mock_keep_responses(self.data, 200) as mock:
643             self.keep_client.get(self.locator)
644         self.assertProvidedRequestId(mock.responses[0])
645
646         with tutil.mock_keep_responses(b'', 200) as mock:
647             self.keep_client.head(self.locator)
648         self.assertProvidedRequestId(mock.responses[0])
649
650     def test_explicit_request_id(self):
651         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
652             self.keep_client.put(self.data, request_id=self.test_id)
653         self.assertEqual(2, len(mock.responses))
654         for resp in mock.responses:
655             self.assertProvidedRequestId(resp)
656
657         with tutil.mock_keep_responses(self.data, 200) as mock:
658             self.keep_client.get(self.locator, request_id=self.test_id)
659         self.assertProvidedRequestId(mock.responses[0])
660
661         with tutil.mock_keep_responses(b'', 200) as mock:
662             self.keep_client.head(self.locator, request_id=self.test_id)
663         self.assertProvidedRequestId(mock.responses[0])
664
665     def test_automatic_request_id(self):
666         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
667             self.keep_client.put(self.data)
668         self.assertEqual(2, len(mock.responses))
669         for resp in mock.responses:
670             self.assertAutomaticRequestId(resp)
671
672         with tutil.mock_keep_responses(self.data, 200) as mock:
673             self.keep_client.get(self.locator)
674         self.assertAutomaticRequestId(mock.responses[0])
675
676         with tutil.mock_keep_responses(b'', 200) as mock:
677             self.keep_client.head(self.locator)
678         self.assertAutomaticRequestId(mock.responses[0])
679
680     def test_request_id_in_exception(self):
681         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
682             with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
683                 self.keep_client.head(self.locator, request_id=self.test_id)
684
685         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
686             with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
687                 self.keep_client.get(self.locator)
688
689         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
690             with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
691                 self.keep_client.put(self.data, request_id=self.test_id)
692
693         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
694             with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
695                 self.keep_client.put(self.data)
696
697     def assertAutomaticRequestId(self, resp):
698         hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
699                if x.startswith('X-Request-Id: ')][0]
700         self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
701         self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
702
703     def assertProvidedRequestId(self, resp):
704         self.assertIn('X-Request-Id: '+self.test_id,
705                       resp.getopt(pycurl.HTTPHEADER))
706
707
@tutil.skip_sleep
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify KeepClient's rendezvous hashing.

    Checks that service probe order matches a precomputed reference set,
    that get/head/put actually contact services in that order, and that
    adding services disturbs existing probe orders only minimally.
    """
    disk_cache = False

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        # One test block per reference ordering, plus its md5 digest.
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        # Explicit base-class call: DiskCacheBase removes the temp cache dir.
        DiskCacheBase.tearDown(self)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
            # Each root URL embeds the service index as hex after "keep0x".
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        # With every service returning 500 and one retry, the recorded
        # request URLs must visit the services in the reference probe
        # order, twice over (original attempt + retry).
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        # With copies>1, multiple writer threads race, so we only check
        # that no request is issued "too early" relative to the
        # reference probe sequence.
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        # Consistent-hashing property: growing the service set should
        # shift each block's first-choice service by a bounded amount.
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            total_penalty = 0
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                # Penalty = how far the old first choice slid down the
                # new probe list; it can never exceed added_services.
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        # Make every service fail, then confirm the raised exception's
        # request_errors() lists the servers in reference probe order.
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
866
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    """Exercise KeepClient connect/response/bandwidth timeouts against the
    stub Keep server, with and without the disk cache.

    Fix: removed an unused exception alias (``as e``) in
    test_low_bandwidth_with_server_mid_delay_failure.
    """
    disk_cache = False

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    def tearDown(self):
        # Explicit base-class call: DiskCacheBase removes the temp cache dir.
        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        """Context manager asserting the body runs for tmin..tmax seconds."""
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager asserting the body runs for at least tmin seconds."""
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient with the given timeout tuple."""
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # Transfers above the low-speed limit should succeed.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
1013
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify that K@ locator hints route requests to gateway services
    (in hint order) before disk services, and to remote cluster proxies.

    Fix: mock response bodies are now consistently bytes (b'foo', b''),
    matching the sibling tests in this class.
    """
    disk_cache = False

    def tearDown(self):
        # Explicit base-class call: DiskCacheBase removes the temp cache dir.
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Build a KeepClient backed by the given mix of mocked services."""
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        # K@<cluster-id> hints route to that cluster's Keep proxy.
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1115
class KeepClientRetryTestMixin(object):
    """Shared scaffolding for Keep retry tests.

    A local Keep store won't exercise retry behavior, so the strategy is:
    point a proxy-only client at a black-hole address (no API client
    needed, every HTTP request goes through one place) and simulate
    responses with TEST_PATCHER.  This exercises the retry logic without
    any supporting servers.  Subclasses must define DEFAULT_EXPECT,
    DEFAULT_EXCEPTION, TEST_PATCHER, and run_method().
    """
    disk_cache = False

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Build a KeepClient from the default kwargs plus any overrides."""
        kwargs = dict(self.client_kwargs, **caller_kwargs)
        kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method() returns expected (DEFAULT_EXPECT when None)."""
        if expected is None:
            expected = self.DEFAULT_EXPECT
        result = self.run_method(*args, **kwargs)
        self.assertEqual(expected, result)

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method() raises error_class (DEFAULT_EXCEPTION when None)."""
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as caught:
            self.run_method(*args, **kwargs)
        return caught

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        # A transient 500 followed by 200 succeeds when retries remain.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        # A raised exception counts as retryable, like a 5xx.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        # 403 is permanent: the following 200 must never be reached.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            caught = self.check_exception(num_retries=1)
        self.assertRegex(str(caught.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # A client-level num_retries applies when the call omits it.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1187
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.get(), with and without disk cache."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        # Explicit base-class call: DiskCacheBase removes the temp cache dir.
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        """Fetch locator with a fresh client; retry args pass through."""
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A unanimous 404 surfaces as NotFoundError, not a generic error.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as caught:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                caught.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # The hinted server's 200 wins even with failures elsewhere.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on one server falls through to the next.
        responses = [
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # Data failing its checksum is discarded and re-fetched elsewhere.
        responses = [
            ('baddata', 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)
1235
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.head(), with and without disk cache."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        # Explicit base-class call: DiskCacheBase removes the temp cache dir.
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        """Probe locator with a fresh client; retry args pass through."""
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A unanimous 404 surfaces as NotFoundError, not a generic error.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as caught:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                caught.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # The hinted server's 200 wins even with failures elsewhere.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # A socket timeout on one server falls through to the next.
        responses = [
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)
1277
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.put(), with and without disk cache."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        # Explicit base-class call: DiskCacheBase removes the temp cache dir.
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        """Store data with a fresh client; retry args pass through."""
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # With only one server available, copies=2 must fail rather
        # than count the same server twice.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1296
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """Check that KeepWriterThreadPool stops writing once enough copies exist."""

    class FakeKeepService(object):
        """Stand-in Keep service: put() sleeps, then succeeds/fails/raises."""

        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            # Canned successful response reported by last_result().
            self._result = {
                'headers': {
                    'x-keep-replicas-stored': str(replicas),
                    'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
                },
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout, headers):
            # Simulate network latency, then the configured outcome.
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            if self.will_succeed:
                return self._result
            return {"status_code": 500, "body": "didn't succeed"}

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data = 'foo',
            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas = self.copies,
            copies = self.copies
        )

    def _add_service(self, delay, **outcome):
        # Queue one fake service; staggered delays keep completion order
        # deterministic across the pool's worker threads.
        self.pool.add_task(self.FakeKeepService(delay=delay, **outcome), None)

    def test_only_write_enough_on_success(self):
        for i in range(10):
            self._add_service(i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        for i in range(5):
            self._add_service(i/10.0, will_succeed=False)
            self._add_service(i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        for i in range(5):
            self._add_service(i/10.0, will_raise=Exception())
            self._add_service(i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # More crashes than requested copies: the pool cannot reach the
        # replication target even with the later successful services.
        for i in range(self.copies+1):
            self._add_service(i/10.0, will_raise=Exception())
        for i in range(self.copies-1):
            self._add_service(i/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies-1, []))
1368
1369
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    # Test put()s that need two distinct servers to succeed, possibly
    # requiring multiple passes through the retry loop.

    # Fallback when parameterized_class supplies no value.  The original
    # declared `block_cache = False`, a typo: the attribute consulted by
    # make_block_cache(self.disk_cache) below — and set by the
    # parameterized_class decorator above — is disk_cache, matching the
    # other parameterized test classes in this file.
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        # First pass: one server raises; the retry pass stores the second copy.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        # A 500 is retryable, so a second pass reaches the other server.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1409
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        class ApiMock(object):
            # Provide just enough API-client surface for KeepClient
            # construction; any other attribute access fails the way a
            # broken API client would.
            def __getattr__(self, name):
                if name == "api_token":
                    return "abc"
                if name == "insecure":
                    return False
                if name == "config":
                    return lambda: {}
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for is that if an API (not keepstore)
        # exception is thrown as part of a get(), the next attempt to get
        # that same block will result in a deadlock.  That is why the same
        # locator is fetched twice in a row.  Unfortunately, the failure
        # mode for this test is that the test suite deadlocks; there isn't
        # a good way to avoid that without adding a special case that has
        # no use except for this test.
        for _ in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1444
1445
1446 class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
1447     def setUp(self):
1448         self.api_client = self.mock_keep_services(count=2)
1449         self.data = b'xyzzy'
1450         self.locator = '1271ed5ef305aadabc605b1609e24c52'
1451         self.disk_cache_dir = tempfile.mkdtemp()
1452
1453     def tearDown(self):
1454         shutil.rmtree(self.disk_cache_dir)
1455
1456
1457     @mock.patch('arvados.KeepClient.KeepService.get')
1458     def test_disk_cache_read(self, get_mock):
1459         # confirm it finds an existing cache block when the cache is
1460         # initialized.
1461
1462         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1463         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1464             f.write(self.data)
1465
1466         # block cache should have found the existing block
1467         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1468                                                   disk_cache_dir=self.disk_cache_dir)
1469         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1470
1471         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1472
1473         get_mock.assert_not_called()
1474
1475
1476     @mock.patch('arvados.KeepClient.KeepService.get')
1477     def test_disk_cache_share(self, get_mock):
1478         # confirm it finds a cache block written after the disk cache
1479         # was initialized.
1480
1481         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1482                                                   disk_cache_dir=self.disk_cache_dir)
1483         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1484
1485         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1486         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1487             f.write(self.data)
1488
1489         # when we try to get the block, it'll check the disk and find it.
1490         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1491
1492         get_mock.assert_not_called()
1493
1494
1495     def test_disk_cache_write(self):
1496         # confirm the cache block was created
1497
1498         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1499                                                   disk_cache_dir=self.disk_cache_dir)
1500         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1501
1502         with tutil.mock_keep_responses(self.data, 200) as mock:
1503             self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1504
1505         self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1506
1507         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1508             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1509
1510
1511     def test_disk_cache_clean(self):
1512         # confirm that a tmp file in the cache is cleaned up
1513
1514         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1515         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
1516             f.write(b"abc1")
1517
1518         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
1519             f.write(b"abc2")
1520
1521         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
1522             f.write(b"abc3")
1523
1524         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1525         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1526         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1527
1528         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1529                                                   disk_cache_dir=self.disk_cache_dir)
1530
1531         # The tmp still hasn't been deleted because it was created in the last 60 seconds
1532         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1533         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1534         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1535
1536         # Set the mtime to 61s in the past
1537         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
1538         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
1539         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))
1540
1541         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1542                                                    disk_cache_dir=self.disk_cache_dir)
1543
1544         # Tmp should be gone but the other ones are safe.
1545         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1546         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1547         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1548
1549
1550     @mock.patch('arvados.KeepClient.KeepService.get')
1551     def test_disk_cache_cap(self, get_mock):
1552         # confirm that the cache is kept to the desired limit
1553
1554         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1555         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1556             f.write(self.data)
1557
1558         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1559         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1560             f.write(b"foo")
1561
1562         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1563         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1564
1565         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1566                                                    disk_cache_dir=self.disk_cache_dir,
1567                                                    max_slots=1)
1568
1569         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1570         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1571
1572
1573     @mock.patch('arvados.KeepClient.KeepService.get')
1574     def test_disk_cache_share(self, get_mock):
1575         # confirm that a second cache doesn't delete files that belong to the first cache.
1576
1577         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1578         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1579             f.write(self.data)
1580
1581         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1582         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1583             f.write(b"foo")
1584
1585         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1586         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1587
1588         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1589                                                    disk_cache_dir=self.disk_cache_dir,
1590                                                    max_slots=2)
1591
1592         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1593         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1594
1595         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1596                                                    disk_cache_dir=self.disk_cache_dir,
1597                                                    max_slots=1)
1598
1599         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1600         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1601
1602
1603
1604     def test_disk_cache_error(self):
1605         os.chmod(self.disk_cache_dir, stat.S_IRUSR)
1606
1607         # Fail during cache initialization.
1608         with self.assertRaises(OSError):
1609             block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1610                                                       disk_cache_dir=self.disk_cache_dir)
1611
1612
1613     def test_disk_cache_write_error(self):
1614         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1615                                                   disk_cache_dir=self.disk_cache_dir)
1616
1617         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1618
1619         # Make the cache dir read-only
1620         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1621         os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
1622
1623         # Cache fails
1624         with self.assertRaises(arvados.errors.KeepCacheError):
1625             with tutil.mock_keep_responses(self.data, 200) as mock:
1626                 keep_client.get(self.locator)
1627
1628
1629     def test_disk_cache_retry_write_error(self):
1630         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1631                                                   disk_cache_dir=self.disk_cache_dir)
1632
1633         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1634
1635         called = False
1636         realmmap = mmap.mmap
1637         def sideeffect_mmap(*args, **kwargs):
1638             nonlocal called
1639             if not called:
1640                 called = True
1641                 raise OSError(errno.ENOSPC, "no space")
1642             else:
1643                 return realmmap(*args, **kwargs)
1644
1645         with patch('mmap.mmap') as mockmmap:
1646             mockmmap.side_effect = sideeffect_mmap
1647
1648             cache_max_before = block_cache.cache_max
1649
1650             with tutil.mock_keep_responses(self.data, 200) as mock:
1651                 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1652
1653             self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1654
1655         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1656             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1657
1658         # shrank the cache in response to ENOSPC
1659         self.assertTrue(cache_max_before > block_cache.cache_max)
1660
1661
1662     def test_disk_cache_retry_write_error2(self):
1663         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1664                                                   disk_cache_dir=self.disk_cache_dir)
1665
1666         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1667
1668         called = False
1669         realmmap = mmap.mmap
1670         def sideeffect_mmap(*args, **kwargs):
1671             nonlocal called
1672             if not called:
1673                 called = True
1674                 raise OSError(errno.ENOMEM, "no memory")
1675             else:
1676                 return realmmap(*args, **kwargs)
1677
1678         with patch('mmap.mmap') as mockmmap:
1679             mockmmap.side_effect = sideeffect_mmap
1680
1681             slots_before = block_cache._max_slots
1682
1683             with tutil.mock_keep_responses(self.data, 200) as mock:
1684                 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1685
1686             self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1687
1688         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1689             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1690
1691         # shrank the cache in response to ENOMEM
1692         self.assertTrue(slots_before > block_cache._max_slots)