21224: fixed project card test
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import str
10 from builtins import range
11 from builtins import object
12 import hashlib
13 import mock
14 from mock import patch
15 import os
16 import errno
17 import pycurl
18 import random
19 import re
20 import shutil
21 import socket
22 import sys
23 import stat
24 import tempfile
25 import time
26 import unittest
27 import urllib.parse
28 import mmap
29
30 import parameterized
31
32 import arvados
33 import arvados.retry
34 import arvados.util
35 from . import arvados_testutil as tutil
36 from . import keepstub
37 from . import run_test_server
38
39 from .arvados_testutil import DiskCacheBase
40
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Round-trip put/get/head tests against a live Keep server.

    Parameterized to run once with the disk-backed block cache and once
    with the in-memory block cache.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    # DiskCacheBase instance whose block cache backs the shared
    # KeepClient; cleaned up in tearDownClass().
    block_cache_test = None

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.block_cache_test = DiskCacheBase()
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='',
                                             block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))

    @classmethod
    def tearDownClass(cls):
        # Bug fix: this previously called super().setUpClass() again,
        # re-running class setup during teardown instead of invoking the
        # superclass teardown.
        super(KeepTestCase, cls).tearDownClass()
        cls.block_cache_test.tearDown()

    def test_KeepBasicRWTest(self):
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        # Regexes below are raw strings: '\+' is an invalid escape
        # sequence in a plain string literal (SyntaxWarning on newer
        # Pythons).
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
                         b'foo'),
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        # Doubling 23 times yields an 8 * 2**23 = 64 MiB block.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
150
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Verify blob-signing permission enforcement against a live Keep server.

    KEEP_SERVER is started with blob_signing enabled, so every locator
    returned by put() must carry an +A signature hint, and unsigned or
    wrong-user reads must fail.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def tearDown(self):
        # Remove any disk cache directory created by make_block_cache().
        DiskCacheBase.tearDown(self)

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))
        # put() as an authorized user returns a signed locator that the
        # same user can read back.
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => bad request
        bar_locator = keep_client.put('bar')
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            bar_locator,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        self.assertRaises(arvados.errors.KeepReadError,
                          keep_client.get,
                          unsigned_bar_locator)

        # GET from a different user => bad request
        run_test_server.authorize_with('spectator')
        self.assertRaises(arvados.errors.KeepReadError,
                          arvados.Keep.get,
                          bar_locator)

        # Unauthenticated GET for a signed locator => bad request
        # Unauthenticated GET for an unsigned locator => bad request
        keep_client.api_token = ''
        self.assertRaises(arvados.errors.KeepReadError,
                          keep_client.get,
                          bar_locator)
        self.assertRaises(arvados.errors.KeepReadError,
                          keep_client.get,
                          unsigned_bar_locator)
198
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Exercise KeepClient's proxy discovery and configuration paths.

    A keepproxy server is started alongside the usual test services;
    its address is exported via the ARVADOS_KEEP_SERVICES setting.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        super(KeepProxyTestCase, self).tearDown()
        DiskCacheBase.tearDown(self)

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='', block_cache=self.make_block_cache(self.disk_cache))
        baz_locator = keep_client.put('baz')
        # Raw string: '\+' is an invalid escape sequence in a plain
        # string literal (SyntaxWarning on newer Pythons).
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        # A service string without a scheme must be rejected at
        # construction time.
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            keep_client = arvados.KeepClient(api_client=self.api_client,
                                             local_store='',
                                             block_cache=self.make_block_cache(self.disk_cache))
