21388: Update deb instructions in Python READMEs
[arvados.git] / sdk / python / tests / test_keep_client.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import errno
6 import hashlib
7 import mmap
8 import os
9 import random
10 import re
11 import shutil
12 import socket
13 import stat
14 import sys
15 import tempfile
16 import time
17 import unittest
18 import urllib.parse
19
20 from pathlib import Path
21 from unittest import mock
22 from unittest.mock import patch
23
24 import parameterized
25 import pycurl
26
27 import arvados
28 import arvados.retry
29 import arvados.util
30 from . import arvados_testutil as tutil
31 from . import keepstub
32 from . import run_test_server
33
34 from .arvados_testutil import DiskCacheBase
35
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Round-trip put/get/head tests against a live Keep server.

    Parameterized to run twice: once with the disk-backed block cache
    and once with the in-memory block cache.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    block_cache_test = None

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.block_cache_test = DiskCacheBase()
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='',
                                             block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))

    @classmethod
    def tearDownClass(cls):
        # BUG FIX: this previously called super().setUpClass() by mistake,
        # re-running class setup during teardown instead of cleaning up.
        super(KeepTestCase, cls).tearDownClass()
        cls.block_cache_test.tearDown()

    def test_KeepBasicRWTest(self):
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertTrue(tutil.binary_compare(self.keep_client.get(foo_locator),
                         b'foo'),
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        # Double the 8-byte seed 23 times: 8 * 2**23 = 67108864 bytes (64 MiB).
        for i in range(0, 23):
            blob_data = blob_data + blob_data
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # NOTE: removed a dead Python 2 branch (sys.version_info < (3, 0))
        # that asserted UnicodeEncodeError for non-ASCII text; this module
        # is Python-3-only (it imports urllib.parse and pathlib).

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
145
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Blob-signing permission checks.

    With blob_signing enabled, reads must present a locator signed for
    the requesting user; unsigned locators, foreign tokens, and missing
    tokens all raise KeepReadError.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        keep_client = arvados.KeepClient(block_cache=self.make_block_cache(self.disk_cache))

        # A freshly signed locator round-trips for the same user.
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => bad request
        bar_locator = keep_client.put('bar')
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            bar_locator,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get(unsigned_bar_locator)

        # GET from a different user => bad request
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.KeepReadError):
            arvados.Keep.get(bar_locator)

        # Unauthenticated GET for a signed locator => bad request
        # Unauthenticated GET for an unsigned locator => bad request
        keep_client.api_token = ''
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get(bar_locator)
        with self.assertRaises(arvados.errors.KeepReadError):
            keep_client.get(unsigned_bar_locator)
193
194
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Tests for reaching Keep through keepproxy, including handling of
    the ARVADOS_KEEP_SERVICES configuration setting.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        super(KeepProxyTestCase, self).tearDown()
        DiskCacheBase.tearDown(self)

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='', block_cache=self.make_block_cache(self.disk_cache))
        baz_locator = keep_client.put('baz')
        # BUG FIX: raw string for the regex -- '\+' in a plain string is an
        # invalid escape sequence (SyntaxWarning in Python >= 3.12).
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            # Construction must fail: the value is not a valid URI.
            # (Dropped the unused `keep_client =` binding; the exception
            # is raised before assignment anyway.)
            arvados.KeepClient(api_client=self.api_client,
                               local_store='',
                               block_cache=self.make_block_cache(self.disk_cache))
