21720: added type assertions for AxiosInstance get
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import hashlib
6 import os
7 import errno
8 import pycurl
9 import random
10 import re
11 import shutil
12 import socket
13 import sys
14 import stat
15 import tempfile
16 import time
17 import unittest
18 import urllib.parse
19 import mmap
20
21 from unittest import mock
22 from unittest.mock import patch
23
24 import parameterized
25
26 import arvados
27 import arvados.retry
28 import arvados.util
29 from . import arvados_testutil as tutil
30 from . import keepstub
31 from . import run_test_server
32
33 from .arvados_testutil import DiskCacheBase
34
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Round-trip reads and writes through a real Keep server.

    Every test shares the single KeepClient created in setUpClass.  The class
    is parameterized to run once with ``disk_cache`` True and once with it
    False.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    # DiskCacheBase helper that owns the shared client's block cache; set in
    # setUpClass and cleaned up in tearDownClass.
    block_cache_test = None

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.block_cache_test = DiskCacheBase()
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='',
                                             block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))

    @classmethod
    def tearDownClass(cls):
        # Clean up the shared block cache, then run the base class's
        # class-level teardown.  This must chain to tearDownClass(), not
        # setUpClass(): calling setUpClass() here would redo the class setup
        # instead of undoing it.
        try:
            cls.block_cache_test.tearDown()
        finally:
            super(KeepTestCase, cls).tearDownClass()

    def test_KeepBasicRWTest(self):
        # NOTE: the client and its counters are shared by the whole class, so
        # the zero-counter checks rely on this test running first (unittest
        # runs test methods in alphabetical order).
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
                                             b'foo'),
                        'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # An 8-byte pattern repeated 2**23 times: a 67108864-byte (64 MiB) block.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' * (2 ** 23)
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # A str argument yields the same locator as the bytes b'foo'.
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
144
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Reads against a Keep server that has blob signing enabled.

    Parameterized to run with ``disk_cache`` True and False.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))

        # A write returns a signed locator that can be read back.
        signed_foo = client.put('foo')
        self.assertRegex(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        self.assertEqual(client.get(signed_foo),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # Reading with the signature stripped off is rejected.
        bare_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        signed_bar = client.put('bar')
        self.assertRegex(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)
        with self.assertRaises(arvados.errors.KeepReadError):
            client.get(bare_bar)

        # Another user cannot read the block, even with the signed locator.
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.KeepReadError):
            arvados.Keep.get(signed_bar)

        # With an empty token, neither the signed nor the unsigned locator
        # can be read.
        client.api_token = ''
        for loc in (signed_bar, bare_bar):
            with self.assertRaises(arvados.errors.KeepReadError):
                client.get(loc)
192
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """KeepClient behavior when Keep is reached through a proxy service.

    Parameterized to run with ``disk_cache`` True and False.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        super(KeepProxyTestCase, self).tearDown()
        DiskCacheBase.tearDown(self)

    def _override_keep_services(self, value):
        """Return a context manager that sets ARVADOS_KEEP_SERVICES to *value*.

        Uses mock.patch.dict on the global arvados config, so the previous
        value (or its absence) is restored on exit.  This keeps the override
        from leaking into later tests in the class.
        """
        return mock.patch.dict(arvados.config.settings(),
                               {'ARVADOS_KEEP_SERVICES': value})

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='', block_cache=self.make_block_cache(self.disk_cache))
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        with self._override_keep_services(
                'http://10.0.0.1 https://foo.example.org:1234/'):
            keep_client = arvados.KeepClient(api_client=self.api_client,
                                             local_store='',
                                             block_cache=self.make_block_cache(self.disk_cache))
            uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        with self._override_keep_services('bad.uri.org'), \
                self.assertRaises(arvados.errors.ArgumentError):
            arvados.KeepClient(api_client=self.api_client,
                               local_store='',
                               block_cache=self.make_block_cache(self.disk_cache))
242
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """KeepClient tests that run against mocked Keep services.

    Service discovery comes from tutil.ApiClientMock.mock_keep_services() and
    HTTP traffic is replaced by tutil.mock_keep_responses(), so no servers are
    started.  Parameterized to run with ``disk_cache`` True and False.
    """
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def _new_keep_client(self, api_client, **kwargs):
        """Return a KeepClient for *api_client* using this test's block cache.

        Extra keyword arguments (e.g. ``num_retries``) are passed through to
        ``arvados.KeepClient``.
        """
        return arvados.KeepClient(
            api_client=api_client,
            block_cache=self.make_block_cache(self.disk_cache),
            **kwargs
        )

    def _assert_curl_timeouts(self, curl, timeout, low_speed_set=True):
        """Assert the pycurl timeout options recorded on mock response *curl*.

        *timeout* is indexed like ``KeepClient.DEFAULT_TIMEOUT``:

        * ``[0]`` is the connect timeout in seconds, checked against
          CONNECTTIMEOUT_MS.
        * ``[1]`` and ``[2]`` are the LOW_SPEED_TIME and LOW_SPEED_LIMIT
          values.

        When *low_speed_set* is false, both low-speed options must be unset
        (``getopt()`` returns None).
        """
        self.assertEqual(curl.getopt(pycurl.CONNECTTIMEOUT_MS),
                         int(timeout[0]*1000))
        if low_speed_set:
            self.assertEqual(curl.getopt(pycurl.LOW_SPEED_TIME),
                             int(timeout[1]))
            self.assertEqual(curl.getopt(pycurl.LOW_SPEED_LIMIT),
                             int(timeout[2]))
        else:
            self.assertIsNone(curl.getopt(pycurl.LOW_SPEED_TIME))
            self.assertIsNone(curl.getopt(pycurl.LOW_SPEED_LIMIT))

    def get_service_roots(self, api_client):
        keep_client = self._new_keep_client(api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            with self.subTest(ssl_flag=ssl_flag):
                services = self.get_service_roots(self.mock_keep_services(
                    service_ssl_flag=ssl_flag))
                self.assertEqual(
                    ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        keep_client = self._new_keep_client(self.mock_keep_services(
            service_type='proxy', service_host='localhost', service_port=9, count=1))
        try:
            # This will fail, but it ensures we get the service discovery
            # response.  Catch Exception rather than using a bare except so
            # KeyboardInterrupt/SystemExit still propagate.
            keep_client.put('baz2', num_retries=0)
        except Exception:
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = self._new_keep_client(api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as req_mock:
            keep_client = self._new_keep_client(api_client)
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertIsNone(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYPEER))
            self.assertIsNone(
                req_mock.responses[0].getopt(pycurl.SSL_VERIFYHOST))

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = self._new_keep_client(api_client)
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = self._new_keep_client(api_client, num_retries=0)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self._assert_curl_timeouts(req_mock.responses[0],
                                       arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = self._new_keep_client(api_client, num_retries=0)
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self._assert_curl_timeouts(req_mock.responses[0],
                                       arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = self._new_keep_client(api_client, num_retries=0)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self._assert_curl_timeouts(req_mock.responses[0],
                                       arvados.KeepClient.DEFAULT_TIMEOUT,
                                       low_speed_set=False)

    def test_proxy_get_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = self._new_keep_client(api_client, num_retries=0)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self._assert_curl_timeouts(req_mock.responses[0],
                                       arvados.KeepClient.DEFAULT_PROXY_TIMEOUT)

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = self._new_keep_client(api_client, num_retries=0)
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self._assert_curl_timeouts(req_mock.responses[0],
                                       arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
                                       low_speed_set=False)

    def test_proxy_put_timeout(self):
        # Uses the parameterized block cache like every other test here,
        # instead of bypassing it and hand-clearing disk_cache_dir.
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = self._new_keep_client(api_client, num_retries=0)
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self._assert_curl_timeouts(req_mock.responses[0],
                                       arvados.KeepClient.DEFAULT_PROXY_TIMEOUT)

    def check_no_services_error(self, verb, exc_class):
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = self._new_keep_client(api_client, num_retries=0)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = self._new_keep_client(api_client, num_retries=0)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        # Only the errors from the final attempt (the two 502s) are reported.
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        # One of three services accepts the block; only the two failures
        # should be reported.
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = self._new_keep_client(api_client, num_retries=0)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = self._new_keep_client(api_client, num_retries=0)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = self._new_keep_client(api_client, num_retries=0)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = self._new_keep_client(api_client, num_retries=0)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = self._new_keep_client(api_client, num_retries=0)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
578
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check which KeepClient requests are answered from the block cache.

    Each test patches KeepService.get so the number of requests that reach
    the service layer can be counted.
    """
    disk_cache = False

    def setUp(self):
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache),
        )

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.get(self.locator)
        # The repeat GET is served from the cache, so the service sees one call.
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            for _ in range(2):
                self.keep_client.head(self.locator)
        # HEAD results are never cached (so they can't be mistaken for GET
        # results); both requests reach the service.
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        get_mock.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_result = self.keep_client.head(self.locator)
            get_result = self.keep_client.get(self.locator)
        self.assertEqual(b'first response', head_result)
        # The HEAD response was not cached, so the GET got the second response.
        self.assertNotEqual(head_result, get_result)