248
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Unit tests for KeepClient service selection, timeouts, and errors.

    No real servers are used: keep service lists come from
    tutil.ApiClientMock and HTTP traffic is simulated with
    tutil.mock_keep_responses, so these tests depend on the exact
    order and count of mocked responses.
    """
    disk_cache = False

    def tearDown(self):
        # Remove any disk cache directory created by make_block_cache().
        DiskCacheBase.tearDown(self)

    def get_service_roots(self, api_client):
        # Return the parsed service-root URLs a fresh KeepClient would
        # use for an arbitrary (all-zeros) locator, sorted for stable
        # comparison.
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
            service_type='proxy', service_host='localhost', service_port=9, count=1),
                                         block_cache=self.make_block_cache(self.disk_cache))
        try:
            # this will fail, but it ensures we get the service
            # discovery response
            keep_client.put('baz2', num_retries=0)
        except:
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)
        # NOTE(review): force_timeout is unused in this test; it appears
        # to be copied from the *_timeout tests below.
        force_timeout = socket.timeout("timed out")

        # With insecure=True, both pycurl TLS verification options must
        # be explicitly disabled (set to 0).
        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        # A remote-cluster (+R) locator should be re-signed locally
        # (+A) via a HEAD request carrying an X-Keep-Signature header.
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            # DEFAULT_TIMEOUT is (connect, low-speed-time, low-speed-limit).
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_head_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            # HEAD transfers no body, so no low-speed limits are set.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_get_timeout(self):
        # Proxy services get the (longer) DEFAULT_PROXY_TIMEOUT values.
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            # HEAD transfers no body, so no low-speed limits are set.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_put_timeout(self):
        self.disk_cache_dir = None
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def check_no_services_error(self, verb, exc_class):
        # When service discovery itself fails, the verb should raise
        # exc_class with an empty request_errors() map.
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=self.make_block_cache(self.disk_cache),
            num_retries=0,
        )
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        # With 2 services and num_retries=3 (4 attempts), only the
        # final round's errors (the two 502s) should be reported.
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        # One service accepts the block, two fail: only the two
        # failures should appear in request_errors().
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        # Unknown service types should still be usable for reads.
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        # The first response reports 3 replicas stored, satisfying
        # copies=2 with a single request; the 418s must never be used.
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
584
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify block-cache behavior: GETs are cached, HEADs are not."""
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def tearDown(self):
        # Remove any disk cache directory created by make_block_cache().
        DiskCacheBase.tearDown(self)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.get(self.locator)
            self.keep_client.get(self.locator)
        # Request already cached, don't require more than one request
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.head(self.locator)
            self.keep_client.head(self.locator)
        # Don't cache HEAD requests so that they're not confused with GET reqs
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        head_resp = None
        get_resp = None
        get_mock.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual(b'first response', head_resp)
        # First response was not cached because it was from a HEAD request.
        self.assertNotEqual(head_resp, get_resp)
626
627
628
629
630
631 @tutil.skip_sleep
632 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
633 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
634     disk_cache = False
635
    def setUp(self):
        """Build a mocked KeepClient and a known-good request id."""
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None
647
    def tearDown(self):
        # Remove any disk cache directory created by make_block_cache().
        DiskCacheBase.tearDown(self)