244
245
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """KeepClient behavior against mocked services (no real servers):
    service-root selection, SSL verification flags, connect/read timeouts,
    proxy detection, and error aggregation across retries.
    """
    disk_cache = False

    def tearDown(self):
        # Remove any temporary disk-cache directory created by
        # make_block_cache().
        DiskCacheBase.tearDown(self)

    def get_service_roots(self, api_client):
        # Helper: build a client against the mocked API and return the
        # parsed URLs of its weighted service roots for a dummy locator.
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
            service_type='proxy', service_host='localhost', service_port=9, count=1),
                                         block_cache=self.make_block_cache(self.disk_cache))
        try:
            # this will fail, but it ensures we get the service
            # discovery response
            keep_client.put('baz2', num_retries=0)
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt /
            # SystemExit are no longer swallowed.
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)
        # BUG FIX: removed an unused `force_timeout` local left over from
        # a copy/paste of the timeout tests below.

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator':local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_head_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            # HEAD requests transfer no body, so no low-speed (read)
            # timeout should be set.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_get_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            # No body transfer on HEAD, so no low-speed timeout expected.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_put_timeout(self):
        # No block_cache passed here (client builds its default);
        # disk_cache_dir is cleared so tearDown has nothing to remove.
        self.disk_cache_dir = None
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def check_no_services_error(self, verb, exc_class):
        # Helper: when service discovery itself fails, `verb` should raise
        # exc_class with an empty request_errors() map.
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=self.make_block_cache(self.disk_cache),
            num_retries=0,
        )
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        # Helper: only the errors from the final retry round (the two 502s)
        # should be reported, not the earlier 500s.
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        # One success + two 500s: only the two failures should be reported.
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        # First response reports 3 replicas stored, satisfying copies=2
        # in a single request; the 418s must never be reached.
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
581
582
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Block-cache behavior: GET results are cached and reused, while
    HEAD requests bypass the cache entirely.

    `arvados.KeepClient.KeepService.get` is patched so tests can count
    how many service-level requests the client actually issues.
    """
    disk_cache = False

    def setUp(self):
        # Two mocked Keep services backing one client with a fresh block cache.
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def tearDown(self):
        # Remove the temporary disk-cache directory, if one was created.
        DiskCacheBase.tearDown(self)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.get(self.locator)
            self.keep_client.get(self.locator)
        # Request already cached, don't require more than one request
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.head(self.locator)
            self.keep_client.head(self.locator)
        # Don't cache HEAD requests so that they're not confused with GET reqs
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        head_resp = None
        get_resp = None
        # Each call to the patched KeepService.get yields the next value,
        # so a cache hit on the second call would return the same bytes.
        get_mock.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual(b'first response', head_resp)
        # First response was not cached because it was from a HEAD request.
        self.assertNotEqual(head_resp, get_resp)
624
625
626 @tutil.skip_sleep
627 @parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
628 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
629     disk_cache = False
630
    def setUp(self):
        """Build a KeepClient over two mocked services and a known request id."""
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        # Known-good request id used by tests that pass one explicitly.
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # If we don't set request_id to None explicitly here, it will
        # return <MagicMock name='api_client_mock.request_id'
        # id='123456789'>:
        self.api_client.request_id = None
642
    def tearDown(self):
        # Remove the temporary disk-cache directory, if one was created.
        DiskCacheBase.tearDown(self)
645
646     def test_default_to_api_client_request_id(self):
647         self.api_client.request_id = self.test_id
648         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
649             self.keep_client.put(self.data)
650         self.assertEqual(2, len(mock.responses))
651         for resp in mock.responses:
652             self.assertProvidedRequestId(resp)
653
654         with tutil.mock_keep_responses(self.data, 200) as mock:
655             self.keep_client.get(self.locator)
656         self.assertProvidedRequestId(mock.responses[0])
657
658         with tutil.mock_keep_responses(b'', 200) as mock:
659             self.keep_client.head(self.locator)
660         self.assertProvidedRequestId(mock.responses[0])
661
662     def test_explicit_request_id(self):
663         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
664             self.keep_client.put(self.data, request_id=self.test_id)
665         self.assertEqual(2, len(mock.responses))
666         for resp in mock.responses:
667             self.assertProvidedRequestId(resp)
668
669         with tutil.mock_keep_responses(self.data, 200) as mock:
670             self.keep_client.get(self.locator, request_id=self.test_id)
671         self.assertProvidedRequestId(mock.responses[0])
672
673         with tutil.mock_keep_responses(b'', 200) as mock:
674             self.keep_client.head(self.locator, request_id=self.test_id)
675         self.assertProvidedRequestId(mock.responses[0])
676
677     def test_automatic_request_id(self):
678         with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
679             self.keep_client.put(self.data)
680         self.assertEqual(2, len(mock.responses))
681         for resp in mock.responses:
682             self.assertAutomaticRequestId(resp)
683
684         with tutil.mock_keep_responses(self.data, 200) as mock:
685             self.keep_client.get(self.locator)
686         self.assertAutomaticRequestId(mock.responses[0])
687
688         with tutil.mock_keep_responses(b'', 200) as mock:
689             self.keep_client.head(self.locator)
690         self.assertAutomaticRequestId(mock.responses[0])
691
692     def test_request_id_in_exception(self):
693         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
694             with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
695                 self.keep_client.head(self.locator, request_id=self.test_id)
696
697         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
698             with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
699                 self.keep_client.get(self.locator)
700
701         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
702             with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
703                 self.keep_client.put(self.data, request_id=self.test_id)
704
705         with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
706             with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
707                 self.keep_client.put(self.data)
708
709     def assertAutomaticRequestId(self, resp):
710         hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
711                if x.startswith('X-Request-Id: ')][0]
712         self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
713         self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')
714
715     def assertProvidedRequestId(self, resp):
716         self.assertIn('X-Request-Id: '+self.test_id,
717                       resp.getopt(pycurl.HTTPHEADER))
718
719
@tutil.skip_sleep
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check KeepClient's rendezvous hashing against a reference probe order.

    With 16 mock services, each block hash determines a fixed order in
    which servers should be probed; these tests confirm get/put/head all
    follow that order, and that the ordering stays mostly stable
    (minimal reshuffling) when services are added.
    """
    disk_cache = False

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        # Test blocks are the 64-hex-digit encodings of 0..3, matching
        # the reference orders above.
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(self.blocks[x]).hexdigest()
            for x in range(len(self.expected_order))]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        # Run op(i) against servers that always return 500, so every
        # service gets probed (twice each, since num_retries=1), then
        # check the request URLs hit services in the reference order.
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        # With `copies` writer threads racing, the exact request order
        # can't be asserted; instead verify no server is contacted too
        # far ahead of its slot in the reference probe order.
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        # Adding services should reshuffle probe order only minimally.
        # For each block, "penalty" is how far its original first-choice
        # server slips down the new probe list; the average penalty must
        # stay close to the theoretical ideal.
        hashes = [
            hashlib.md5("{:064x}".format(x).encode()).hexdigest() for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
            total_penalty = 0
            for hash_index in range(len(hashes)):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(hashes[hash_index]))
                penalty = probe_after.index(probes_before[hash_index][0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        # Run `verb` against servers that all return 500 and confirm the
        # raised error's request_errors() lists servers in the reference
        # probe order for the all-zeros block.
        data = b'0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client, block_cache=self.make_block_cache(self.disk_cache))
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value = tutil.FakeCurl.make(code=500, body=b'')
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
878
879
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase, DiskCacheBase):
    """Timeout behavior tests run against a real (stub) Keep server.

    The stub server can throttle bandwidth and insert delays at various
    points in the request cycle; these tests check that the client's
    connect, response, and low-bandwidth timeouts fire as configured.
    """
    disk_cache = False

    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = b'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    class assertTakesBetween(unittest.TestCase):
        # Context manager asserting its body takes between tmin and tmax
        # seconds of wall-clock time. (Subclasses TestCase only so the
        # assert* methods are available; __init__ deliberately skips
        # super().__init__.)
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(unittest.TestCase):
        # Context manager asserting its body takes at least tmin seconds.
        def __init__(self, tmin):
            self.tmin = tmin

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, *args, **kwargs):
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        # Build a client with the given timeout tuple (connect, response
        # [, bandwidth floor]) against the stub server.
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts, block_cache=self.make_block_cache(self.disk_cache))

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # Bandwidth above the low-speed limit: both put and get succeed.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        # Bandwidth at the limit plus a delayed server response: get and
        # put fail; head still succeeds, just slowly.
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(response=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        # Delays in the middle of the transfer (mid_write/mid_read)
        # should also trip the low-bandwidth timeout.
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        # Note the actual delay must be 1s longer than the low speed
        # limit interval in order for curl to detect it reliably.
        self.server.setdelays(mid_write=self.TIMEOUT_TIME+1, mid_read=self.TIMEOUT_TIME+1)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError) as e:
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        # A delay before the request is handled counts against the
        # connect timeout first, then the response timeout.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        # Same checks, with the delay before the response status line.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        # Same checks, with the delay before the response body.
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
1026
1027
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Test that +K@ locator hints route requests to gateway services.

    A locator may carry gateway-UUID hints (+K@zzzzz-bi6l4-...) or a
    remote-cluster hint (+K@xyzzy). Hinted gateways must be tried
    first, in hint order, before ordinary disk services.
    """
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        """Point self.keepClient at a mock API advertising `disks` disk
        services plus `gateways` gateway services."""
        self.gateways = [{
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
        } for i in range(gateways)]
        self.gateway_roots = [
            "https://{service_host}:{service_port}/".format(**gw)
            for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def _check_gateway_hints_tried_in_order(self, MockCurl, verb):
        """Issue `verb` with several gateway hints against all-404
        servers; check that gateways are probed first, in hint order,
        then the disk services. (Shared by the get and head tests,
        which previously duplicated this logic with inconsistent
        str/bytes mock bodies.)"""
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body=b'')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] +
                           ['K@'+gw['uuid'] for gw in self.gateways])
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for i, root in enumerate(self.gateway_roots):
            self.assertEqual(root+locator,
                             mocks[i].getopt(pycurl.URL).decode())
        # Disk services are tried next.
        for i in range(gateways, gateways+disks):
            self.assertRegex(
                mocks[i].getopt(pycurl.URL).decode(),
                r'keep0x')

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        """get() with a single gateway hint fetches from that gateway."""
        # Use a bytes body for consistency with the other mocks here.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + self.gateways[0]['uuid']
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_tried_in_order(MockCurl, 'get')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_gateway_hints_tried_in_order(MockCurl, 'head')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        """A +K@xyzzy remote hint routes get() to that cluster's keep proxy."""
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(b'foo', self.keepClient.get(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        """A +K@xyzzy remote hint routes head() to that cluster's keep proxy."""
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body=b'foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(True, self.keepClient.head(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL).decode())
1129
1130
class KeepClientRetryTestMixin(object):
    """Shared machinery for exercising KeepClient retry behavior.

    A local Keep store would never retry, so instead each client is
    pointed at a proxy address that is a black hole (which also avoids
    instantiating an API client, so all HTTP traffic comes from one
    place), and the HTTP layer is mocked to serve scripted responses.
    This exercises the retry logic thoroughly with no supporting
    servers and no side effects if something hiccups.

    Concrete test classes must define DEFAULT_EXPECT,
    DEFAULT_EXCEPTION, run_method(), and TEST_PATCHER (a context
    manager that mocks out the appropriate client methods).
    """

    disk_cache = False

    PROXY_ADDR = 'http://[{}]:65535/'.format(tutil.TEST_HOST)
    TEST_DATA = b'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}

    def new_client(self, **caller_kwargs):
        """Build a KeepClient from self.client_kwargs plus overrides."""
        kwargs = dict(self.client_kwargs, **caller_kwargs)
        kwargs['block_cache'] = self.make_block_cache(self.disk_cache)
        return arvados.KeepClient(**kwargs)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        """Assert run_method(*args, **kwargs) returns the expected value."""
        expected = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(expected, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        """Assert run_method raises error_class; return the context."""
        error_class = self.DEFAULT_EXCEPTION if error_class is None else error_class
        with self.assertRaises(error_class) as err:
            self.run_method(*args, **kwargs)
        return err

    def test_immediate_success(self):
        """A 200 on the first try succeeds with no retries needed."""
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
            self.check_success()

    def test_retry_then_success(self):
        """A transient 500 is retried to success."""
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        """A raised exception is retried to success."""
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, Exception('mock err'), 200):
            self.check_success(num_retries=3)

    def test_no_retry_after_permanent_error(self):
        """A 403 is permanent: no retry even with retries remaining."""
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200):
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        """After num_retries failures the error reports the attempt count."""
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
            err = self.check_exception(num_retries=1)
        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')

    def test_num_retries_instance_fallback(self):
        """A num_retries set on the client instance is honored."""
        self.client_kwargs['num_retries'] = 3
        with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200):
            self.check_success()