620
621
622
623
624
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check the X-Request-Id header sent with Keep put/get/head requests.

    The id may come from api_client.request_id, from an explicit
    request_id= argument, or be generated automatically ("req-" followed
    by 20 [a-z0-9] characters) when neither is given.
    """
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_default_to_api_client_request_id(self):
        self.api_client.request_id = self.test_id
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

    def test_explicit_request_id(self):
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data, request_id=self.test_id)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])

    def test_automatic_request_id(self):
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertAutomaticRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])

    def test_request_id_in_exception(self):
        # Errors raised after failed requests must mention the request id,
        # whether it was passed explicitly or generated automatically.
        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
                self.keep_client.head(self.locator, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
                self.keep_client.get(self.locator)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
                self.keep_client.put(self.data, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400):
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
                self.keep_client.put(self.data)

    def _request_id_headers(self, resp):
        """Return the X-Request-Id header lines set on mocked request `resp`."""
        return [x for x in resp.getopt(pycurl.HTTPHEADER)
                if x.startswith('X-Request-Id: ')]

    def assertAutomaticRequestId(self, resp):
        """Assert `resp` carried a generated id rather than self.test_id."""
        hdrs = self._request_id_headers(resp)
        # Report a missing header as a test failure, not an IndexError.
        self.assertTrue(hdrs, "request has no X-Request-Id header")
        hdr = hdrs[0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        """Assert `resp` carried the header 'X-Request-Id: <self.test_id>'."""
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))
717
718
@tutil.skip_sleep
# NOTE(review): unlike the neighboring test classes, the disk_cache
# parameterization is commented out here, so only disk_cache=False is
# exercised. The reason is not visible in this file; confirm before
# re-enabling it.
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check the order in which KeepClient probes Keep services for a block.

    Probe orders are compared against a fixed reference set, both directly
    via weighted_service_roots() and via the HTTP requests that get, head
    and put issue.
    """
    disk_cache = False

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        # blocks[i] is i written as 64 zero-padded hex digits.
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [hashlib.md5(block).hexdigest() for block in self.blocks]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    @staticmethod
    def _service_id(url):
        """Return the hex suffix of the mock service host named in `url`.

        Mock services are named "keep0x<hex>", so a URL for host keep0x3
        yields "3", in the same form as the entries of expected_order.
        """
        return re.search(r'//\[?keep0x([0-9a-f]+)', url).group(1)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for locator_hash, expected in zip(self.hashes, self.expected_order):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(locator_hash))
            got_order = [self._service_id(root) for root in roots]
            self.assertEqual(expected, got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Run op(i) for each reference block with every request failing.

        With num_retries=1 each service is tried twice, so the observed
        request order must be the reference order repeated twice.
        """
        for i, expected in enumerate(self.expected_order):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mocked, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                self._service_id(resp.getopt(pycurl.URL).decode())
                for resp in mocked.responses]
            self.assertEqual(expected*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i, block in enumerate(self.blocks):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mocked, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(block, num_retries=2, copies=copies)
                got_order = [
                    self._service_id(resp.getopt(pycurl.URL).decode())
                    for resp in mocked.responses]
                expected_seq = self.expected_order[i]*3
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(expected_seq):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        pending.remove(got)
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(expected_seq)))

    def test_probe_waste_adding_one_server(self):
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(locator_hash))
            for locator_hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            total_penalty = 0
            for locator_hash, probe_before in zip(hashes, probes_before):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(locator_hash))
                # Penalty: how many positions the block's original
                # first-choice service moved down the probe list.
                penalty = probe_after.index(probe_before[0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should stay close to the ideal
            # N(added)/N(orig). The upper tolerance is (20 - N(added))%
            # above the ideal, so it tightens as more services (data
            # points) are added; the lower bound is 80% of the ideal.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """Check that a failing `verb` reports its errors in probe order.

        Every request returns HTTP 500, so `verb` must raise `exc_class`,
        and the URLs from its request_errors() must name the services in
        the reference probe order for the 64-"0" block, on a random port.
        """
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
877
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    """Check KeepClient connect/response timeouts and low-bandwidth limits.

    self.server and self.api_client are supplied by keepstub.StubKeepServers;
    tests adjust the stub server's bandwidth and delays, then time requests.
    """
    disk_cache = False

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    def tearDown(self):
        # Defining tearDown here hides any tearDown inherited through
        # keepstub.StubKeepServers (presumably stopping the stub server --
        # verify in keepstub), so chain to it via super(). Because
        # unittest.TestCase.tearDown does not call super() itself,
        # DiskCacheBase.tearDown is not reached that way and is still
        # called explicitly.
        try:
            DiskCacheBase.tearDown(self)
        finally:
            super().tearDown()

    class assertTakesBetween(unittest.TestCase):
        """Context manager: assert the body runs in [tmin, tmax] seconds.

        Subclasses TestCase only to borrow its assert* methods.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            # monotonic() is unaffected by wall-clock adjustments.
            self.t0 = time.monotonic()

        def __exit__(self, exc_type, exc_value, traceback):
            if exc_type is not None:
                # Let the body's own failure propagate; a timing
                # assertion raised here would replace it.
                return False
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.monotonic() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager: assert the body runs for at least tmin seconds.

        Subclasses TestCase only to borrow its assert* methods.
        """
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.monotonic()

        def __exit__(self, exc_type, exc_value, traceback):
            if exc_type is not None:
                # Don't mask the body's failure with a timing assertion.
                return False
            delta = round(time.monotonic() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient for self.api_client with the given timeouts."""
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
1024
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check how KeepClient routes requests for locators carrying +K@ hints."""
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Mock `disks` Keep disk services plus `gateways` gateway services.

        Sets self.gateways (service records), self.gateway_roots (their
        https root URLs), self.api_client and self.keepClient.
        """
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def _check_gateway_hints_in_order(self, curl_class, verb):
        """Call KeepClient.<verb> on a locator hinted with several gateways.

        Every response is a 404, so the call must raise NotFoundError after
        trying each hinted gateway (in hint order) and then the disk
        services. `curl_class` is the patched pycurl.Curl mock.
        """
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        curl_class.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for root, curl in zip(self.gateway_roots, mocks):
            self.assertEqual(root+locator,
                             curl.getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for curl in mocks[gateways:gateways+disks]:
            self.assertRegex(
                curl.getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'get')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_in_order(MockCurl, 'head')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1126
class KeepClientRetryTestMixin(object):
    """Retry tests shared by the get/head/put KeepClient test cases.

    A local Keep store would not exercise the retry logic, so instead:

    * the client is created with a single proxy (pointed at a black hole),
      so no API client is needed and all HTTP requests come from one place;
    * HTTP responses are simulated with TEST_PATCHER.

    This tests retries without relying on supporting servers and avoids
    side effects if something hiccups.

    Subclasses must define DEFAULT_EXPECT (what a successful run_method()
    returns), DEFAULT_EXCEPTION (what a failed run_method() raises),
    TEST_PATCHER (called as ``TEST_PATCHER(body, *responses)``, where each
    response is an HTTP status code or an exception, and used as a context
    manager), and run_method().
    """
    disk_cache = False

    PROXY_ADDR = 'http://[{}]:65535/'.format(tutil.TEST_HOST)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = dict(proxy=self.PROXY_ADDR, local_store='')

    def new_client(self, **caller_kwargs):
        """Build a KeepClient from client_kwargs, overridden by caller_kwargs."""
        kwargs = dict(self.client_kwargs, **caller_kwargs)
        # Each client gets its own make_block_cache() result.
        kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method(...) returns `expected` (default DEFAULT_EXPECT)."""
        wanted = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(wanted, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method(...) raises `error_class` (default DEFAULT_EXCEPTION).

        Returns the assertRaises context so callers can inspect the error.
        """
        expected_error = self.DEFAULT_EXCEPTION if error_class is None else error_class
        with self.assertRaises(expected_error) as err:
            self.run_method(*args, **kwargs)
        return err

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        # 403 is not retried, so the later 200 is never reached.
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries given to the client constructor applies when the
        # call itself does not pass one.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1197
1198
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.get() against mocked Keep responses."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    # Same block with a +K@xyzzy hint, which adds another service to try.
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        kc = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                kc.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # The first response is a socket timeout; the second returns the data.
        responses = [
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # The first body does not match the locator; the second does.
        responses = [('baddata', 200), (self.DEFAULT_EXPECT, 200)]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)
1246
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.head() against mocked Keep responses."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    # Same block with a +K@xyzzy hint, which adds another service to try.
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        kc = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                kc.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # The first response is a socket timeout; the second succeeds.
        responses = [
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)
1288
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.put() against mocked Keep responses."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # The mixin configures a single proxy, so copies=2 could only be
        # met by writing twice to the same server; put() must fail instead.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1306
1307
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """KeepWriterThreadPool should only write as many copies as requested."""

    class FakeKeepService(object):
        """Minimal stand-in for a Keep service handed to KeepWriterThreadPool.

        put() sleeps for *delay* seconds, then raises *will_raise* if one was
        given, otherwise returns *will_succeed*.  last_result() returns a
        canned success response (with *replicas* in its headers) or a canned
        500 response, depending on *will_succeed*.
        """

        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {
                    'x-keep-replicas-stored': str(replicas),
                    'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
                },
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout, headers):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            if not self.will_succeed:
                return {"status_code": 500, "body": "didn't succeed"}
            return self._result

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def _queue(self, delay, **service_kwargs):
        # Build one fake service and hand it to the writer pool.
        service = self.FakeKeepService(delay=delay, **service_kwargs)
        self.pool.add_task(service, None)

    def test_only_write_enough_on_success(self):
        for step in range(10):
            self._queue(step / 10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Interleave a failing and a succeeding service at each delay.
        for step in range(5):
            delay = step / 10.0
            self._queue(delay, will_succeed=False)
            self._queue(delay, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Interleave a raising and a succeeding service at each delay.
        for step in range(5):
            delay = step / 10.0
            self._queue(delay, will_raise=Exception())
            self._queue(delay, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # One more crashing service than copies needed, and one fewer
        # successful service than copies needed.
        for step in range(self.copies + 1):
            self._queue(step / 10.0, will_raise=Exception())
        for step in range(self.copies - 1):
            self._queue(step / 10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies - 1, []))
1379
1380
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """put()s that need two distinct servers to succeed.

    Some scenarios require more than one pass through the retry loop.
    """

    # Default for the undecorated base class; parameterized_class sets this
    # on each generated subclass.  This used to be misspelled ``block_cache``,
    # which nothing in this class reads, while setUp reads ``disk_cache``.
    disk_cache = False

    # Locator of the block b'foo' that every test here stores.
    FOO_LOCATOR = 'acbd18db4cc2f85cedef654fccc4a4d8+3'

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        # An exception on one request is retried; two 200s satisfy copies=2.
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR,
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        # A 500 is retryable; two 200s then satisfy copies=2.
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR,
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        with tutil.mock_keep_responses(
                self.FOO_LOCATOR,
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1420
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    """Regression test: an API (not keepstore) failure in get() must not deadlock."""
    disk_cache = False

    def tearDown(self):
        base_cleanup = DiskCacheBase.tearDown
        base_cleanup(self)

    def test_api_fail(self):
        class ApiMock(object):
            # Attribute values KeepClient may read successfully.  Any other
            # attribute access raises KeepReadError to simulate the API
            # client failing.
            _ATTRS = {
                "api_token": "abc",
                "insecure": False,
                "config": lambda: {},
            }

            def __getattr__(self, r):
                known = type(self)._ATTRS
                if r not in known:
                    raise arvados.errors.KeepReadError()
                return known[r]

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug being guarded against: if an API (not keepstore) exception
        # is thrown during get(), the next get() of the same block used to
        # deadlock.  So get() is attempted twice in a row.  If the bug comes
        # back, the test suite hangs here instead of failing cleanly; there
        # is no good way to avoid that without a test-only special case.
        for _attempt in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1455
1456
1457 class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
1458     def setUp(self):
1459         self.api_client = self.mock_keep_services(count=2)
1460         self.data = b'xyzzy'
1461         self.locator = '1271ed5ef305aadabc605b1609e24c52'
1462         self.disk_cache_dir = tempfile.mkdtemp()
1463
1464     def tearDown(self):
1465         shutil.rmtree(self.disk_cache_dir)
1466
1467
1468     @mock.patch('arvados.KeepClient.KeepService.get')
1469     def test_disk_cache_read(self, get_mock):
1470         # confirm it finds an existing cache block when the cache is
1471         # initialized.
1472
1473         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1474         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1475             f.write(self.data)
1476
1477         # block cache should have found the existing block
1478         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1479                                                   disk_cache_dir=self.disk_cache_dir)
1480         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1481
1482         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1483
1484         get_mock.assert_not_called()
1485
1486
1487     @mock.patch('arvados.KeepClient.KeepService.get')
1488     def test_disk_cache_share(self, get_mock):
1489         # confirm it finds a cache block written after the disk cache
1490         # was initialized.
1491
1492         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1493                                                   disk_cache_dir=self.disk_cache_dir)
1494         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1495
1496         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1497         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1498             f.write(self.data)
1499
1500         # when we try to get the block, it'll check the disk and find it.
1501         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1502
1503         get_mock.assert_not_called()
1504
1505
1506     def test_disk_cache_write(self):
1507         # confirm the cache block was created
1508
1509         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1510                                                   disk_cache_dir=self.disk_cache_dir)
1511         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1512
1513         with tutil.mock_keep_responses(self.data, 200) as mock:
1514             self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1515
1516         self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1517
1518         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1519             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1520
1521
1522     def test_disk_cache_clean(self):
1523         # confirm that a tmp file in the cache is cleaned up
1524
1525         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1526         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
1527             f.write(b"abc1")
1528
1529         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
1530             f.write(b"abc2")
1531
1532         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
1533             f.write(b"abc3")
1534
1535         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1536         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1537         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1538
1539         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1540                                                   disk_cache_dir=self.disk_cache_dir)
1541
1542         # The tmp still hasn't been deleted because it was created in the last 60 seconds
1543         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1544         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1545         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1546
1547         # Set the mtime to 61s in the past
1548         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
1549         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
1550         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))
1551
1552         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1553                                                    disk_cache_dir=self.disk_cache_dir)
1554
1555         # Tmp should be gone but the other ones are safe.
1556         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1557         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1558         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1559
1560
1561     @mock.patch('arvados.KeepClient.KeepService.get')
1562     def test_disk_cache_cap(self, get_mock):
1563         # confirm that the cache is kept to the desired limit
1564
1565         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1566         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1567             f.write(self.data)
1568
1569         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1570         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1571             f.write(b"foo")
1572
1573         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1574         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1575
1576         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1577                                                    disk_cache_dir=self.disk_cache_dir,
1578                                                    max_slots=1)
1579
1580         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1581         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1582
1583
1584     @mock.patch('arvados.KeepClient.KeepService.get')
1585     def test_disk_cache_share(self, get_mock):
1586         # confirm that a second cache doesn't delete files that belong to the first cache.
1587
1588         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1589         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1590             f.write(self.data)
1591
1592         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1593         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1594             f.write(b"foo")
1595
1596         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1597         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1598
1599         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1600                                                    disk_cache_dir=self.disk_cache_dir,
1601                                                    max_slots=2)
1602
1603         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1604         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1605
1606         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1607                                                    disk_cache_dir=self.disk_cache_dir,
1608                                                    max_slots=1)
1609
1610         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1611         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1612
1613
1614
1615     def test_disk_cache_error(self):
1616         os.chmod(self.disk_cache_dir, stat.S_IRUSR)
1617
1618         # Fail during cache initialization.
1619         with self.assertRaises(OSError):
1620             block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1621                                                       disk_cache_dir=self.disk_cache_dir)
1622
1623
1624     def test_disk_cache_write_error(self):
1625         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1626                                                   disk_cache_dir=self.disk_cache_dir)
1627
1628         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1629
1630         # Make the cache dir read-only
1631         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1632         os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
1633
1634         # Cache fails
1635         with self.assertRaises(arvados.errors.KeepCacheError):
1636             with tutil.mock_keep_responses(self.data, 200) as mock:
1637                 keep_client.get(self.locator)
1638
1639
1640     def test_disk_cache_retry_write_error(self):
1641         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1642                                                   disk_cache_dir=self.disk_cache_dir)
1643
1644         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1645
1646         called = False
1647         realmmap = mmap.mmap
1648         def sideeffect_mmap(*args, **kwargs):
1649             nonlocal called
1650             if not called:
1651                 called = True
1652                 raise OSError(errno.ENOSPC, "no space")
1653             else:
1654                 return realmmap(*args, **kwargs)
1655
1656         with patch('mmap.mmap') as mockmmap:
1657             mockmmap.side_effect = sideeffect_mmap
1658
1659             cache_max_before = block_cache.cache_max
1660
1661             with tutil.mock_keep_responses(self.data, 200) as mock:
1662                 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1663
1664             self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1665
1666         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1667             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1668
1669         # shrank the cache in response to ENOSPC
1670         self.assertTrue(cache_max_before > block_cache.cache_max)
1671
1672
1673     def test_disk_cache_retry_write_error2(self):
1674         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1675                                                   disk_cache_dir=self.disk_cache_dir)
1676
1677         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1678
1679         called = False
1680         realmmap = mmap.mmap
1681         def sideeffect_mmap(*args, **kwargs):
1682             nonlocal called
1683             if not called:
1684                 called = True
1685                 raise OSError(errno.ENOMEM, "no memory")
1686             else:
1687                 return realmmap(*args, **kwargs)
1688
1689         with patch('mmap.mmap') as mockmmap:
1690             mockmmap.side_effect = sideeffect_mmap
1691
1692             slots_before = block_cache._max_slots
1693
1694             with tutil.mock_keep_responses(self.data, 200) as mock:
1695                 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1696
1697             self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1698
1699         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1700             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1701
1702         # shrank the cache in response to ENOMEM
1703         self.assertTrue(slots_before > block_cache._max_slots)