650
    def test_default_to_api_client_request_id(self):
        # When no explicit request_id is passed, KeepClient should fall
        # back to the api_client's request_id for put, get, and head.
        # (assertProvidedRequestId is a helper defined elsewhere in this
        # class.)
        self.api_client.request_id = self.test_id
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertProvidedRequestId(mock.responses[0])
666
667     def test_explicit_request_id(self):
668         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
669             self.keep_client.put(self.data, request_id=self.test_id)
670         self.assertEqual(2, len(mock.responses))
671         for resp in mock.responses:
672             self.assertProvidedRequestId(resp)
673
674         with tutil.mock_keep_responses(self.data, 200) as mock:
675             self.keep_client.get(self.locator, request_id=self.test_id)
676         self.assertProvidedRequestId(mock.responses[0])
677
678         with tutil.mock_keep_responses(b'', 200) as mock:
679             self.keep_client.head(self.locator, request_id=self.test_id)
680         self.assertProvidedRequestId(mock.responses[0])
681
682     def test_automatic_request_id(self):
683         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
684             self.keep_client.put(self.data)
685         self.assertEqual(2, len(mock.responses))
686         for resp in mock.responses:
687             self.assertAutomaticRequestId(resp)
688
689         with tutil.mock_keep_responses(self.data, 200) as mock:
690             self.keep_client.get(self.locator)
691         self.assertAutomaticRequestId(mock.responses[0])
692
693         with tutil.mock_keep_responses(b'', 200) as mock:
694             self.keep_client.head(self.locator)
695         self.assertAutomaticRequestId(mock.responses[0])
696
697     def test_request_id_in_exception(self):
698         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
699             with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
700                 self.keep_client.head(self.locator, request_id=self.test_id)
701
702         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
703             with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
704                 self.keep_client.get(self.locator)
705
706         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
707             with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
708                 self.keep_client.put(self.data, request_id=self.test_id)
709
710         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
711             with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
712                 self.keep_client.put(self.data)
713
714     def assertAutomaticRequestId(self, resp):
715         hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
716                if x.startswith('X-Request-Id: ')][0]
717         self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
718         self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
719
720     def assertProvidedRequestId(self, resp):
721         self.assertIn('X-Request-Id: '+self.test_id,
722                       resp.getopt(pycurl.HTTPHEADER))
723
724
@tutil.skip_sleep
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check KeepClient's service probe order against a reference set.

    These tests pin down which Keep services are tried, and in what
    sequence, for known block hashes -- and check that adding services
    only minimally perturbs the probe order for existing blocks.
    """
    disk_cache = False

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        # One test block per reference order: the 64-hex-digit
        # representations of 0, 1, 2, 3 (as bytes), plus their md5s.
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        # Run op(i) for each block with every service answering 500;
        # the URLs actually requested must match the reference probe
        # order, twice over (the callers pass num_retries=1).
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        # With copies writer threads racing, requests may appear up to
        # copies-1 positions earlier than the reference order allows.
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        # For each block, "penalty" is how far its original
        # first-choice service moved down the probe order after adding
        # services; the bounds below compare the total against the
        # ideal average of N(added)/N(orig) per block.
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            total_penalty = 0
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        # With every service failing, the raised error must list the
        # probed URLs in the reference probe order for the all-zeros
        # block ('3eab2d5fc9681074', per expected_order[0] in setUp).
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
883
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    """Exercise KeepClient connect/read timeouts and low-speed limits
    against the stub Keep server, which can simulate response delays
    and bandwidth caps.
    """
    disk_cache = False

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        """Context manager asserting its body takes tmin..tmax seconds.

        Subclasses unittest.TestCase solely to reuse its assert*
        methods.
        """
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        """Context manager asserting its body takes at least tmin seconds."""
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        """Return a KeepClient for self.api_client with the given
        (connect, read[, low-speed-limit]) timeout tuple."""
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        """Transfers succeed when bandwidth is above the low-speed limit."""
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        """A delayed first response trips the low-speed timeout for
        get/put; head (no body transfer) still completes, just slowly."""
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        """A stall in the middle of the transfer trips the low-speed
        timeout for both reads and writes."""
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            # (was `as e` -- the binding was never used)
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
1030
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Verify KeepClient honors gateway (K@uuid) and remote proxy
    (K@site) locator hints: gateways are tried first, in hint order,
    before ordinary disk services."""
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Mock a service list with `disks` disk services plus
        `gateways` gateway services, and build a client for them."""
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        # Use a bytes body for consistency with the other mocks in
        # this class (and with the bytes assertion below).
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        """With all services failing, get() probes every gateway in
        hint order before falling back to disk services."""
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body='')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.get(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        """Same probe ordering requirement as get(), for head()."""
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            self.keepClient.head(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        """A K@site hint routes the request to that site's keep proxy."""
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        """Same remote-proxy routing requirement as get(), for head()."""
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1132
class KeepClientRetryTestMixin(object):
    """Shared machinery for KeepClient retry tests.

    A local Keep store would never exercise retries, so instead:
    * point the client at a single black-hole proxy (no API client
      needed, and all HTTP requests go to one place);
    * mock the HTTP layer to feed the client simulated responses.
    This covers the retry logic thoroughly without any supporting
    servers, and with no side effects if something hiccups.

    Concrete test classes must define DEFAULT_EXPECT,
    DEFAULT_EXCEPTION, run_method(), and TEST_PATCHER (a context
    manager that mocks out the client's transport).
    """
    disk_cache = False

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Build a KeepClient, merging caller_kwargs over the defaults."""
        merged = self.client_kwargs.copy()
        merged.update(caller_kwargs)
        # Always install this test's block cache, even if the caller
        # supplied one.
        merged['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**merged)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method() returns expected (DEFAULT_EXPECT if None)."""
        if expected is None:
            expected = self.DEFAULT_EXPECT
        actual = self.run_method(*args, **kwargs)
        self.assertEqual(expected, actual)

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method() raises error_class (DEFAULT_EXCEPTION if
        None); return the assertRaises context for inspection."""
        if error_class is None:
            error_class = self.DEFAULT_EXCEPTION
        with self.assertRaises(error_class) as caught:
            self.run_method(*args, **kwargs)
        return caught

    def test_immediate_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            caught = self.check_exception(num_retries=1)
        self.assertRegex(str(caught.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        # num_retries given at client construction applies when the
        # call itself passes none.
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1203
1204
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.get(), via KeepClientRetryTestMixin."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        # A fresh client per call, so each test's retry settings apply.
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        """A 404 from the server surfaces as NotFoundError."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError only if no server returns
        # the block and a high threshold of servers report it missing.
        # Rig up 50/50 disagreement between two servers (404 vs 500)
        # and check that does NOT become a NotFoundError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as caught:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                caught.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        responses = [
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # A response that fails checksum verification is discarded and
        # the next server is tried.
        responses = [
            ('baddata', 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)
1252
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.head(), via KeepClientRetryTestMixin."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        # A fresh client per call, so each test's retry settings apply.
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        """A 404 from the server surfaces as NotFoundError."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError only if no server returns
        # the block and a high threshold of servers report it missing.
        # Rig up 50/50 disagreement between two servers (404 vs 500)
        # and check that does NOT become a NotFoundError.
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as caught:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                caught.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        responses = [
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)
1294
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Exercise retry behavior of KeepClient.put() against mocked servers."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        # Entry point invoked by the mixin's generic retry checks.
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # With a single mocked server available, asking for two copies
        # must fail rather than storing both copies on one server.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1312
1313
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """Verify KeepWriterThreadPool stops writing once enough replicas exist."""

    class FakeKeepService(object):
        """Stand-in Keep service: put() sleeps, then succeeds, fails, or raises."""

        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            # Canned successful response reported by last_result().
            self._result = {
                'headers': {
                    'x-keep-replicas-stored': str(replicas),
                    'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
                },
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout, headers):
            # Simulate server latency before reporting the configured outcome.
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            if not self.will_succeed:
                return {"status_code": 500, "body": "didn't succeed"}
            return self._result

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def test_only_write_enough_on_success(self):
        # Ten willing servers, but only self.copies writes should count.
        for i in range(10):
            service = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(service, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Alternate failing and succeeding servers; the pool keeps going
        # until the requested number of copies is reached.
        for i in range(5):
            failing = self.FakeKeepService(delay=i/10.0, will_succeed=False)
            self.pool.add_task(failing, None)
            succeeding = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(succeeding, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Servers that raise count as failures; successes still reach the goal.
        for i in range(5):
            crashing = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(crashing, None)
            succeeding = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(succeeding, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # With too few healthy servers, the pool comes up one copy short.
        for i in range(self.copies+1):
            crashing = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
            self.pool.add_task(crashing, None)
        for i in range(self.copies-1):
            succeeding = self.FakeKeepService(delay=i/10.0, will_succeed=True)
            self.pool.add_task(succeeding, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies-1, []))
1385
1386
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Test put()s that need two distinct servers to succeed, possibly
    requiring multiple passes through the retry loop.
    """
    # Fallback for the attribute injected by parameterized_class above,
    # matching the sibling test cases.  (This was previously misspelled
    # "block_cache", leaving a stray unused attribute and no default for
    # self.disk_cache.)
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        # A transport-level exception on the first attempt is retried;
        # the remaining two 200s provide the two required copies.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        # A 500 is retryable, so the put eventually stores both copies.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1426
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    """Check that API-client failures inside get() don't wedge the client."""
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        class ApiMock(object):
            # Supply only the attributes KeepClient reads at construction
            # time; any other attribute access simulates an API failure.
            def __getattr__(self, r):
                if r == "api_token":
                    return "abc"
                if r == "insecure":
                    return False
                if r == "config":
                    return lambda: {}
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for: if an API (not keepstore)
        # exception is thrown as part of a get(), the next attempt to get
        # that same block could deadlock — hence the same locator is
        # fetched twice.  Unfortunately, the failure mode for this test
        # is that the test suite deadlocks; there isn't a good way to
        # avoid that without adding a special case that has no use except
        # for this test.
        for _ in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1461
1462
class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Tests for KeepClient's on-disk block cache."""

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.disk_cache_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.disk_cache_dir)

    def _block_path(self, filename=None):
        # Path of a file inside the 3-character prefix subdirectory the
        # cache uses for self.locator.  Defaults to self.locator's own
        # cache file.
        if filename is None:
            filename = self.locator + ".keepcacheblock"
        return os.path.join(self.disk_cache_dir, self.locator[0:3], filename)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_read(self, get_mock):
        # confirm it finds an existing cache block when the cache is
        # initialized.

        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(self._block_path(), "wb") as f:
            f.write(self.data)

        # block cache should have found the existing block
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        # the block was served from disk, not from a Keep server
        get_mock.assert_not_called()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_read_recently_written(self, get_mock):
        # confirm it finds a cache block written after the disk cache
        # was initialized.
        #
        # (Renamed: this method used to be a duplicate definition of
        # test_disk_cache_share, so it was shadowed and never ran.)

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(self._block_path(), "wb") as f:
            f.write(self.data)

        # when we try to get the block, it'll check the disk and find it.
        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        get_mock.assert_not_called()

    def test_disk_cache_write(self):
        # confirm the cache block is created on disk after a get()

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)
        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        with tutil.mock_keep_responses(self.data, 200) as keep_mock:
            self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        self.assertIsNotNone(keep_client.get_from_cache(self.locator))

        with open(self._block_path(), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))

    def test_disk_cache_clean(self):
        # confirm that a stale tmp file in the cache is cleaned up, while
        # files not matching the cache's tmp naming pattern are left alone

        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(self._block_path("tmpXYZABC.keepcacheblock"), "wb") as f:
            f.write(b"abc1")

        with open(self._block_path("tmpXYZABC"), "wb") as f:
            f.write(b"abc2")

        with open(self._block_path("XYZABC"), "wb") as f:
            f.write(b"abc3")

        self.assertTrue(os.path.exists(self._block_path("tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(self._block_path("tmpXYZABC")))
        self.assertTrue(os.path.exists(self._block_path("XYZABC")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        # The tmp still hasn't been deleted because it was created in the last 60 seconds
        self.assertTrue(os.path.exists(self._block_path("tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(self._block_path("tmpXYZABC")))
        self.assertTrue(os.path.exists(self._block_path("XYZABC")))

        # Set the mtime to 61s in the past
        for name in ("tmpXYZABC.keepcacheblock", "tmpXYZABC", "XYZABC"):
            os.utime(self._block_path(name), times=(time.time()-61, time.time()-61))

        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir)

        # Tmp should be gone but the other ones are safe.
        self.assertFalse(os.path.exists(self._block_path("tmpXYZABC.keepcacheblock")))
        self.assertTrue(os.path.exists(self._block_path("tmpXYZABC")))
        self.assertTrue(os.path.exists(self._block_path("XYZABC")))

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_cap(self, get_mock):
        # confirm that the cache is kept to the desired limit

        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(self._block_path(), "wb") as f:
            f.write(self.data)

        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
            f.write(b"foo")

        self.assertTrue(os.path.exists(self._block_path()))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir,
                                                   max_slots=1)

        # one block was evicted to fit the single-slot cap
        self.assertFalse(os.path.exists(self._block_path()))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # confirm that a second cache doesn't delete files that belong to the first cache.

        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        with open(self._block_path(), "wb") as f:
            f.write(self.data)

        os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
        with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
            f.write(b"foo")

        self.assertTrue(os.path.exists(self._block_path()))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir,
                                                   max_slots=2)

        self.assertTrue(os.path.exists(self._block_path()))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

        block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
                                                   disk_cache_dir=self.disk_cache_dir,
                                                   max_slots=1)

        # the second, smaller cache must not evict the first cache's blocks
        self.assertTrue(os.path.exists(self._block_path()))
        self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))

    def test_disk_cache_error(self):
        # a cache directory we can't write to should fail initialization
        os.chmod(self.disk_cache_dir, stat.S_IRUSR)

        # Fail during cache initialization.
        with self.assertRaises(OSError):
            block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                      disk_cache_dir=self.disk_cache_dir)

    def test_disk_cache_write_error(self):
        # failing to persist a fetched block raises KeepCacheError

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        # Make the cache dir read-only
        os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
        os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)

        # Cache fails
        with self.assertRaises(arvados.errors.KeepCacheError):
            with tutil.mock_keep_responses(self.data, 200) as keep_mock:
                keep_client.get(self.locator)

    def test_disk_cache_retry_write_error(self):
        # an ENOSPC on the first mmap attempt triggers a retry and
        # shrinks the cache size limit

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        called = False
        realmmap = mmap.mmap
        def sideeffect_mmap(*args, **kwargs):
            # fail only the first call; delegate to the real mmap afterwards
            nonlocal called
            if not called:
                called = True
                raise OSError(errno.ENOSPC, "no space")
            else:
                return realmmap(*args, **kwargs)

        with patch('mmap.mmap') as mockmmap:
            mockmmap.side_effect = sideeffect_mmap

            cache_max_before = block_cache.cache_max

            with tutil.mock_keep_responses(self.data, 200) as keep_mock:
                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

            self.assertIsNotNone(keep_client.get_from_cache(self.locator))

        with open(self._block_path(), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))

        # shrank the cache in response to ENOSPC
        self.assertTrue(cache_max_before > block_cache.cache_max)

    def test_disk_cache_retry_write_error2(self):
        # an ENOMEM on the first mmap attempt triggers a retry and
        # reduces the number of cache slots

        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        called = False
        realmmap = mmap.mmap
        def sideeffect_mmap(*args, **kwargs):
            # fail only the first call; delegate to the real mmap afterwards
            nonlocal called
            if not called:
                called = True
                raise OSError(errno.ENOMEM, "no memory")
            else:
                return realmmap(*args, **kwargs)

        with patch('mmap.mmap') as mockmmap:
            mockmmap.side_effect = sideeffect_mmap

            slots_before = block_cache._max_slots

            with tutil.mock_keep_responses(self.data, 200) as keep_mock:
                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

            self.assertIsNotNone(keep_client.get_from_cache(self.locator))

        with open(self._block_path(), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))

        # shrank the cache in response to ENOMEM
        self.assertTrue(slots_before > block_cache._max_slots)