1201
1202
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.get()."""
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        return self.new_client().get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        """A 404 surfaces as NotFoundError even with retries remaining."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        """A 404 mixed with a 500 must not collapse into NotFoundError.

        get should raise NotFoundError only when a high threshold of
        servers positively report the block missing; this rigs up 50/50
        disagreement between two servers.
        """
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                client.get(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        """One hinted server's 200 is enough despite other failures."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        """A socket timeout on one server falls through to the next."""
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        """A body failing checksum verification triggers another fetch."""
        with tutil.mock_keep_responses(
                ('baddata', 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1250
1251
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Retry behavior of KeepClient.head()."""
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        return self.new_client().head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        """A 404 surfaces as NotFoundError even with retries remaining."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        """A 404 mixed with a 500 must not collapse into NotFoundError.

        head should raise NotFoundError only when a high threshold of
        servers positively report the block missing; this rigs up 50/50
        disagreement between two servers.
        """
        client = self.new_client(num_retries=0)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 500):
            with self.assertRaises(arvados.errors.KeepReadError) as raised:
                client.head(self.HINTED_LOCATOR)
            self.assertNotIsInstance(
                raised.exception, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        """One hinted server's 200 is enough despite other failures."""
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 404, 200, 500):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        """A socket timeout on one server falls through to the next."""
        with tutil.mock_keep_responses(
                (socket.timeout("timed out"), 200),
                (self.DEFAULT_EXPECT, 200)):
            self.check_success(locator=self.HINTED_LOCATOR)
1293
1294
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase, DiskCacheBase):
    """Run the shared retry scenarios against KeepClient.put()."""

    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def tearDown(self):
        # Clean up state owned by DiskCacheBase (disk cache directory).
        DiskCacheBase.tearDown(self)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        """Write *data* with the requested replication via a fresh client."""
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Only one mocked service is available, so requesting two copies
        # must fail instead of writing twice to the same server.
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200):
            self.check_exception(copies=2, num_retries=3)
1312
1313
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    """Verify KeepWriterThreadPool stops writing once enough copies exist."""

    class FakeKeepService(object):
        """Stand-in Keep service with a configurable delay and outcome."""

        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            headers = {
                'x-keep-replicas-stored': str(replicas),
                'x-keep-storage-classes-confirmed': 'default={}'.format(replicas),
            }
            self._result = {'headers': headers, 'body': 'foobar'}

        def put(self, data_hash, data, timeout, headers):
            # Simulate network latency, then fail or succeed as configured.
            time.sleep(self.delay)
            if self.will_raise is not None:
                raise self.will_raise
            return self.will_succeed

        def last_result(self):
            if not self.will_succeed:
                return {"status_code": 500, "body": "didn't succeed"}
            return self._result

        def finished(self):
            return False


    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def test_only_write_enough_on_success(self):
        # Ten willing services, but only self.copies writes should count.
        for idx in range(10):
            svc = self.FakeKeepService(delay=idx/10.0, will_succeed=True)
            self.pool.add_task(svc, None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_on_partial_success(self):
        # Alternate failing and succeeding services; the pool must still
        # reach the requested replication.
        for idx in range(5):
            self.pool.add_task(
                self.FakeKeepService(delay=idx/10.0, will_succeed=False), None)
            self.pool.add_task(
                self.FakeKeepService(delay=idx/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_only_write_enough_when_some_crash(self):
        # Same as above, but the bad services raise instead of returning False.
        for idx in range(5):
            self.pool.add_task(
                self.FakeKeepService(delay=idx/10.0, will_raise=Exception()), None)
            self.pool.add_task(
                self.FakeKeepService(delay=idx/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies, []))

    def test_fail_when_too_many_crash(self):
        # More crashing services than copies, and one too few healthy ones:
        # the pool can only confirm copies-1 replicas.
        for idx in range(self.copies + 1):
            self.pool.add_task(
                self.FakeKeepService(delay=idx/10.0, will_raise=Exception()), None)
        for idx in range(self.copies - 1):
            self.pool.add_task(
                self.FakeKeepService(delay=idx/10.0, will_succeed=True), None)
        self.pool.join()
        self.assertEqual(self.pool.done(), (self.copies - 1, []))
1385
1386
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Test put()s that need two distinct servers to succeed, possibly
    requiring multiple passes through the retry loop.
    """
    # Fallback when the class is not parameterized.  This attribute was
    # previously misnamed 'block_cache', so self.disk_cache (read in setUp)
    # only existed via the parameterized_class decorator; renamed for
    # consistency with the other test classes in this file.
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        # Clean up state owned by DiskCacheBase (disk cache directory).
        DiskCacheBase.tearDown(self)

    def test_success_after_exception(self):
        """One exception, then two 200s: put succeeds after three requests."""
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                Exception('mock err'), 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_success_after_retryable_error(self):
        """A 500 is retryable: put succeeds on the third request."""
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                500, 200, 200) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)

    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                200, 400, 200) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)
1426
1427
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientAPIErrorTest(unittest.TestCase, DiskCacheBase):
    """Regression test: an API-layer failure during get() must not deadlock."""

    disk_cache = False

    def tearDown(self):
        # Clean up state owned by DiskCacheBase (disk cache directory).
        DiskCacheBase.tearDown(self)

    def test_api_fail(self):
        class ApiMock(object):
            # Provide just enough attributes to construct a KeepClient;
            # any other attribute access raises a KeepReadError.
            def __getattr__(self, r):
                if r == "api_token":
                    return "abc"
                if r == "insecure":
                    return False
                if r == "config":
                    return lambda: {}
                raise arvados.errors.KeepReadError()

        keep_client = arvados.KeepClient(api_client=ApiMock(),
                                         proxy='', local_store='',
                                         block_cache=self.make_block_cache(self.disk_cache))

        # The bug this is testing for is that if an API (not keepstore)
        # exception is thrown as part of a get(), the next attempt to get
        # that same block will result in a deadlock.  This is why the
        # same locator is fetched twice in a row.  Unfortunately, the
        # failure mode for this test is that the test suite deadlocks;
        # there isn't a good way to avoid that without adding a special
        # case that has no use except for this test.
        for _ in range(2):
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
1462
1463
class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Tests for KeepBlockCache's on-disk caching behavior."""

    def setUp(self):
        # Two mocked keep services, one known block, and a throwaway
        # directory to serve as the disk cache root.
        self.api_client = self.mock_keep_services(count=2)
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.disk_cache_dir = tempfile.mkdtemp()
1470
    def tearDown(self):
        # Remove the temporary disk cache directory and everything in it.
        shutil.rmtree(self.disk_cache_dir)
1473
1474     @mock.patch('arvados.util._BaseDirectories.storage_path')
1475     def test_default_disk_cache_dir(self, storage_path):
1476         expected = Path(self.disk_cache_dir)
1477         storage_path.return_value = expected
1478         cache = arvados.keep.KeepBlockCache(disk_cache=True)
1479         storage_path.assert_called_with('keep')
1480         self.assertEqual(cache._disk_cache_dir, str(expected))
1481
1482     @mock.patch('arvados.KeepClient.KeepService.get')
1483     def test_disk_cache_read(self, get_mock):
1484         # confirm it finds an existing cache block when the cache is
1485         # initialized.
1486
1487         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1488         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1489             f.write(self.data)
1490
1491         # block cache should have found the existing block
1492         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1493                                                   disk_cache_dir=self.disk_cache_dir)
1494         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1495
1496         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1497
1498         get_mock.assert_not_called()
1499
1500     @mock.patch('arvados.KeepClient.KeepService.get')
1501     def test_disk_cache_share(self, get_mock):
1502         # confirm it finds a cache block written after the disk cache
1503         # was initialized.
1504
1505         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1506                                                   disk_cache_dir=self.disk_cache_dir)
1507         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1508
1509         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1510         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1511             f.write(self.data)
1512
1513         # when we try to get the block, it'll check the disk and find it.
1514         self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1515
1516         get_mock.assert_not_called()
1517
1518     def test_disk_cache_write(self):
1519         # confirm the cache block was created
1520
1521         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1522                                                   disk_cache_dir=self.disk_cache_dir)
1523         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1524
1525         with tutil.mock_keep_responses(self.data, 200) as mock:
1526             self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1527
1528         self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1529
1530         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1531             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1532
1533     def test_disk_cache_clean(self):
1534         # confirm that a tmp file in the cache is cleaned up
1535
1536         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1537         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), "wb") as f:
1538             f.write(b"abc1")
1539
1540         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), "wb") as f:
1541             f.write(b"abc2")
1542
1543         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), "wb") as f:
1544             f.write(b"abc3")
1545
1546         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1547         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1548         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1549
1550         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1551                                                   disk_cache_dir=self.disk_cache_dir)
1552
1553         # The tmp still hasn't been deleted because it was created in the last 60 seconds
1554         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1555         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1556         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1557
1558         # Set the mtime to 61s in the past
1559         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock"), times=(time.time()-61, time.time()-61))
1560         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC"), times=(time.time()-61, time.time()-61))
1561         os.utime(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC"), times=(time.time()-61, time.time()-61))
1562
1563         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1564                                                    disk_cache_dir=self.disk_cache_dir)
1565
1566         # Tmp should be gone but the other ones are safe.
1567         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC.keepcacheblock")))
1568         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "tmpXYZABC")))
1569         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], "XYZABC")))
1570
1571     @mock.patch('arvados.KeepClient.KeepService.get')
1572     def test_disk_cache_cap(self, get_mock):
1573         # confirm that the cache is kept to the desired limit
1574
1575         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1576         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1577             f.write(self.data)
1578
1579         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1580         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1581             f.write(b"foo")
1582
1583         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1584         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1585
1586         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1587                                                    disk_cache_dir=self.disk_cache_dir,
1588                                                    max_slots=1)
1589
1590         self.assertFalse(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1591         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1592
1593     @mock.patch('arvados.KeepClient.KeepService.get')
1594     def test_disk_cache_share(self, get_mock):
1595         # confirm that a second cache doesn't delete files that belong to the first cache.
1596
1597         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1598         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "wb") as f:
1599             f.write(self.data)
1600
1601         os.makedirs(os.path.join(self.disk_cache_dir, "acb"))
1602         with open(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock"), "wb") as f:
1603             f.write(b"foo")
1604
1605         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1606         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1607
1608         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1609                                                    disk_cache_dir=self.disk_cache_dir,
1610                                                    max_slots=2)
1611
1612         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1613         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1614
1615         block_cache2 = arvados.keep.KeepBlockCache(disk_cache=True,
1616                                                    disk_cache_dir=self.disk_cache_dir,
1617                                                    max_slots=1)
1618
1619         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock")))
1620         self.assertTrue(os.path.exists(os.path.join(self.disk_cache_dir, "acb", "acbd18db4cc2f85cedef654fccc4a4d8.keepcacheblock")))
1621
1622     def test_disk_cache_error(self):
1623         os.chmod(self.disk_cache_dir, stat.S_IRUSR)
1624
1625         # Fail during cache initialization.
1626         with self.assertRaises(OSError):
1627             block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1628                                                       disk_cache_dir=self.disk_cache_dir)
1629
1630     def test_disk_cache_write_error(self):
1631         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1632                                                   disk_cache_dir=self.disk_cache_dir)
1633
1634         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1635
1636         # Make the cache dir read-only
1637         os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
1638         os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
1639
1640         # Cache fails
1641         with self.assertRaises(arvados.errors.KeepCacheError):
1642             with tutil.mock_keep_responses(self.data, 200) as mock:
1643                 keep_client.get(self.locator)
1644
1645     def test_disk_cache_retry_write_error(self):
1646         block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
1647                                                   disk_cache_dir=self.disk_cache_dir)
1648
1649         keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
1650
1651         called = False
1652         realmmap = mmap.mmap
1653         def sideeffect_mmap(*args, **kwargs):
1654             nonlocal called
1655             if not called:
1656                 called = True
1657                 raise OSError(errno.ENOSPC, "no space")
1658             else:
1659                 return realmmap(*args, **kwargs)
1660
1661         with patch('mmap.mmap') as mockmmap:
1662             mockmmap.side_effect = sideeffect_mmap
1663
1664             cache_max_before = block_cache.cache_max
1665
1666             with tutil.mock_keep_responses(self.data, 200) as mock:
1667                 self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))
1668
1669             self.assertIsNotNone(keep_client.get_from_cache(self.locator))
1670
1671         with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
1672             self.assertTrue(tutil.binary_compare(f.read(), self.data))
1673
1674         # shrank the cache in response to ENOSPC
1675         self.assertTrue(cache_max_before > block_cache.cache_max)
1676
    def test_disk_cache_retry_write_error2(self):
        # Companion to test_disk_cache_retry_write_error: here the first
        # mmap() fails with ENOMEM, and the cache is expected to reduce
        # its slot count and retry successfully.
        block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
                                                  disk_cache_dir=self.disk_cache_dir)

        keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

        called = False
        realmmap = mmap.mmap
        def sideeffect_mmap(*args, **kwargs):
            # Fail exactly once, then fall through to the real mmap.
            nonlocal called
            if not called:
                called = True
                raise OSError(errno.ENOMEM, "no memory")
            else:
                return realmmap(*args, **kwargs)

        with patch('mmap.mmap') as mockmmap:
            mockmmap.side_effect = sideeffect_mmap

            slots_before = block_cache._max_slots

            with tutil.mock_keep_responses(self.data, 200) as mock:
                self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

            self.assertIsNotNone(keep_client.get_from_cache(self.locator))

        with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))

        # shrank the cache in response to ENOMEM
        self.assertTrue(slots_before > block_cache._max_slots)